diff --git a/src/run_file_ingest.py b/src/run_file_ingest.py index b67d2e6879410885469f544c8356c613be44838d..732b9e241e9cc20345763bd15e9e943fdac92cc2 100755 --- a/src/run_file_ingest.py +++ b/src/run_file_ingest.py @@ -9,11 +9,13 @@ import warnings from minio import Minio from minio.commonconfig import Tags -from timeio.mqtt import AbstractHandler, MQTTMessage from timeio.common import get_envvar, setup_logging +from timeio.databases import DBapi from timeio.errors import UserInputError, ParsingError +from timeio.feta import Thing from timeio.journaling import Journal -from timeio.databases import DBapi, ConfigDB +from timeio.mqtt import AbstractHandler, MQTTMessage +from timeio.parser import get_parser _FILE_MAX_SIZE = 256 * 1024 * 1024 @@ -41,7 +43,7 @@ class ParserJobHandler(AbstractHandler): ) self.pub_topic = get_envvar("TOPIC_DATA_PARSED") self.dbapi = DBapi(get_envvar("DB_API_BASE_URL")) - self.confdb = ConfigDB(get_envvar("CONFIGDB_DSN")) + self.configdb_dsn = get_envvar("CONFIGDB_DSN") def act(self, content: dict, message: MQTTMessage): @@ -52,8 +54,10 @@ class ParserJobHandler(AbstractHandler): # Directories are part of the filename # eg: foo/bar/file.ext -> bucket: foo, file: bar/file.ext bucket_name, filename = content["Key"].split("/", maxsplit=1) - thing_uuid = self.confdb.get_thing_uuid("bucket", bucket_name) - pattern = self.confdb.get_s3_store(thing_uuid)["filename_pattern"] + + thing = Thing.from_s3_bucket_name(bucket_name, dsn=self.configdb_dsn) + thing_uuid = thing.uuid + pattern = thing.s3_store.filename_pattern if not fnmatch.fnmatch(filename, pattern): logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}") @@ -62,12 +66,14 @@ class ParserJobHandler(AbstractHandler): source_uri = f"{bucket_name}/{filename}" logger.debug(f"loading parser for {thing_uuid}") - parser = self.confdb.get_parser(thing_uuid) + + pobj = thing.s3_store.file_parser + parser = get_parser(pobj.file_parser_type.name, pobj.params) logger.debug(f"reading raw data file {source_uri}") rawdata = self.read_file(bucket_name, filename) - logger.info(f"parsing rawdata ... ") + logger.info("parsing rawdata ... ") file = source_uri with warnings.catch_warnings() as w: try: @@ -84,7 +90,7 @@ class ParserJobHandler(AbstractHandler): if w: journal.warning(w[0].message, thing_uuid) - logger.debug(f"storing observations to database ...") + logger.debug("storing observations to database ...") try: self.dbapi.upsert_observations(thing_uuid, obs) except Exception as e: