Skip to content
Snippets Groups Projects
Commit 50c36f71 authored by David Schäfer's avatar David Schäfer
Browse files

Merge branch 'feta-parsing' into 'main'

switched to feta

See merge request !286
parents fd69f5cb 9343e5c2
No related branches found
No related tags found
1 merge request!286switched to feta
Pipeline #496489 passed
...@@ -9,11 +9,13 @@ import warnings ...@@ -9,11 +9,13 @@ import warnings
from minio import Minio from minio import Minio
from minio.commonconfig import Tags from minio.commonconfig import Tags
from timeio.mqtt import AbstractHandler, MQTTMessage
from timeio.common import get_envvar, setup_logging from timeio.common import get_envvar, setup_logging
from timeio.databases import DBapi
from timeio.errors import UserInputError, ParsingError from timeio.errors import UserInputError, ParsingError
from timeio.feta import Thing
from timeio.journaling import Journal from timeio.journaling import Journal
from timeio.databases import DBapi, ConfigDB from timeio.mqtt import AbstractHandler, MQTTMessage
from timeio.parser import get_parser
_FILE_MAX_SIZE = 256 * 1024 * 1024 _FILE_MAX_SIZE = 256 * 1024 * 1024
...@@ -41,7 +43,7 @@ class ParserJobHandler(AbstractHandler): ...@@ -41,7 +43,7 @@ class ParserJobHandler(AbstractHandler):
) )
self.pub_topic = get_envvar("TOPIC_DATA_PARSED") self.pub_topic = get_envvar("TOPIC_DATA_PARSED")
self.dbapi = DBapi(get_envvar("DB_API_BASE_URL")) self.dbapi = DBapi(get_envvar("DB_API_BASE_URL"))
self.confdb = ConfigDB(get_envvar("CONFIGDB_DSN")) self.configdb_dsn = get_envvar("CONFIGDB_DSN")
def act(self, content: dict, message: MQTTMessage): def act(self, content: dict, message: MQTTMessage):
...@@ -52,8 +54,10 @@ class ParserJobHandler(AbstractHandler): ...@@ -52,8 +54,10 @@ class ParserJobHandler(AbstractHandler):
# Directories are part of the filename # Directories are part of the filename
# eg: foo/bar/file.ext -> bucket: foo, file: bar/file.ext # eg: foo/bar/file.ext -> bucket: foo, file: bar/file.ext
bucket_name, filename = content["Key"].split("/", maxsplit=1) bucket_name, filename = content["Key"].split("/", maxsplit=1)
thing_uuid = self.confdb.get_thing_uuid("bucket", bucket_name)
pattern = self.confdb.get_s3_store(thing_uuid)["filename_pattern"] thing = Thing.from_s3_bucket_name(bucket_name, dsn=self.configdb_dsn)
thing_uuid = thing.uuid
pattern = thing.s3_store.filename_pattern
if not fnmatch.fnmatch(filename, pattern): if not fnmatch.fnmatch(filename, pattern):
logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}") logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}")
...@@ -62,12 +66,14 @@ class ParserJobHandler(AbstractHandler): ...@@ -62,12 +66,14 @@ class ParserJobHandler(AbstractHandler):
source_uri = f"{bucket_name}/{filename}" source_uri = f"{bucket_name}/{filename}"
logger.debug(f"loading parser for {thing_uuid}") logger.debug(f"loading parser for {thing_uuid}")
parser = self.confdb.get_parser(thing_uuid)
pobj = thing.s3_store.file_parser
parser = get_parser(pobj.file_parser_type.name, pobj.params)
logger.debug(f"reading raw data file {source_uri}") logger.debug(f"reading raw data file {source_uri}")
rawdata = self.read_file(bucket_name, filename) rawdata = self.read_file(bucket_name, filename)
logger.info(f"parsing rawdata ... ") logger.info("parsing rawdata ... ")
file = source_uri file = source_uri
with warnings.catch_warnings() as w: with warnings.catch_warnings() as w:
try: try:
...@@ -84,7 +90,7 @@ class ParserJobHandler(AbstractHandler): ...@@ -84,7 +90,7 @@ class ParserJobHandler(AbstractHandler):
if w: if w:
journal.warning(w[0].message, thing_uuid) journal.warning(w[0].message, thing_uuid)
logger.debug(f"storing observations to database ...") logger.debug("storing observations to database ...")
try: try:
self.dbapi.upsert_observations(thing_uuid, obs) self.dbapi.upsert_observations(thing_uuid, obs)
except Exception as e: except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment