Commit 0edbf9fa authored by Bert Palm

moved code from dispatcherMR and configMR here

parent 3cdb56ec
1 merge request: !237 "multiple parser per thing in configdb"
Pipeline #490132 passed
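
This commit versions parser configuration per thing in the configDB: each thing gets thing_parser rows with a validity interval (valid_from/valid_to), and the parser job handler now picks the parser that was valid at a file's upload time instead of the single parser attached to the s3_store. A minimal sketch of the new flow, using the names from the diff below (handler stands for a ParserJobHandler instance):

    rawdata, file_date = handler.read_file(bucket_name, filename)  # file_date = object's last_modified
    parser = handler.confdb.get_parser(thing_uuid, file_date)      # parser valid at file_date
    df = parser.do_parse(rawdata)
    obs = parser.to_observations(df, source_uri)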
@@ -4,16 +4,19 @@ import fnmatch
 import json
 import logging
 from datetime import datetime
+from typing import Tuple, cast
 import warnings

+import pandas as pd
 from minio import Minio
 from minio.commonconfig import Tags

+from timeio.databases import DBapi, ConfigDB
 from timeio.mqtt import AbstractHandler, MQTTMessage
 from timeio.common import get_envvar, setup_logging
 from timeio.errors import UserInputError, ParsingError
 from timeio.journaling import Journal
-from timeio.databases import DBapi, ConfigDB

 _FILE_MAX_SIZE = 256 * 1024 * 1024
@@ -60,26 +63,23 @@ class ParserJobHandler(AbstractHandler):
             return
         source_uri = f"{bucket_name}/{filename}"
         logger.debug(f"loading parser for {thing_uuid}")
-        parser = self.confdb.get_parser(thing_uuid)
         logger.debug(f"reading raw data file {source_uri}")
-        rawdata = self.read_file(bucket_name, filename)
+        rawdata, file_date = self.read_file(bucket_name, filename)
+        parser = self.confdb.get_parser(thing_uuid, file_date)
         logger.info(f"parsing rawdata ... ")
-        file = source_uri
         with warnings.catch_warnings() as w:
             try:
                 df = parser.do_parse(rawdata)
                 obs = parser.to_observations(df, source_uri)
             except ParsingError as e:
                 journal.error(
-                    f"Parsing failed. Detail: {e}. File: {file!r}", thing_uuid
+                    f"Parsing failed. Detail: {e}. File: {source_uri!r}", thing_uuid
                 )
                 raise e
             except Exception as e:
-                journal.error(f"Parsing failed for file {file!r}", thing_uuid)
+                journal.error(f"Parsing failed for file {source_uri!r}", thing_uuid)
                 raise UserInputError("Parsing failed") from e
         if w:
             journal.warning(w[0].message, thing_uuid)
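
# Annotation, not part of the commit: warnings.catch_warnings() only
# records warnings when called with record=True; as written, w is bound to
# None and the `if w:` branch can never fire.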
@@ -91,13 +91,13 @@ class ParserJobHandler(AbstractHandler):
             # Tell the user that his parsing was successful
             journal.error(
                 f"Parsing was successful, but storing data "
-                f"in database failed. File: {file!r}",
+                f"in database failed. File: {source_uri!r}",
                 thing_uuid,
             )
             raise e
         # Now everything is fine and we tell the user
-        journal.info(f"Parsed file {file}", thing_uuid)
+        journal.info(f"Parsed file {source_uri}", thing_uuid)
         object_tags = Tags.new_object_tags()
         object_tags["parsed_at"] = datetime.now().isoformat()
@@ -114,7 +114,7 @@ class ParserJobHandler(AbstractHandler):
             "s3:ObjectCreated:CompleteMultipartUpload",
         )

-    def read_file(self, bucket_name, object_name) -> str:
+    def read_file(self, bucket_name, object_name) -> Tuple[str, pd.Timestamp]:
         stat = self.minio.stat_object(bucket_name, object_name)
         if stat.size > _FILE_MAX_SIZE:
             raise IOError("Maximum filesize of 256M exceeded")
@@ -125,7 +125,7 @@ class ParserJobHandler(AbstractHandler):
             # remove the ASCII control character ETX (end-of-text)
             .rstrip("\x03")
         )
-        return rawdata
+        return rawdata, cast(pd.Timestamp, pd.Timestamp(stat.last_modified, unit="s"))
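
# Annotation, not part of the commit: the second return value is the
# object's last_modified time reported by MinIO's stat_object(); act()
# hands it to ConfigDB.get_parser() as file_date, so each file is parsed
# with the parser that was valid when the file was uploaded.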
if __name__ == "__main__":
......
@@ -5,10 +5,13 @@ import json
 import logging
 from typing import Any, Literal, Sequence, cast
+import datetime

 from psycopg import Connection, sql
 from psycopg.rows import dict_row
 from psycopg.types.json import Jsonb

-from timeio.common import get_envvar, get_envvar_as_bool
+from timeio.common import get_envvar_as_bool, get_envvar
 from timeio.types import MqttPayload

 logger = logging.getLogger("configdb-updater")
@@ -23,16 +26,18 @@ if get_envvar_as_bool("DEBUG_SQL"):
 _IDS_BY_UUID_QUERY = sql.SQL(
     """\
-SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
-       s3s.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
-       t.ext_sftp_id, t.ext_api_id, ea.api_type_id
-FROM config_db.thing t
-    LEFT JOIN config_db.project p ON t.project_id = p.id
-    LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
-    LEFT JOIN config_db.file_parser fp ON s3s.file_parser_id = fp.id
-    LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
-    LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
-WHERE t.uuid = %s
+SELECT t.id as thing_id, t.project_id, p.database_id, t.ingest_type_id, t.s3_store_id,
+       tp.file_parser_id, fp.file_parser_type_id, t.mqtt_id, m.mqtt_device_type_id,
+       t.ext_sftp_id, t.ext_api_id, ea.api_type_id
+FROM config_db.thing t
+    LEFT JOIN config_db.project p ON t.project_id = p.id
+    LEFT JOIN config_db.thing_parser tp ON t.id = tp.thing_id
+    LEFT JOIN config_db.s3_store s3s ON t.s3_store_id = s3s.id
+    LEFT JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
+    LEFT JOIN config_db.mqtt m ON t.mqtt_id = m.id
+    LEFT JOIN config_db.ext_api ea ON t.ext_api_id = ea.id
+WHERE tp.valid_to is null
+  AND t.uuid = %s;
     """
 )
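
# Annotation, not part of the commit: the rewritten query joins the new
# config_db.thing_parser table and keeps only the row whose validity
# interval is still open, i.e. the thing's currently active parser.
# Sketch with made-up rows:
#
#     rows = [
#         {"file_parser_id": 1, "valid_from": None, "valid_to": "2024-05-01"},
#         {"file_parser_id": 2, "valid_from": "2024-05-01", "valid_to": None},
#     ]
#     active = [r for r in rows if r["valid_to"] is None]  # -> parser 2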
_no_ids = {
@@ -274,9 +279,7 @@ def upsert_table_project(
     return id_


-def upsert_table_s3_store(
-    conn: Connection, values: dict, parser_id: int, s3_id: int | None
-) -> int:
+def upsert_table_s3_store(conn: Connection, values: dict, s3_id: int | None) -> int:
     # "raw_data_storage":
     #     "bucket_name": storage.bucket,
     #     "username": storage.access_key,
@@ -286,13 +289,12 @@ def upsert_table_s3_store(
     id_ = _upsert(
         conn,
         table="s3_store",
-        columns=("user", "password", "bucket", "filename_pattern", "file_parser_id"),
+        columns=("user", "password", "bucket", "filename_pattern"),
         values=(
             v.pop("username"),
             v.pop("password"),
             v.pop("bucket_name"),
             v.pop("filename_pattern"),
-            parser_id,
         ),
         id=s3_id,
     )
@@ -300,6 +302,45 @@ def upsert_table_s3_store(
     return id_


+def upsert_table_thing_parser(
+    conn: Connection,
+    thing_id: int,
+    file_parser_id: int,
+    parser_settings: dict[str, Any],
+    start_date: datetime.datetime | None = None,
+    end_date: datetime.datetime | None = None,
+) -> int:
+    r = conn.execute(
+        """
+        SELECT tp.id, tp.thing_id, tp.file_parser_id, fp.params
+        FROM config_db.thing_parser tp
+        JOIN config_db.file_parser fp ON tp.file_parser_id = fp.id
+        WHERE tp.valid_to is NULL AND tp.file_parser_id = %s
+        """,
+        (file_parser_id,),
+    ).fetchone()
+    # compare as a tuple; a list is never equal to the tuple fetchone() returns
+    if r is not None and (thing_id, file_parser_id, parser_settings) != r[1:]:
+        # we actually got a new parser -> update the latest parser
+        if start_date is None:
+            raise ValueError("replacement parsers must have a valid start date")
+        # set the end date of the last valid parser
+        _upsert(
+            conn,
+            table="thing_parser",
+            columns=("valid_to",),
+            values=(start_date,),
+            id=r[0],
+        )
+    # insert the new parser
+    return _upsert(
+        conn,
+        table="thing_parser",
+        columns=("thing_id", "file_parser_id", "valid_from", "valid_to"),
+        values=(thing_id, file_parser_id, start_date, end_date),
+    )
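
# Annotation, not part of the commit: how the interval versioning above
# plays out, with hypothetical ids and dates:
#
#     before: (id=10, thing_id=7, file_parser_id=1, valid_from=None, valid_to=None)
#     call:   upsert_table_thing_parser(conn, thing_id=7, file_parser_id=2,
#                                       parser_settings={...}, start_date=now)
#     after:  (id=10, thing_id=7, file_parser_id=1, valid_from=None, valid_to=now)
#             (id=11, thing_id=7, file_parser_id=2, valid_from=now,  valid_to=None)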
def upsert_table_mqtt(conn: Connection, values: dict, mqtt_id: int | None) -> int:
# mqtt_device_type: None or name
# "username": thing.mqtt_username,
@@ -466,10 +507,10 @@ def store_thing_config(conn: Connection, data: dict):
     parser = data["parsers"]["parsers"][idx]
     parser_id = upsert_table_file_parser(conn, parser, ids["file_parser_id"])
     s3_id = upsert_table_s3_store(
-        conn, data["raw_data_storage"], parser_id, ids["s3_store_id"]
+        conn, data["raw_data_storage"], ids["s3_store_id"]
     )
-    upsert_table_thing(
+    thing_id = upsert_table_thing(
         conn,
         uuid,
         name,
@@ -482,6 +523,9 @@ def store_thing_config(conn: Connection, data: dict):
         ids["thing_id"],
     )

+    if ingest_type in ["sftp", "extsftp"]:
+        upsert_table_thing_parser(conn, thing_id, parser_id, parser["settings"])
# ===================================================================
# QAQC related stuff
......
@@ -14,7 +14,7 @@ import requests
 from psycopg import Connection, conninfo
 from psycopg.rows import dict_row

-import timeio.parser as parser
+import timeio.parser as parsing
 from timeio.errors import DataNotFoundError
@@ -45,20 +45,27 @@ class Database:
 class ConfigDB(Database):
     name = "configDB"

-    def get_parser(self, thing_uuid) -> parser.FileParser:
+    def get_parser(self, thing_uuid, file_date) -> parsing.Parser:
         """Returns parser-type-name and parser-parameter"""
-        query = (
-            "select fpt.name, fp.params from thing t "
-            "join s3_store s3 on t.s3_store_id = s3.id "
-            "join file_parser fp on s3.file_parser_id = fp.id "
-            "join file_parser_type fpt on fp.file_parser_type_id = fpt.id "
-            "where t.uuid = %s"
-        )
+        query = """
+            SELECT fpt.name, fp.params FROM thing_parser tp
+            JOIN thing t ON tp.thing_id = t.id
+            JOIN file_parser fp ON tp.file_parser_id = fp.id
+            JOIN file_parser_type fpt ON fp.file_parser_type_id = fpt.id
+            WHERE t.uuid = %(uuid)s
+            AND (tp.valid_from is NULL AND tp.valid_to is NULL
+                 OR tp.valid_from <= %(date)s AND tp.valid_to is NULL
+                 OR tp.valid_from <= %(date)s AND tp.valid_to > %(date)s);
+        """
         with self.connection() as conn:
-            p_type, p_params = conn.execute(query, [thing_uuid]).fetchone()  # noqa
-            return parser.get_parser(p_type, p_params)
+            p_type, p_params = conn.execute(
+                query, {"uuid": thing_uuid, "date": str(file_date)}
+            ).fetchone()  # noqa
+            return parsing.get_parser(p_type, p_params)
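
# Annotation, not part of the commit: the three OR-ed branches above encode
# this selection rule (sketch, assuming the validity semantics used by
# upsert_table_thing_parser in the configdb updater):
#
#     def parser_is_valid(valid_from, valid_to, file_date):
#         if valid_from is None and valid_to is None:
#             return True  # single, unversioned parser
#         if valid_to is None:
#             return valid_from <= file_date  # currently active parser
#         return valid_from <= file_date < valid_to  # historical parser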
-    def get_mqtt_parser(self, thing_uuid) -> parser.MqttDataParser:
+    def get_mqtt_parser(self, thing_uuid) -> parsing.MqttDataParser:
         query = (
             "select mdt.name from thing t join mqtt m on t.mqtt_id = m.id "
             "join mqtt_device_type mdt on m.mqtt_device_type_id = mdt.id "
@@ -67,7 +74,7 @@ class ConfigDB(Database):
         with self.connection() as conn:
             dev_type = conn.execute(query, [thing_uuid]).fetchone()  # noqa
-            return parser.get_parser(dev_type, None)
+            return parsing.get_parser(dev_type, None)
def get_thing_uuid(self, by: Literal["bucket", "mqtt_user"], value) -> str | None:
# fmt: off
@@ -89,14 +96,14 @@ class ConfigDB(Database):
                 return uuid
         raise ValueError("Argument 'by' must be one of 'bucket' or 'mqtt_user'")

-    def get_s3_store(self, thing_uuid):
+    def get_s3_store(self, thing_uuid) -> dict:
         query = (
             "select s3s.* from config_db.s3_store s3s join "
             "thing t on s3s.id = t.s3_store_id where t.uuid = %s"
         )
         with self.connection() as conn:
             with conn.cursor(row_factory=dict_row) as cur:
-                return cur.execute(query, [thing_uuid]).fetchone()
+                return cur.execute(query, [thing_uuid]).fetchone() or {}


 class DBapi:
......