diff --git a/docker-compose.yml b/docker-compose.yml index 59c9a3de15491d29650d5860aec35f332dedc3cd..5754926a4eb4ba006dde818c36bfb82eea479098 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -497,7 +497,7 @@ services: start_period: 10s # ================================================================= - # SETUP worker (topic: thing_creation) + # SETUP worker (topic: configdb_update) # ================================================================= # fills and updates the config-db from setup mqtt-messages @@ -546,7 +546,7 @@ services: condition: service_healthy environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -560,6 +560,12 @@ services: DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" FERNET_ENCRYPTION_SECRET: "${FERNET_ENCRYPTION_SECRET}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_minio.py"] @@ -587,7 +593,7 @@ services: condition: service_healthy environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -604,6 +610,12 @@ services: DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" FERNET_ENCRYPTION_SECRET: "${FERNET_ENCRYPTION_SECRET}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_user_database.py"] @@ -624,7 +636,7 @@ services: - tomcat-context:/home/appuser/app/src/frost_context_files:rw environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -635,6 +647,12 @@ services: DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" FERNET_ENCRYPTION_SECRET: "${FERNET_ENCRYPTION_SECRET}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_frost.py"] @@ -654,7 +672,7 @@ services: condition: service_healthy environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -668,6 +686,12 @@ services: ${CREATEDB_POSTGRES_DATABASE}" DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_mqtt_user.py"] @@ -686,7 +710,7 @@ services: condition: service_healthy environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -700,6 +724,12 @@ services: DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" FERNET_ENCRYPTION_SECRET: "${FERNET_ENCRYPTION_SECRET}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_grafana_dashboard.py"] @@ -828,7 +858,7 @@ services: condition: service_healthy environment: LOG_LEVEL: "${LOG_LEVEL}" - TOPIC: thing_creation + TOPIC: configdb_update MQTT_BROKER: mqtt-broker:1883 MQTT_USER: "${MQTT_USER}" MQTT_PASSWORD: "${MQTT_PASSWORD}" @@ -837,6 +867,12 @@ services: MQTT_QOS: "${MQTT_QOS}" DB_API_BASE_URL: "${DB_API_BASE_URL}" JOURNALING: "${JOURNALING}" + CONFIGDB_DSN: "postgresql://\ + ${CONFIGDB_USER}:\ + ${CONFIGDB_PASSWORD}@\ + ${CONFIGDB_HOST}:\ + ${CONFIGDB_PORT}/\ + ${CONFIGDB_DB}" entrypoint: ["python3", "setup_crontab.py"] volumes: - ./cron/crontab.txt:/tmp/cron/crontab.txt diff --git a/src/setup_crontab.py b/src/setup_crontab.py index faf920993465cc9bcbf130a9cebc8790f5607296..eedbf982f72156805bb6985e0c475bbb60f705dc 100755 --- a/src/setup_crontab.py +++ b/src/setup_crontab.py @@ -7,9 +7,10 @@ from random import randint from crontab import CronItem, CronTab from timeio.mqtt import AbstractHandler, MQTTMessage -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging from timeio.journaling import Journal +from timeio.typehints import MqttPayload logger = logging.getLogger("crontab-setup") journal = Journal("Cron") @@ -27,9 +28,10 @@ class CreateThingInCrontabHandler(AbstractHandler): mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool), ) self.tabfile = "/tmp/cron/crontab.txt" + self.configdb_dsn = get_envvar("CONFIGDB_DSN") - def act(self, content: dict, message: MQTTMessage): - thing = Thing.get_instance(content) + def act(self, content: MqttPayload.ConfigDBUpdate, message: MQTTMessage): + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) with CronTab(tabfile=self.tabfile) as crontab: for job in crontab: if self.job_belongs_to_thing(job, thing): @@ -55,28 +57,28 @@ class CreateThingInCrontabHandler(AbstractHandler): info = "" comment = cls.mk_comment(thing) uuid = thing.uuid - if thing.external_sftp is not None: - interval = int(thing.external_sftp.sync_interval) + if thing.ext_sftp is not None: + interval = int(thing.ext_sftp.sync_interval) schedule = cls.get_schedule(interval) script = "/scripts/sync_sftp.py" - keyfile = thing.external_sftp.private_key_path + keyfile = thing.ext_sftp.ssh_priv_key command = f"{script} {uuid} {keyfile} > $STDOUT 2> $STDERR" - job.enable(enabled=thing.external_sftp.enabled) + job.enable(enabled=thing.ext_sftp.sync_enabled) job.set_comment(comment, pre_comment=True) job.setall(schedule) job.set_command(command) - info = f"sFTP {thing.external_sftp.uri} @ {interval}s" - if thing.external_api is not None: - interval = int(thing.external_api.sync_interval) + info = f"sFTP {thing.ext_sftp.uri} @ {interval}s" + if thing.ext_api is not None: + interval = int(thing.ext_api.sync_interval) schedule = cls.get_schedule(interval) - script = f"/scripts/sync_{thing.external_api.api_type_name}_api.py" + script = f"/scripts/sync_{thing.ext_api.api_type_name}_api.py" target_uri = thing.database.url - command = f"""{script} {uuid} "{thing.external_api.settings}" {target_uri} > $STDOUT 2> $STDERR""" - job.enable(enabled=thing.external_api.enabled) + command = f"""{script} {uuid} "{thing.ext_api.settings}" {target_uri} > $STDOUT 2> $STDERR""" + job.enable(enabled=thing.ext_api.sync_enabled) job.set_comment(comment, pre_comment=True) job.setall(schedule) job.set_command(command) - info = f"{thing.external_api.api_type_name}-API @ {interval}s" + info = f"{thing.ext_api.api_type_name}-API @ {interval}s" return info # alias diff --git a/src/setup_frost.py b/src/setup_frost.py index 80bd108abc3c3604695e908e12aa4a0df5d1bb6d..300cc894e1acdc72f56dbc637830ef1eae0a0d62 100755 --- a/src/setup_frost.py +++ b/src/setup_frost.py @@ -1,8 +1,10 @@ import logging from timeio.mqtt import AbstractHandler, MQTTMessage -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging +from timeio.typehints import MqttPayload +from timeio import frost logger = logging.getLogger("frost-setup") @@ -20,10 +22,17 @@ class CreateFrostInstanceHandler(AbstractHandler): mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool), ) self.tomcat_proxy_url = get_envvar("TOMCAT_PROXY_URL") - - def act(self, content: dict, message: MQTTMessage): - thing = Thing.get_instance(content) - thing.setup_frost(self.tomcat_proxy_url) + self.configdb_dsn = get_envvar("CONFIGDB_DSN") + + def act(self, content: MqttPayload.ConfigDBUpdate, message: MQTTMessage): + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) + frost.write_context_file( + schema=thing.database.schema, + user=f"sta_{thing.database.ro_username.lower()}", + password=thing.database.ro_password, + db_url=thing.database.ro_url, + tomcat_proxy_url=self.tomcat_proxy_url, + ) if __name__ == "__main__": diff --git a/src/setup_grafana_dashboard.py b/src/setup_grafana_dashboard.py index b3df9be5f35aeb2603cf41bbc39b7efdc401b0c3..1b447e75f85c9f6b7d473945f141945596c9802f 100755 --- a/src/setup_grafana_dashboard.py +++ b/src/setup_grafana_dashboard.py @@ -5,9 +5,10 @@ from grafana_client import GrafanaApi from grafana_client.client import GrafanaException from timeio.mqtt import AbstractHandler, MQTTMessage -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging from timeio.crypto import decrypt, get_crypt_key +from timeio.typehints import MqttPayload logger = logging.getLogger("grafana-dashboard-setup") @@ -33,9 +34,10 @@ class CreateThingInGrafanaHandler(AbstractHandler): ) # needed when defining new datasource self.sslmode = get_envvar("GRAFANA_DEFAULT_DATASOURCE_SSLMODE") + self.configdb_dsn = get_envvar("CONFIGDB_DSN") - def act(self, content: dict, message: MQTTMessage): - thing = Thing.get_instance(content) + def act(self, content: MqttPayload.ConfigDBUpdate, message: MQTTMessage): + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) self.create_organization(thing) # create datasource, folder, dashboard in project org diff --git a/src/setup_minio.py b/src/setup_minio.py index f13c338bc1a62fcc408b978890f358c40bd1bbe8..6ef3d565b4804ca28d7b014433aeda9f2d6a78c2 100755 --- a/src/setup_minio.py +++ b/src/setup_minio.py @@ -5,9 +5,10 @@ import logging from minio_cli_wrapper.mc import Mc from timeio.mqtt import AbstractHandler, MQTTMessage -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging from timeio.crypto import decrypt, get_crypt_key +from timeio.typehints import MqttPayload logger = logging.getLogger("minio-setup") @@ -30,9 +31,10 @@ class CreateThingInMinioHandler(AbstractHandler): secret_key=get_envvar("MINIO_SECURE_KEY"), secure=get_envvar("MINIO_SECURE", default=True, cast_to=bool), ) + self.configdb_dsn = get_envvar("CONFIGDB_DSN") - def act(self, content: dict, message: MQTTMessage): - thing = Thing.get_instance(content) + def act(self, content: MqttPayload.ConfigDBUpdate, message: MQTTMessage): + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) user = thing.raw_data_storage.username passw = decrypt(thing.raw_data_storage.password, get_crypt_key()) bucket = thing.raw_data_storage.bucket_name diff --git a/src/setup_mqtt_user.py b/src/setup_mqtt_user.py index 113f8088a3492f8d0dbb957656d5cc90e064d482..a05f7ebbc66d98738287e37335cbd05359c7262d 100755 --- a/src/setup_mqtt_user.py +++ b/src/setup_mqtt_user.py @@ -6,9 +6,10 @@ import logging import psycopg2 from timeio.mqtt import AbstractHandler, MQTTMessage -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging from timeio.journaling import Journal +from timeio.typehints import MqttPayload logger = logging.getLogger("mqtt-user-setup") journal = Journal("System") @@ -26,33 +27,30 @@ class CreateMqttUserHandler(AbstractHandler): mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool), ) self.db = psycopg2.connect(get_envvar("DATABASE_URL")) + self.configdb_dsn = get_envvar("CONFIGDB_DSN") - def act(self, content: dict, message: MQTTMessage): - thing = Thing.get_instance(content) - if content["mqtt_authentication_credentials"]: - user = content["mqtt_authentication_credentials"]["username"] - pw = content["mqtt_authentication_credentials"]["password_hash"] + def act(self, content: MqttPayload.ConfigDBUpdate, message: MQTTMessage): + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) + user = thing.mqtt.user + pw = thing.mqtt.password_hashed - logger.info(f"create user. {user=}") - created = self.create_user(thing, user, pw) - action = "Created" if created else "Updated" - journal.info(f"{action} MQTT user {user}", thing.uuid) - else: - logger.warning(f"no 'mqtt_authentication_credentials' present") + logger.info(f"create user. {user=}") + created = self.create_user(thing, user, pw) + action = "Created" if created else "Updated" + journal.info(f"{action} MQTT user {user}", thing.uuid) def create_user(self, thing, user, pw) -> bool: """Returns True for insert and False for update""" sql = ( "INSERT INTO mqtt_auth.mqtt_user (project_uuid, thing_uuid, username, " - "password, description,properties, db_schema) " - "VALUES (%s, %s, %s, %s ,%s ,%s, %s) " + "password, description, db_schema) " + "VALUES (%s, %s, %s, %s ,%s ,%s) " "ON CONFLICT (thing_uuid) " "DO UPDATE SET" " project_uuid = EXCLUDED.project_uuid," " username = EXCLUDED.username," " password=EXCLUDED.password," " description = EXCLUDED.description," - " properties = EXCLUDED.properties," " db_schema = EXCLUDED.db_schema " "RETURNING (xmax = 0)" ) @@ -66,7 +64,6 @@ class CreateMqttUserHandler(AbstractHandler): user, pw, thing.description, - json.dumps(thing.properties), thing.database.username, ), ) diff --git a/src/setup_user_database.py b/src/setup_user_database.py index 744c11d24d59378390a6e579a51cea087c3ee8a7..c025f366a73f6912e2534859bcc1ab83bb10a6c9 100755 --- a/src/setup_user_database.py +++ b/src/setup_user_database.py @@ -9,10 +9,11 @@ import psycopg from timeio.mqtt import AbstractHandler, MQTTMessage from timeio.databases import ReentrantConnection -from timeio.thing import Thing +from timeio.feta import Thing from timeio.common import get_envvar, setup_logging from timeio.journaling import Journal from timeio.crypto import decrypt, get_crypt_key +from timeio.typehints import MqttPayload logger = logging.getLogger("db-setup") journal = Journal("System") @@ -31,10 +32,11 @@ class CreateThingInPostgresHandler(AbstractHandler): ) self.db_conn = ReentrantConnection(get_envvar("DATABASE_URL")) self.db = self.db_conn.connect() + self.configdb_dsn = get_envvar("CONFIGDB_DSN") def act(self, content: dict, message: MQTTMessage): self.db = self.db_conn.reconnect() - thing = Thing.get_instance(content) + thing = Thing.from_uuid(content["thing"], dsn=self.configdb_dsn) logger.info(f"start processing. {thing.name=}, {thing.uuid=}") STA_PREFIX = "sta_" GRF_PREFIX = "grf_"