From 261f70b53a08ef59b0e4d896dfbf8bea9b381c7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <schaefed@mac-schaefed-363.intranet.ufz.de> Date: Mon, 20 Jan 2025 21:27:46 +0100 Subject: [PATCH 1/7] fill new thing_parser table --- requirements.txt | 1 + src/configdb.py | 74 +++++++++++++++++++++++++++++++++++++----------- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9e8f863..4cb6757 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +pandas==2.1.4 paho-mqtt==2.1.0 psycopg[binary]==3.2.1 psycopg-pool==3.2.2 diff --git a/src/configdb.py b/src/configdb.py index efb3ca2..65168fa 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -2,9 +2,10 @@ from __future__ import annotations import logging -import typing from typing import Any, Literal, Sequence, cast +import pandas as pd + from psycopg import Connection, sql from psycopg.rows import dict_row from psycopg.types.json import Jsonb @@ -25,15 +26,16 @@ if get_envvar_as_bool("DEBUG_SQL"): _IDS_BY_UUID_QUERY = sql.SQL( """\ SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id, -s3s.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id, -t.ext_sftp_id, t.ext_api_id, ea.api_type_id -FROM config_db.thing t -LEFT JOIN config_db.project p ON t.project_id = p.id -LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id -LEFT JOIN config_db.file_parser fp ON s3s.file_parser_id = fp.id -LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id -LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id -WHERE t.uuid = %s + tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id, + t.ext_sftp_id, t.ext_api_id, ea.api_type_id + FROM config_db.thing t + LEFT JOIN config_db.project p ON t.project_id = p.id + LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id + LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id + LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id + LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id + LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id + WHERE tp.valid_to is NULL AND t.uuid = %s; """ ) _no_ids = { @@ -275,9 +277,7 @@ def upsert_table_project( return id_ -def upsert_table_s3_store( - conn: Connection, values: dict, parser_id: int, s3_id: int | None -) -> int: +def upsert_table_s3_store(conn: Connection, values: dict, s3_id: int | None) -> int: # "raw_data_storage": # "bucket_name": storage.bucket, # "username": storage.access_key, @@ -287,13 +287,12 @@ def upsert_table_s3_store( id_ = _upsert( conn, table="s3_store", - columns=("user", "password", "bucket", "filename_pattern", "file_parser_id"), + columns=("user", "password", "bucket", "filename_pattern"), values=( v.pop("username"), v.pop("password"), v.pop("bucket_name"), v.pop("filename_pattern"), - parser_id, ), id=s3_id, ) @@ -301,6 +300,44 @@ def upsert_table_s3_store( return id_ +def upsert_table_thing_parser( + conn: Connection, + thing_id: int, + file_parser_id: int, + parser_settings: dict[str, Any], + start_date: pd.Timestamp | None = None, + end_date: pd.Timestamp | None = None, +) -> int: + r = conn.execute( + """ + SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params + FROM config_db.thing_parser tp JOIN config_db.file_parser fp on tp.file_parser_id = fp.id + WHERE tp.valid_to is NULL AND tp.file_parser_id = %s + """, + (file_parser_id,), + ).fetchone() + + if r is not None and [thing_id, file_parser_id, parser_settings] != r[1:]: + # we actually got a new parser -> update the latest parser + if start_date is None: + raise ValueError("replacement parsers must have a valid start date") + # set the end date of the last valid parser + _upsert( + conn, + table="thing_parser", + columns=("valid_to",), + values=(start_date,), + id=r[0], + ) + # insert the new parser + return _upsert( + conn, + table="thing_parser", + columns=("thing_id", "file_parser_id", "valid_from", "valid_to"), + values=(thing_id, file_parser_id, start_date, end_date), + ) + + def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int: # mqtt_device_type: None or name # "username": thing.mqtt_username, @@ -467,10 +504,10 @@ def store_thing_config(conn: Connection, data: dict): parser = data["parsers"]["parsers"][idx] parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"]) s3_id = upsert_table_s3_store( - conn, data["raw_data_storage"], parser_id, ids["s3_store_id"] + conn, data["raw_data_storage"], ids["s3_store_id"] ) - upsert_table_thing( + thing_id = upsert_table_thing( conn, uuid, name, @@ -483,6 +520,9 @@ def store_thing_config(conn: Connection, data: dict): ids["thing_id"], ) + if ingest_type in ["sftp", "extsftp"]: + upsert_table_thing_parser(conn, thing_id, parser_id, parser["settings"]) + # =================================================================== # QAQC related stuff -- GitLab From 29cf665a4db03093249ad7246756ac4d33ef0a8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Thu, 30 Jan 2025 11:56:07 +0100 Subject: [PATCH 2/7] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Bert Palm <bert.palm@ufz.de> --- src/configdb.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/configdb.py b/src/configdb.py index 65168fa..eb4b666 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -311,7 +311,8 @@ def upsert_table_thing_parser( r = conn.execute( """ SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params - FROM config_db.thing_parser tp JOIN config_db.file_parser fp on tp.file_parser_id = fp.id + FROM config_db.thing_parser tp + JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id WHERE tp.valid_to is NULL AND tp.file_parser_id = %s """, (file_parser_id,), -- GitLab From b48d3e5253502ea7ee3d80e13252a259a912dda3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Thu, 30 Jan 2025 11:57:14 +0100 Subject: [PATCH 3/7] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Bert Palm <bert.palm@ufz.de> --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 4cb6757..9e8f863 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -pandas==2.1.4 paho-mqtt==2.1.0 psycopg[binary]==3.2.1 psycopg-pool==3.2.2 -- GitLab From 71c2036058a28e2585da1a01f116809fc3ae2f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Thu, 30 Jan 2025 11:57:23 +0100 Subject: [PATCH 4/7] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Bert Palm <bert.palm@ufz.de> --- src/configdb.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/configdb.py b/src/configdb.py index eb4b666..cdec4d9 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -5,6 +5,7 @@ import logging from typing import Any, Literal, Sequence, cast import pandas as pd +import datetime from psycopg import Connection, sql from psycopg.rows import dict_row -- GitLab From 0accaa8015f2f0ddb95969222ee0cdfc0f437b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Thu, 30 Jan 2025 11:57:30 +0100 Subject: [PATCH 5/7] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Bert Palm <bert.palm@ufz.de> --- src/configdb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/configdb.py b/src/configdb.py index cdec4d9..7008cef 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -306,8 +306,8 @@ def upsert_table_thing_parser( thing_id: int, file_parser_id: int, parser_settings: dict[str, Any], - start_date: pd.Timestamp | None = None, - end_date: pd.Timestamp | None = None, + start_date: datetime.datetime | None = None, + end_date: datetime.datetime | None = None, ) -> int: r = conn.execute( """ -- GitLab From aff0b7ff898c883daa2e3859ecdcc8c0319e06c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Thu, 30 Jan 2025 11:59:56 +0100 Subject: [PATCH 6/7] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Bert Palm <bert.palm@ufz.de> --- src/configdb.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/configdb.py b/src/configdb.py index 7008cef..adc21d1 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -26,17 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"): _IDS_BY_UUID_QUERY = sql.SQL( """\ -SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id, - tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id, - t.ext_sftp_id, t.ext_api_id, ea.api_type_id - FROM config_db.thing t - LEFT JOIN config_db.project p ON t.project_id = p.id - LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id - LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id - LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id - LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id - LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id - WHERE tp.valid_to is NULL AND t.uuid = %s; +SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id, +tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id, +t.ext_sftp_id, t.ext_api_id, ea.api_type_id +FROM config_db.thing t +LEFT JOIN config_db.project p ON t.project_id = p.id +LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id +LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id +LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id +LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id +LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id +WHERE tp.valid_to is null +AND t.uuid = %s; """ ) _no_ids = { -- GitLab From 087ab3990d975e2b8a7de2018f02f846285030f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <schaefed@mac-schaefed-363.intranet.ufz.de> Date: Thu, 30 Jan 2025 17:29:21 +0100 Subject: [PATCH 7/7] removed obsolete import --- src/configdb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/configdb.py b/src/configdb.py index adc21d1..63f6bb0 100644 --- a/src/configdb.py +++ b/src/configdb.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging from typing import Any, Literal, Sequence, cast -import pandas as pd import datetime from psycopg import Connection, sql -- GitLab