Skip to content
Snippets Groups Projects

fill new thing_parser table

Open David Schäfer requested to merge multi-parser into main
Files
2
+ 60
18
@@ -3,9 +3,10 @@ from __future__ import annotations
@@ -3,9 +3,10 @@ from __future__ import annotations
import json
import json
import logging
import logging
import typing
from typing import Any, Literal, Sequence, cast
from typing import Any, Literal, Sequence, cast
 
import datetime
 
from psycopg import Connection, sql
from psycopg import Connection, sql
from psycopg.rows import dict_row
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
from psycopg.types.json import Jsonb
@@ -25,16 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"):
@@ -25,16 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"):
_IDS_BY_UUID_QUERY = sql.SQL(
_IDS_BY_UUID_QUERY = sql.SQL(
"""\
"""\
SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
s3s.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
t.ext_sftp_id, t.ext_api_id, ea.api_type_id
t.ext_sftp_id, t.ext_api_id, ea.api_type_id
FROM config_db.thing t
FROM config_db.thing t
LEFT JOIN config_db.project p ON t.project_id = p.id
LEFT JOIN config_db.project p ON t.project_id = p.id
LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id
LEFT JOIN config_db.file_parser fp ON s3s.file_parser_id = fp.id
LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
WHERE t.uuid = %s
LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
 
WHERE tp.valid_to is null
 
AND t.uuid = %s;
"""
"""
)
)
_no_ids = {
_no_ids = {
@@ -276,9 +279,7 @@ def upsert_table_project(
@@ -276,9 +279,7 @@ def upsert_table_project(
return id_
return id_
def upsert_table_s3_store(
def upsert_table_s3_store(conn: Connection, values: dict, s3_id: int | None) -> int:
conn: Connection, values: dict, parser_id: int, s3_id: int | None
) -> int:
# "raw_data_storage":
# "raw_data_storage":
# "bucket_name": storage.bucket,
# "bucket_name": storage.bucket,
# "username": storage.access_key,
# "username": storage.access_key,
@@ -288,13 +289,12 @@ def upsert_table_s3_store(
@@ -288,13 +289,12 @@ def upsert_table_s3_store(
id_ = _upsert(
id_ = _upsert(
conn,
conn,
table="s3_store",
table="s3_store",
columns=("user", "password", "bucket", "filename_pattern", "file_parser_id"),
columns=("user", "password", "bucket", "filename_pattern"),
values=(
values=(
v.pop("username"),
v.pop("username"),
v.pop("password"),
v.pop("password"),
v.pop("bucket_name"),
v.pop("bucket_name"),
v.pop("filename_pattern"),
v.pop("filename_pattern"),
parser_id,
),
),
id=s3_id,
id=s3_id,
)
)
@@ -302,6 +302,45 @@ def upsert_table_s3_store(
@@ -302,6 +302,45 @@ def upsert_table_s3_store(
return id_
return id_
 
def upsert_table_thing_parser(
 
conn: Connection,
 
thing_id: int,
 
file_parser_id: int,
 
parser_settings: dict[str, Any],
 
start_date: datetime.datetime | None = None,
 
end_date: datetime.datetime | None = None,
 
) -> int:
 
r = conn.execute(
 
"""
 
SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params
 
FROM config_db.thing_parser tp
 
JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
 
WHERE tp.valid_to is NULL AND tp.file_parser_id = %s
 
""",
 
(file_parser_id,),
 
).fetchone()
 
 
if r is not None and [thing_id, file_parser_id, parser_settings] != r[1:]:
 
# we actually got a new parser -> update the latest parser
 
if start_date is None:
 
raise ValueError("replacement parsers must have a valid start date")
 
# set the end date of the last valid parser
 
_upsert(
 
conn,
 
table="thing_parser",
 
columns=("valid_to",),
 
values=(start_date,),
 
id=r[0],
 
)
 
# insert the new parser
 
return _upsert(
 
conn,
 
table="thing_parser",
 
columns=("thing_id", "file_parser_id", "valid_from", "valid_to"),
 
values=(thing_id, file_parser_id, start_date, end_date),
 
)
 
 
def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int:
def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int:
# mqtt_device_type: None or name
# mqtt_device_type: None or name
# "username": thing.mqtt_username,
# "username": thing.mqtt_username,
@@ -468,10 +507,10 @@ def store_thing_config(conn: Connection, data: dict):
@@ -468,10 +507,10 @@ def store_thing_config(conn: Connection, data: dict):
parser = data["parsers"]["parsers"][idx]
parser = data["parsers"]["parsers"][idx]
parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"])
parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"])
s3_id = upsert_table_s3_store(
s3_id = upsert_table_s3_store(
conn, data["raw_data_storage"], parser_id, ids["s3_store_id"]
conn, data["raw_data_storage"], ids["s3_store_id"]
)
)
upsert_table_thing(
thing_id = upsert_table_thing(
conn,
conn,
uuid,
uuid,
name,
name,
@@ -484,6 +523,9 @@ def store_thing_config(conn: Connection, data: dict):
@@ -484,6 +523,9 @@ def store_thing_config(conn: Connection, data: dict):
ids["thing_id"],
ids["thing_id"],
)
)
 
if ingest_type in ["sftp", "extsftp"]:
 
upsert_table_thing_parser(conn, thing_id, parser_id, parser["settings"])
 
# ===================================================================
# ===================================================================
# QAQC related stuff
# QAQC related stuff
Loading