Skip to content
Snippets Groups Projects

fill new thing_parser table

Open David Schäfer requested to merge multi-parser into main
Files
2
+ 60
18
@@ -3,9 +3,10 @@ from __future__ import annotations
import json
import logging
import typing
from typing import Any, Literal, Sequence, cast
import datetime
from psycopg import Connection, sql
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
@@ -25,16 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"):
_IDS_BY_UUID_QUERY = sql.SQL(
"""\
SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
s3s.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
t.ext_sftp_id, t.ext_api_id, ea.api_type_id
FROM config_db.thing t
LEFT JOIN config_db.project p ON t.project_id = p.id
LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
LEFT JOIN config_db.file_parser fp ON s3s.file_parser_id = fp.id
LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
WHERE t.uuid = %s
SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
t.ext_sftp_id, t.ext_api_id, ea.api_type_id
FROM config_db.thing t
LEFT JOIN config_db.project p ON t.project_id = p.id
LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id
LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
WHERE tp.valid_to is null
AND t.uuid = %s;
"""
)
_no_ids = {
@@ -276,9 +279,7 @@ def upsert_table_project(
return id_
def upsert_table_s3_store(
conn: Connection, values: dict, parser_id: int, s3_id: int | None
) -> int:
def upsert_table_s3_store(conn: Connection, values: dict, s3_id: int | None) -> int:
# "raw_data_storage":
# "bucket_name": storage.bucket,
# "username": storage.access_key,
@@ -288,13 +289,12 @@ def upsert_table_s3_store(
id_ = _upsert(
conn,
table="s3_store",
columns=("user", "password", "bucket", "filename_pattern", "file_parser_id"),
columns=("user", "password", "bucket", "filename_pattern"),
values=(
v.pop("username"),
v.pop("password"),
v.pop("bucket_name"),
v.pop("filename_pattern"),
parser_id,
),
id=s3_id,
)
@@ -302,6 +302,45 @@ def upsert_table_s3_store(
return id_
def upsert_table_thing_parser(
conn: Connection,
thing_id: int,
file_parser_id: int,
parser_settings: dict[str, Any],
start_date: datetime.datetime | None = None,
end_date: datetime.datetime | None = None,
) -> int:
r = conn.execute(
"""
SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params
FROM config_db.thing_parser tp
JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
WHERE tp.valid_to is NULL AND tp.file_parser_id = %s
""",
(file_parser_id,),
).fetchone()
if r is not None and [thing_id, file_parser_id, parser_settings] != r[1:]:
# we actually got a new parser -> update the latest parser
if start_date is None:
raise ValueError("replacement parsers must have a valid start date")
# set the end date of the last valid parser
_upsert(
conn,
table="thing_parser",
columns=("valid_to",),
values=(start_date,),
id=r[0],
)
# insert the new parser
return _upsert(
conn,
table="thing_parser",
columns=("thing_id", "file_parser_id", "valid_from", "valid_to"),
values=(thing_id, file_parser_id, start_date, end_date),
)
def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int:
# mqtt_device_type: None or name
# "username": thing.mqtt_username,
@@ -468,10 +507,10 @@ def store_thing_config(conn: Connection, data: dict):
parser = data["parsers"]["parsers"][idx]
parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"])
s3_id = upsert_table_s3_store(
conn, data["raw_data_storage"], parser_id, ids["s3_store_id"]
conn, data["raw_data_storage"], ids["s3_store_id"]
)
upsert_table_thing(
thing_id = upsert_table_thing(
conn,
uuid,
name,
@@ -484,6 +523,9 @@ def store_thing_config(conn: Connection, data: dict):
ids["thing_id"],
)
if ingest_type in ["sftp", "extsftp"]:
upsert_table_thing_parser(conn, thing_id, parser_id, parser["settings"])
# ===================================================================
# QAQC related stuff
Loading