Skip to content
Snippets Groups Projects
Commit 9343e5c2 authored by David Schäfer's avatar David Schäfer
Browse files

switched to feta

parent 3c11b99d
No related branches found
No related tags found
1 merge request!286switched to feta
Pipeline #496344 passed
......@@ -9,11 +9,13 @@ import warnings
from minio import Minio
from minio.commonconfig import Tags
from timeio.mqtt import AbstractHandler, MQTTMessage
from timeio.common import get_envvar, setup_logging
from timeio.databases import DBapi
from timeio.errors import UserInputError, ParsingError
from timeio.feta import Thing
from timeio.journaling import Journal
from timeio.databases import DBapi, ConfigDB
from timeio.mqtt import AbstractHandler, MQTTMessage
from timeio.parser import get_parser
_FILE_MAX_SIZE = 256 * 1024 * 1024
......@@ -41,7 +43,7 @@ class ParserJobHandler(AbstractHandler):
)
self.pub_topic = get_envvar("TOPIC_DATA_PARSED")
self.dbapi = DBapi(get_envvar("DB_API_BASE_URL"))
self.confdb = ConfigDB(get_envvar("CONFIGDB_DSN"))
self.configdb_dsn = get_envvar("CONFIGDB_DSN")
def act(self, content: dict, message: MQTTMessage):
......@@ -52,8 +54,10 @@ class ParserJobHandler(AbstractHandler):
# Directories are part of the filename
# eg: foo/bar/file.ext -> bucket: foo, file: bar/file.ext
bucket_name, filename = content["Key"].split("/", maxsplit=1)
thing_uuid = self.confdb.get_thing_uuid("bucket", bucket_name)
pattern = self.confdb.get_s3_store(thing_uuid)["filename_pattern"]
thing = Thing.from_s3_bucket_name(bucket_name, dsn=self.configdb_dsn)
thing_uuid = thing.uuid
pattern = thing.s3_store.filename_pattern
if not fnmatch.fnmatch(filename, pattern):
logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}")
......@@ -62,12 +66,14 @@ class ParserJobHandler(AbstractHandler):
source_uri = f"{bucket_name}/{filename}"
logger.debug(f"loading parser for {thing_uuid}")
parser = self.confdb.get_parser(thing_uuid)
pobj = thing.s3_store.file_parser
parser = get_parser(pobj.file_parser_type.name, pobj.params)
logger.debug(f"reading raw data file {source_uri}")
rawdata = self.read_file(bucket_name, filename)
logger.info(f"parsing rawdata ... ")
logger.info("parsing rawdata ... ")
file = source_uri
with warnings.catch_warnings() as w:
try:
......@@ -84,7 +90,7 @@ class ParserJobHandler(AbstractHandler):
if w:
journal.warning(w[0].message, thing_uuid)
logger.debug(f"storing observations to database ...")
logger.debug("storing observations to database ...")
try:
self.dbapi.upsert_observations(thing_uuid, obs)
except Exception as e:
......
......@@ -580,7 +580,7 @@ class Thing(Base, FromNameMixin, FromUUIDMixin):
"""
query = (
f"select t.* from {_cfgdb}.thing t join s3_store s3 on "
"t.s3_store_id = s3.id where s3.bucket = %s",
"t.s3_store_id = s3.id where s3.bucket = %s"
)
conn = cls._get_connection(dsn, **kwargs)
if not (res := cls._fetchall(conn, query, bucket_name)):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment