Skip to content
Snippets Groups Projects
Commit 8504de88 authored by Bert Palm's avatar Bert Palm :bug:
Browse files

removed obsolete ConfigDB class from databases.py, moved mqtt_ingest to feta

parent 92ae4493
No related branches found
No related tags found
1 merge request!289some generic fixes and cleanup
......@@ -12,7 +12,9 @@ from timeio.mqtt import AbstractHandler
from timeio.common import get_envvar, setup_logging
from timeio.errors import UserInputError
from timeio.journaling import Journal
from timeio.databases import ConfigDB, DBapi
from timeio.databases import DBapi
from timeio.feta import Thing
from timeio.parser import get_parser
logger = logging.getLogger("mqtt-ingest")
journal = Journal("Parser")
......@@ -30,7 +32,7 @@ class ParseMqttDataHandler(AbstractHandler):
mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool),
)
self.confdb = ConfigDB(get_envvar("CONFIGDB_DSN"))
self.configdb_dsn = get_envvar("CONFIGDB_DSN")
self.dbapi = DBapi(get_envvar("DB_API_BASE_URL"))
self.pub_topic = get_envvar("TOPIC_DATA_PARSED")
......@@ -39,10 +41,12 @@ class ParseMqttDataHandler(AbstractHandler):
logger.info(f"get thing")
mqtt_user = message.topic.split("/")[1]
thing_uuid = self.confdb.get_thing_uuid("mqtt_user", mqtt_user)
thing = Thing.from_mqtt_user_name(mqtt_user)
thing_uuid = thing.uuid
logger.info(f"get parser")
parser = self.confdb.get_mqtt_parser(thing_uuid)
p = thing.s3_store.file_parser
parser = get_parser(p.file_parser_type, p.params)
logger.info(f"parsing rawdata")
try:
......
......@@ -42,63 +42,6 @@ class Database:
raise ConnectionError(f"Ping to {self.name} failed. ({self.info})") from e
class ConfigDB(Database):
name = "configDB"
def get_parser(self, thing_uuid) -> parser.FileParser:
"""Returns parser-type-name and parser-parameter"""
query = (
"select fpt.name, fp.params from thing t "
"join s3_store s3 on t.s3_store_id = s3.id "
"join file_parser fp on s3.file_parser_id = fp.id "
"join file_parser_type fpt on fp.file_parser_type_id = fpt.id "
"where t.uuid = %s"
)
with self.connection() as conn:
p_type, p_params = conn.execute(query, [thing_uuid]).fetchone() # noqa
return parser.get_parser(p_type, p_params)
def get_mqtt_parser(self, thing_uuid) -> parser.MqttDataParser:
query = (
"select mdt.name from thing t join mqtt m on t.mqtt_id = m.id "
"join mqtt_device_type mdt on m.mqtt_device_type_id = mdt.id "
"where t.uuid = %s"
)
with self.connection() as conn:
dev_type = conn.execute(query, [thing_uuid]).fetchone() # noqa
return parser.get_parser(dev_type, None)
def get_thing_uuid(self, by: Literal["bucket", "mqtt_user"], value) -> str | None:
# fmt: off
by_map = {
"bucket": "select t.uuid from thing t join s3_store s3 on "
"t.s3_store_id = s3.id where s3.bucket = %s",
"mqtt_user": 'select t.uuid from mqtt m join thing t on '
'm.id = t.mqtt_id where m."user" = %s',
}
# fmt: on
logging.debug(f"get thing uuid for {by}={value}")
if query := by_map.get(by):
with self.connection() as conn:
res = conn.execute(query, [value]).fetchone()
if res is None:
raise DataNotFoundError(f"No thing for {by}: {value}")
uuid = res[0]
logging.debug(f"got thing {uuid}")
return uuid
raise ValueError("Argument 'by' must be one of 'bucket' or 'mqtt_user'")
def get_s3_store(self, thing_uuid):
query = (
"select s3s.* from config_db.s3_store s3s join "
"thing t on s3s.id = t.s3_store_id where t.uuid = %s"
)
with self.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
return cur.execute(query, [thing_uuid]).fetchone()
class DBapi:
def __init__(self, base_url):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment