Skip to content
Snippets Groups Projects
Commit 054bdf75 authored by David Schäfer's avatar David Schäfer
Browse files

Merge branch 'reparse-all-script' into 'main'

added reparse all script

See merge request !285
parents 50c36f71 79bb54c3
No related branches found
No related tags found
1 merge request!285added reparse all script
Pipeline #496501 passed
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import json
from fnmatch import fnmatch
import click
from minio import Minio
import paho.mqtt.client as mqtt
from timeio.feta import Thing
from timeio.journaling import Journal
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("reprocess-files")
journal = Journal("Reprocessing")
def setupMQTT(host, username, password):
host, port = host.split(":")
port = int(port)
client = mqtt.Client(client_id="reparse-files", clean_session=False)
if port == 8883:
client.tls_set()
client.suppress_exceptions = False
client.username_pw_set(username, password)
client.connect(host, port, keepalive=60)
if not client.is_connected():
raise RuntimeError(f"Couldn't etsablish a connection to {host}:{port}")
return client
@click.command()
@click.option(
"--configdb-dsn",
default="postgresql://configdb:configdb@localhost:5432/postgres",
envvar="CONFIGDB_DSN",
)
@click.option(
"--thing-uuid", default="0a308373-ab29-4317-b351-1443e8a1babd", envvar="THING_UUID"
)
@click.option("--minio-host", default="localhost:9000", envvar="MINIO_HOST")
@click.option("--minio-user", default="minioadmin", envvar="MINIO_USER")
@click.option("--minio-password", default="minioadmin", envvar="MINIO_PASSWORD")
@click.option("--mqtt-host", default="localhost:1883", envvar="MQTT_HOST")
@click.option("--mqtt-user", default="mqtt", envvar="MQTT_USER")
@click.option("--mqtt-password", default="mqtt", envvar="MQTT_PASSWORD")
def main(
configdb_dsn,
thing_uuid,
minio_host,
minio_user,
minio_password,
mqtt_host,
mqtt_user,
mqtt_password,
):
store = Thing.from_uuid(thing_uuid, dsn=configdb_dsn).raw_data_storage
minio = Minio(
endpoint=minio_host,
access_key=minio_user,
secret_key=minio_password,
secure=not minio_host.startswith("localhost"),
)
mqtt = setupMQTT(mqtt_host, mqtt_user, mqtt_password)
bucket = store.bucket
fnpattern = store.filename_pattern
mqtt.loop_start()
message = {"EventName": "s3:ObjectCreated:Put"}
for i, obj in enumerate(minio.list_objects(bucket)):
fname = obj.object_name
if fnmatch(fname, fnpattern):
message["Key"] = f"{bucket}/{fname}"
logging.info(f"republishing file: {message['Key']}")
result = mqtt.publish(
topic="object_storage_notification", payload=json.dumps(message), qos=1
)
if result[0] != 0:
logger.warning(
f"Failed to deliver reprocessing message for file: {message['Key']}"
)
mqtt.loop_stop()
mqtt.disconnect()
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment