diff --git a/src/run_file_ingest.py b/src/run_file_ingest.py
index b67d2e6879410885469f544c8356c613be44838d..732b9e241e9cc20345763bd15e9e943fdac92cc2 100755
--- a/src/run_file_ingest.py
+++ b/src/run_file_ingest.py
@@ -9,11 +9,13 @@ import warnings
 
 from minio import Minio
 from minio.commonconfig import Tags
-from timeio.mqtt import AbstractHandler, MQTTMessage
 from timeio.common import get_envvar, setup_logging
+from timeio.databases import DBapi
 from timeio.errors import UserInputError, ParsingError
+from timeio.feta import Thing
 from timeio.journaling import Journal
-from timeio.databases import DBapi, ConfigDB
+from timeio.mqtt import AbstractHandler, MQTTMessage
+from timeio.parser import get_parser
 
 
 _FILE_MAX_SIZE = 256 * 1024 * 1024
@@ -41,7 +43,7 @@ class ParserJobHandler(AbstractHandler):
         )
         self.pub_topic = get_envvar("TOPIC_DATA_PARSED")
         self.dbapi = DBapi(get_envvar("DB_API_BASE_URL"))
-        self.confdb = ConfigDB(get_envvar("CONFIGDB_DSN"))
+        self.configdb_dsn = get_envvar("CONFIGDB_DSN")
 
 
     def act(self, content: dict, message: MQTTMessage):
@@ -52,8 +54,10 @@ class ParserJobHandler(AbstractHandler):
         # Directories are part of the filename
         # eg: foo/bar/file.ext -> bucket: foo, file: bar/file.ext
         bucket_name, filename = content["Key"].split("/", maxsplit=1)
-        thing_uuid = self.confdb.get_thing_uuid("bucket", bucket_name)
-        pattern = self.confdb.get_s3_store(thing_uuid)["filename_pattern"]
+
+        thing = Thing.from_s3_bucket_name(bucket_name, dsn=self.configdb_dsn)
+        thing_uuid = thing.uuid
+        pattern = thing.s3_store.filename_pattern
 
         if not fnmatch.fnmatch(filename, pattern):
             logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}")
@@ -62,12 +66,14 @@ class ParserJobHandler(AbstractHandler):
         source_uri = f"{bucket_name}/{filename}"
 
         logger.debug(f"loading parser for {thing_uuid}")
-        parser = self.confdb.get_parser(thing_uuid)
+
+        pobj = thing.s3_store.file_parser
+        parser = get_parser(pobj.file_parser_type.name, pobj.params)
 
         logger.debug(f"reading raw data file {source_uri}")
         rawdata = self.read_file(bucket_name, filename)
 
-        logger.info(f"parsing rawdata ... ")
+        logger.info("parsing rawdata ... ")
         file = source_uri
         with warnings.catch_warnings() as w:
             try:
@@ -84,7 +90,7 @@ class ParserJobHandler(AbstractHandler):
         if w:
             journal.warning(w[0].message, thing_uuid)
 
-        logger.debug(f"storing observations to database ...")
+        logger.debug("storing observations to database ...")
         try:
             self.dbapi.upsert_observations(thing_uuid, obs)
         except Exception as e:
diff --git a/src/timeio/feta.py b/src/timeio/feta.py
index 1d31b4d80c78acf6a32e687055f5ab4053c99fbe..c7fd0e31b0be8b1de86d6cf851a9f9f048bfecde 100644
--- a/src/timeio/feta.py
+++ b/src/timeio/feta.py
@@ -580,7 +580,7 @@ class Thing(Base, FromNameMixin, FromUUIDMixin):
         """
         query = (
             f"select t.* from {_cfgdb}.thing t join s3_store s3 on "
-            "t.s3_store_id = s3.id where s3.bucket = %s",
+            "t.s3_store_id = s3.id where s3.bucket = %s"
         )
         conn = cls._get_connection(dsn, **kwargs)
         if not (res := cls._fetchall(conn, query, bucket_name)):
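
Reviewer note, not part of the patch: in run_file_ingest.py the two ConfigDB round trips are replaced by a single feta Thing lookup that exposes the S3 store and its parser configuration as one object graph. A condensed sketch of the new flow, using only names that appear in the diff; the load_parser_for_bucket helper itself is hypothetical:

    from timeio.feta import Thing
    from timeio.parser import get_parser

    def load_parser_for_bucket(bucket_name: str, dsn: str):
        # One feta lookup replaces the former pair of calls
        # confdb.get_thing_uuid("bucket", ...) and
        # confdb.get_s3_store(...)["filename_pattern"].
        thing = Thing.from_s3_bucket_name(bucket_name, dsn=dsn)
        pattern = thing.s3_store.filename_pattern
        # Parser type and params now come from the Thing's object graph
        # instead of confdb.get_parser(thing_uuid).
        pobj = thing.s3_store.file_parser
        parser = get_parser(pobj.file_parser_type.name, pobj.params)
        return thing.uuid, pattern, parser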
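
Reviewer note, not part of the patch: the feta.py hunk fixes a stray trailing comma. Python concatenates the adjacent string literals first, and the comma then wraps the result in a 1-tuple, so cls._fetchall(conn, query, bucket_name) received a tuple where SQL text was expected. A minimal standalone illustration:

    # With the trailing comma, query is a 1-tuple, not SQL text.
    query = (
        "select t.* from thing t join s3_store s3 on "
        "t.s3_store_id = s3.id where s3.bucket = %s",
    )
    print(type(query))  # <class 'tuple'>

    # Without the comma the literals concatenate into a single string,
    # which is what a DB-API cursor.execute call expects.
    query = (
        "select t.* from thing t join s3_store s3 on "
        "t.s3_store_id = s3.id where s3.bucket = %s"
    )
    print(type(query))  # <class 'str'>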