From 261f70b53a08ef59b0e4d896dfbf8bea9b381c7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?=
 <schaefed@mac-schaefed-363.intranet.ufz.de>
Date: Mon, 20 Jan 2025 21:27:46 +0100
Subject: [PATCH 1/7] fill new thing_parser table

---
 requirements.txt |  1 +
 src/configdb.py  | 74 +++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 9e8f863..4cb6757 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+pandas==2.1.4
 paho-mqtt==2.1.0
 psycopg[binary]==3.2.1
 psycopg-pool==3.2.2
diff --git a/src/configdb.py b/src/configdb.py
index efb3ca2..65168fa 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -2,9 +2,10 @@
 from __future__ import annotations
 
 import logging
-import typing
 from typing import Any, Literal, Sequence, cast
 
+import pandas as pd
+
 from psycopg import Connection, sql
 from psycopg.rows import dict_row
 from psycopg.types.json import Jsonb
@@ -25,15 +26,16 @@ if get_envvar_as_bool("DEBUG_SQL"):
 _IDS_BY_UUID_QUERY = sql.SQL(
     """\
 SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
-s3s.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
-t.ext_sftp_id, t.ext_api_id, ea.api_type_id
-FROM config_db.thing t
-LEFT JOIN config_db.project p ON t.project_id = p.id
-LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
-LEFT JOIN config_db.file_parser fp ON s3s.file_parser_id = fp.id
-LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
-LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
-WHERE t.uuid = %s
+       tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
+       t.ext_sftp_id, t.ext_api_id, ea.api_type_id
+  FROM config_db.thing t
+       LEFT JOIN config_db.project p ON t.project_id = p.id
+       LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id
+       LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
+       LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
+       LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
+       LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
+ WHERE tp.valid_to is NULL AND t.uuid = %s;
 """
 )
 _no_ids = {
@@ -275,9 +277,7 @@ def upsert_table_project(
     return id_
 
 
-def upsert_table_s3_store(
-    conn: Connection, values: dict, parser_id: int, s3_id: int | None
-) -> int:
+def upsert_table_s3_store(conn: Connection, values: dict, s3_id: int | None) -> int:
     #     "raw_data_storage":
     #         "bucket_name": storage.bucket,
     #         "username": storage.access_key,
@@ -287,13 +287,12 @@ def upsert_table_s3_store(
     id_ = _upsert(
         conn,
         table="s3_store",
-        columns=("user", "password", "bucket", "filename_pattern", "file_parser_id"),
+        columns=("user", "password", "bucket", "filename_pattern"),
         values=(
             v.pop("username"),
             v.pop("password"),
             v.pop("bucket_name"),
             v.pop("filename_pattern"),
-            parser_id,
         ),
         id=s3_id,
     )
@@ -301,6 +300,44 @@ def upsert_table_s3_store(
     return id_
 
 
+def upsert_table_thing_parser(
+    conn: Connection,
+    thing_id: int,
+    file_parser_id: int,
+    parser_settings: dict[str, Any],
+    start_date: pd.Timestamp | None = None,
+    end_date: pd.Timestamp | None = None,
+) -> int:
+    r = conn.execute(
+        """
+        SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params
+        FROM config_db.thing_parser tp JOIN config_db.file_parser fp on tp.file_parser_id = fp.id
+        WHERE tp.valid_to is NULL AND tp.file_parser_id = %s
+        """,
+        (file_parser_id,),
+    ).fetchone()
+
+    if r is not None and [thing_id, file_parser_id, parser_settings] != r[1:]:
+        # we actually got a new parser -> update the latest parser
+        if start_date is None:
+            raise ValueError("replacement parsers must have a valid start date")
+        # set the end date of the last valid parser
+        _upsert(
+            conn,
+            table="thing_parser",
+            columns=("valid_to",),
+            values=(start_date,),
+            id=r[0],
+        )
+    # insert the new parser
+    return _upsert(
+        conn,
+        table="thing_parser",
+        columns=("thing_id", "file_parser_id", "valid_from", "valid_to"),
+        values=(thing_id, file_parser_id, start_date, end_date),
+    )
+
+
 def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int:
     #         mqtt_device_type: None or name
     #         "username": thing.mqtt_username,
@@ -467,10 +504,10 @@ def store_thing_config(conn: Connection, data: dict):
         parser = data["parsers"]["parsers"][idx]
         parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"])
         s3_id = upsert_table_s3_store(
-            conn, data["raw_data_storage"], parser_id, ids["s3_store_id"]
+            conn, data["raw_data_storage"], ids["s3_store_id"]
         )
 
-    upsert_table_thing(
+    thing_id = upsert_table_thing(
         conn,
         uuid,
         name,
@@ -483,6 +520,9 @@ def store_thing_config(conn: Connection, data: dict):
         ids["thing_id"],
     )
 
+    if ingest_type in ["sftp", "extsftp"]:
+        upsert_table_thing_parser(conn, thing_id, parser_id, parser["settings"])
+
 
 # ===================================================================
 # QAQC related stuff
-- 
GitLab


From 29cf665a4db03093249ad7246756ac4d33ef0a8b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Thu, 30 Jan 2025 11:56:07 +0100
Subject: [PATCH 2/7] Put thing_parser lookup JOIN on its own line

Co-authored-by: Bert Palm <bert.palm@ufz.de>
---
 src/configdb.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/configdb.py b/src/configdb.py
index 65168fa..eb4b666 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -311,7 +311,8 @@ def upsert_table_thing_parser(
     r = conn.execute(
         """
         SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params
-        FROM config_db.thing_parser tp JOIN config_db.file_parser fp on tp.file_parser_id = fp.id
+        FROM config_db.thing_parser tp 
+        JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
         WHERE tp.valid_to is NULL AND tp.file_parser_id = %s
         """,
         (file_parser_id,),
-- 
GitLab


From b48d3e5253502ea7ee3d80e13252a259a912dda3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Thu, 30 Jan 2025 11:57:14 +0100
Subject: [PATCH 3/7] Drop pandas pin from requirements.txt

Co-authored-by: Bert Palm <bert.palm@ufz.de>
---
 requirements.txt | 1 -
 1 file changed, 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 4cb6757..9e8f863 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
-pandas==2.1.4
 paho-mqtt==2.1.0
 psycopg[binary]==3.2.1
 psycopg-pool==3.2.2
-- 
GitLab


From 71c2036058a28e2585da1a01f116809fc3ae2f7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Thu, 30 Jan 2025 11:57:23 +0100
Subject: [PATCH 4/7] Import datetime in configdb.py

Co-authored-by: Bert Palm <bert.palm@ufz.de>
---
 src/configdb.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/configdb.py b/src/configdb.py
index eb4b666..cdec4d9 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -5,6 +5,7 @@ import logging
 from typing import Any, Literal, Sequence, cast
 
 import pandas as pd
+import datetime
 
 from psycopg import Connection, sql
 from psycopg.rows import dict_row
-- 
GitLab


From 0accaa8015f2f0ddb95969222ee0cdfc0f437b22 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Thu, 30 Jan 2025 11:57:30 +0100
Subject: [PATCH 5/7] Annotate thing_parser dates as datetime.datetime

Co-authored-by: Bert Palm <bert.palm@ufz.de>
---
 src/configdb.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/configdb.py b/src/configdb.py
index cdec4d9..7008cef 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -306,8 +306,8 @@ def upsert_table_thing_parser(
     thing_id: int,
     file_parser_id: int,
     parser_settings: dict[str, Any],
-    start_date: pd.Timestamp | None = None,
-    end_date: pd.Timestamp | None = None,
+    start_date: datetime.datetime | None = None,
+    end_date: datetime.datetime | None = None,
 ) -> int:
     r = conn.execute(
         """
-- 
GitLab


From aff0b7ff898c883daa2e3859ecdcc8c0319e06c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Thu, 30 Jan 2025 11:59:56 +0100
Subject: [PATCH 6/7] Reformat _IDS_BY_UUID_QUERY without indentation

Co-authored-by: Bert Palm <bert.palm@ufz.de>
---
 src/configdb.py | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/src/configdb.py b/src/configdb.py
index 7008cef..adc21d1 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -26,17 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"):
 
 _IDS_BY_UUID_QUERY = sql.SQL(
     """\
-SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
-       tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
-       t.ext_sftp_id, t.ext_api_id, ea.api_type_id
-  FROM config_db.thing t
-       LEFT JOIN config_db.project p ON t.project_id = p.id
-       LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id
-       LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
-       LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
-       LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
-       LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
- WHERE tp.valid_to is NULL AND t.uuid = %s;
+SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id, 
+tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id, 
+t.ext_sftp_id, t.ext_api_id, ea.api_type_id 
+FROM config_db.thing t 
+LEFT JOIN config_db.project p ON t.project_id = p.id 
+LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id 
+LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id 
+LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id 
+LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id 
+LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id 
+WHERE tp.valid_to is null 
+AND t.uuid = %s;
 """
 )
 _no_ids = {
-- 
GitLab


From 087ab3990d975e2b8a7de2018f02f846285030f2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?=
 <schaefed@mac-schaefed-363.intranet.ufz.de>
Date: Thu, 30 Jan 2025 17:29:21 +0100
Subject: [PATCH 7/7] Remove obsolete pandas import

---
 src/configdb.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/configdb.py b/src/configdb.py
index adc21d1..63f6bb0 100644
--- a/src/configdb.py
+++ b/src/configdb.py
@@ -4,7 +4,6 @@ from __future__ import annotations
 import logging
 from typing import Any, Literal, Sequence, cast
 
-import pandas as pd
 import datetime
 
 from psycopg import Connection, sql
-- 
GitLab