From f1d960b6b2f784c34c01ddcd77716a6b06dda261 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?=
 <schaefed@mac-schaefed-363.intranet.ufz.de>
Date: Thu, 20 Feb 2025 16:28:28 +0100
Subject: [PATCH 1/3] added script

---
 src/run_reparse_thing.py | 101 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)
 create mode 100644 src/run_reparse_thing.py

diff --git a/src/run_reparse_thing.py b/src/run_reparse_thing.py
new file mode 100644
index 00000000..4aa28545
--- /dev/null
+++ b/src/run_reparse_thing.py
@@ -0,0 +1,101 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import json
+from fnmatch import fnmatch
+
+import click
+from minio import Minio
+import paho.mqtt.client as mqtt
+
+from timeio.feta import Thing
+from timeio.journaling import Journal
+
+logging.basicConfig(level=logging.INFO)  # root logger config: emit INFO and above
+logger = logging.getLogger("reprocess-files")  # named logger for this script
+journal = Journal("Reprocessing")  # NOTE(review): not referenced in the code shown here
+
+
+def setupMQTT(host, username, password):
+    """Connect to the ``host:port`` MQTT broker and return the connected client."""
+    host, port = host.split(":")
+    port = int(port)
+    client = mqtt.Client(client_id="reparse-files", clean_session=False)
+    if port == 8883:
+        client.tls_set()
+    client.suppress_exceptions = False
+    client.username_pw_set(username, password)
+    client.connect(host, port, keepalive=60)
+    # connect() only sends CONNECT; is_connected() stays False until the network
+    # loop has read the CONNACK, so pump it (up to ~10 s) and raise if none came.
+    for _ in range(50):
+        if client.is_connected():
+            return client
+        client.loop(timeout=0.2)
+    raise RuntimeError(f"Couldn't establish a connection to {host}:{port}")
+
+
+@click.command()
+@click.option(
+    "--configdb-dsn",
+    default="postgresql://configdb:configdb@localhost:5432/postgres",
+    envvar="CONFIGDB_DSN",
+)
+@click.option(
+    "--thing-uuid", default="0a308373-ab29-4317-b351-1443e8a1babd", envvar="THING_UUID"
+)
+@click.option("--minio-host", default="localhost:9000", envvar="MINIO_HOST")
+@click.option("--minio-user", default="minioadmin", envvar="MINIO_USER")
+@click.option("--minio-password", default="minioadmin", envvar="MINIO_PASSWORD")
+@click.option("--mqtt-host", default="localhost:1883", envvar="MQTT_HOST")
+@click.option("--mqtt-user", default="mqtt", envvar="MQTT_USER")
+@click.option("--mqtt-password", default="mqtt", envvar="MQTT_PASSWORD")
+def main(
+    configdb_dsn,
+    thing_uuid,
+    minio_host,
+    minio_user,
+    minio_password,
+    mqtt_host,
+    mqtt_user,
+    mqtt_password,
+):
+    """Re-publish an 'object created' MQTT notification for a thing's raw files."""
+    store = Thing.from_uuid(thing_uuid, dsn=configdb_dsn).raw_data_storage
+    minio = Minio(
+        endpoint=minio_host,
+        access_key=minio_user,
+        secret_key=minio_password,
+        secure=not minio_host.startswith("localhost"),
+    )
+    mqtt = setupMQTT(mqtt_host, mqtt_user, mqtt_password)
+    bucket = store.bucket
+    fnpattern = store.filename_pattern
+
+    mqtt.loop_start()
+
+    message = {"EventName": "s3:ObjectCreated:Put"}
+    for i, obj in enumerate(minio.list_objects(bucket)):
+        fname = obj.object_name
+        if fnmatch(fname, fnpattern):
+            message["Key"] = f"{bucket}/{fname}"
+            logger.info(f"republishing file: {message['Key']}")
+            result = mqtt.publish(
+                topic="object_storage_notification", payload=json.dumps(message), qos=1
+            )
+            # publish() result indexes as (rc, mid); a non-zero rc is an error code
+            if result[0] != 0:
+                logger.warning(
+                    f"Failed to deliver reprocessing message for file: {message['Key']}"
+                )
+                # import ipdb; ipdb.set_trace()
+        if i == 0:
+            break
+
+    mqtt.loop_stop()
+    mqtt.disconnect()
+
+
+if __name__ == "__main__":  # only run the click command when executed as a script
+    main()
-- 
GitLab


From f0f2829215047686bd1abdb59bb84de383c1956c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Fri, 21 Feb 2025 07:30:47 +0100
Subject: [PATCH 2/3] rm leftover

---
 src/run_reparse_thing.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/run_reparse_thing.py b/src/run_reparse_thing.py
index 4aa28545..34efeba2 100644
--- a/src/run_reparse_thing.py
+++ b/src/run_reparse_thing.py
@@ -89,7 +89,6 @@ def main(
                 logger.warning(
                     f"Failed to deliver reprocessing message for file: {message['Key']}"
                 )
-                # import ipdb; ipdb.set_trace()
         if i == 0:
             break
 
-- 
GitLab


From 79bb54c365511db3e95c79e9e5974ce1f337261d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de>
Date: Fri, 21 Feb 2025 07:31:03 +0100
Subject: [PATCH 3/3] rm leftover

---
 src/run_reparse_thing.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/run_reparse_thing.py b/src/run_reparse_thing.py
index 34efeba2..ac5312e4 100644
--- a/src/run_reparse_thing.py
+++ b/src/run_reparse_thing.py
@@ -89,8 +89,6 @@ def main(
                 logger.warning(
                     f"Failed to deliver reprocessing message for file: {message['Key']}"
                 )
-        if i == 0:
-            break
 
     mqtt.loop_stop()
     mqtt.disconnect()
-- 
GitLab