Commit 25713b93 authored by Bert Palm

Merge branch 'forced_QCrun' into 'main'

added ability to process data_parsed version 2 messages (a user forced a qc...

See merge request !272
parents ca8adc99 c942b035
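
For illustration only, a minimal sketch (not part of this commit) of the two data_parsed payload shapes the handler accepts after this change, matching the DataParsedV1/DataParsedV2 TypedDicts added below; the UUIDs, settings name and dates are hypothetical:

import json

# version 1: emitted after a data upload to a single thing
data_parsed_v1 = json.dumps(
    {"version": 1, "thing_uuid": "123e4567-e89b-12d3-a456-426614174000"}
)

# version 2: emitted when a user forces a QC run for a project in the frontend
data_parsed_v2 = json.dumps(
    {
        "version": 2,
        "project_uuid": "123e4567-e89b-12d3-a456-426614174001",
        "qc_settings_name": "default",
        "start_date": "2024-01-01T00:00:00Z",
        "end_date": "2024-02-01T00:00:00Z",
    }
)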
@@ -13,6 +13,7 @@ from timeio.common import get_envvar, setup_logging
from timeio.errors import DataNotFoundError, UserInputError, NoDataWarning
from timeio.journaling import Journal
from timeio.databases import Database, DBapi
from timeio.types import MqttPayload, check_dict_by_TypedDict as _chkmsg
logger = logging.getLogger("run-quality-control")
journal = Journal("QualityControl")
@@ -34,43 +35,85 @@ class QcHandler(AbstractHandler):
self.db = Database(get_envvar("DATABASE_DSN"))
self.dbapi = DBapi(get_envvar("DB_API_BASE_URL"))
def _check_data(self, content, keys: list[str]):
for key in keys:
if key not in content:
raise DataNotFoundError(
"mandatory field '{key}' is not present in data"
)
def act(self, content: dict, message: MQTTMessage):
if (thing_uuid := content.get("thing_uuid")) is None:
raise DataNotFoundError(
"mandatory field 'thing_uuid' is not present in data"
)
logger.info(f"Thing {thing_uuid} triggered QAQC service")
thing_uuid = None
if (version := content.get("version", 1)) == 1:
_chkmsg(content, MqttPayload.DataParsedV1, "data-parsed message")
thing_uuid = content["thing_uuid"]
logger.info(f"QC was triggered by data upload to thing. {content=}")
elif version == 2:
_chkmsg(content, MqttPayload.DataParsedV2, "data-parsed message")
logger.info(f"QC was triggered by user (in frontend). {content=}")
else:
raise NotImplementedError(f"data_parsed payload version {version}")
self.dbapi.ping_dbapi()
with self.db.connection() as conn:
logger.info("successfully connected to configdb")
if version == 1:
content: MqttPayload.DataParsedV1
qc = QualityControl.from_thing(
conn,
self.dbapi.base_url,
uuid=thing_uuid,
)
else:
content: MqttPayload.DataParsedV2
qc = QualityControl.from_project(
conn,
self.dbapi.base_url,
uuid=content["project_uuid"],
config_name=content["qc_settings_name"],
start_date=content["start_date"],
end_date=content["end_date"],
)
try:
qaqc = QualityControl(conn, self.dbapi.base_url, thing_uuid)
except NoDataWarning as w:
# TODO: uncomment if QC is production-ready
# journal.warning(str(w), thing_uuid)
raise w
try:
some = qaqc.qacq_for_thing()
if qc.legacy:
# A legacy workflow should only be possible with v1
# and must deliver a thing_uuid (because it works on
# a single thing).
assert version == 1 and thing_uuid is not None
some = qc.run_legacy(thing_uuid)
else:
start_date: str | None = None
end_date: str | None = None
some = qc.run()
except UserInputError as e:
journal.error(str(e), thing_uuid)
if thing_uuid is not None:
journal.error(str(e), thing_uuid)
raise e
except NoDataWarning as w:
journal.warning(str(w), thing_uuid)
if thing_uuid is not None:
journal.warning(str(w), thing_uuid)
raise w
if some:
journal.info(f"QC done. Config: {qaqc.conf['name']}", thing_uuid)
else:
journal.warning(
f"QC done, but no quality labels were generated. "
f"Config: {qaqc.conf['name']}",
thing_uuid,
)
return
if thing_uuid is not None:
if some:
journal.info(f"QC done. Config: {qc.conf['name']}", thing_uuid)
else:
journal.warning(
f"QC done, but no quality labels were generated. "
f"Config: {qc.conf['name']}",
thing_uuid,
)
return
logger.debug(f"inform downstream services about success of qc.")
payload = json.dumps({"thing": thing_uuid})
payload = json.dumps(
{
"version": 1,
"project_uuid": qc.proj["uuid"],
"thing_uuid": thing_uuid, # None allowed
}
)
self.mqtt_client.publish(
topic=self.publish_topic, payload=payload, qos=self.publish_qos
)
@@ -10,7 +10,7 @@ import subprocess
import sys
import typing
import warnings
from typing import Any, Hashable, Literal, TypedDict, cast
from typing import Any, Hashable, Literal, cast
import pandas as pd
import requests
@@ -95,17 +95,17 @@ def dict_update_to_list(dict_: dict, key: Hashable, value: Any) -> None:
dict_[key] = [dict_[key], value]
def check_keys_by_TypedDict(value: dict, expected: type[typing.TypedDict], name: str):
missing = expected.__required_keys__ - value.keys()
if missing:
raise KeyError(f"{', '.join(missing)} are mandatory keys for {name!r}")
def ping_dbapi(base_url):
r = requests.get(f"{base_url}/health")
r.raise_for_status()
def timestamp_from_string(ts: str) -> pd.Timestamp | None:
if ts is None:
return None
return pd.Timestamp(ts)
class KwargsScheme(saqc.core.core.FloatScheme):
@staticmethod
@@ -165,7 +165,6 @@ class KwargsScheme(saqc.core.core.FloatScheme):
class QualityControl:
conn: Connection
api_url: str
thing: TypedDict("ThingT", {"id": int, "name": str, "uuid": str})
proj: ConfDB.ProjectT
schema: str
conf: ConfDB.QaqcT
@@ -173,15 +172,18 @@ class QualityControl:
window: pd.Timedelta | int
legacy: bool
def __init__(self, conn: Connection, dbapi_url: str, thing_uuid: str):
def __init__(
self,
conn: Connection,
dbapi_url: str,
project_uuid: str,
qc_id: str,
):
self.conn: Connection = conn
self.api_url = dbapi_url
self.schema = self.fetch_schema(thing_uuid)
self.proj = self.fetch_project(thing_uuid)
self.thing = self.fetch_thing(thing_uuid)
if not self.thing:
raise DataNotFoundError(f"A thing with UUID {thing_uuid} does not exist")
self.conf = self.fetch_qaqc_config(thing_uuid)
self.schema = self.fetch_schema(project_uuid)
self.proj = self.fetch_project(project_uuid)
self.conf = self.fetch_qc_config(qc_id)
if not self.conf:
raise NoDataWarning(
f"No qaqc config present in project {self.proj['name']}"
@@ -190,6 +192,47 @@ class QualityControl:
self.window = self.parse_ctx_window(self.conf["context_window"])
self.legacy = any(map(lambda t: t.get("position") is not None, self.tests))
@classmethod
def from_project(
cls,
conn: Connection,
dbapi_url: str,
uuid: str,
config_name: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
):
q = (
"SELECT q.id FROM config_db.qaqc q "
"JOIN config_db.project p ON q.project_id = p.id "
"WHERE p.uuid::text = %s "
)
if config_name is None:
# Normally only one configuration should have the default
# flag set, but if multiple configurations have it set,
# we use the one with the highest id (ORDER BY q.id DESC).
q += "AND q.default = true ORDER BY q.id DESC "
params = [uuid]
else:
q += "AND q.name = %s ORDER BY q.id DESC "
params = [uuid, config_name]
with conn.cursor() as cur:
qc_id = cur.execute(cast(Literal, q), params).fetchone()
if qc_id is None and config_name is not None:
raise DataNotFoundError(f"No QC-Settings with name {config_name}")
return cls(conn, dbapi_url, uuid, qc_id[0], start_date, end_date)
@classmethod
def from_thing(cls, conn: Connection, dbapi_url: str, uuid: str):
q = (
"SELECT p.uuid FROM config_db.project p "
"JOIN config_db.thing t ON p.id = t.project_id "
"WHERE t.uuid::text = %s"
)
with conn.cursor() as cur:
proj_uuid = cur.execute(cast(Literal, q), [uuid]).fetchone()
return cls.from_project(conn, dbapi_url, proj_uuid[0], config_name=None)
@staticmethod
def extract_data_by_result_type(df: pd.DataFrame) -> pd.Series:
"""Selects the column, specified as integer in the column 'result_type'."""
@@ -242,53 +285,34 @@ class QualityControl:
t, *ds = name.split("S", maxsplit=1)
return (int(t), int(ds[0])) if ds else (None, None)
def fetch_qaqc_config(self, thing_uuid) -> ConfDB.QaqcT | None:
# Normally only one configuration should have the default
# flag set, but if multiple configurations have it set,
# we use the last updated (ORDER BY).
q = (
"SELECT q.* FROM config_db.qaqc q "
"JOIN config_db.project p ON q.project_id = p.id "
"JOIN config_db.thing t ON p.id = t.project_id "
"WHERE t.uuid = %s "
"AND q.default = true "
"ORDER BY q.id DESC "
)
def fetch_qc_config(self, qc_id) -> ConfDB.QaqcT | None:
q = "SELECT q.* FROM config_db.qaqc q WHERE q.id = %s "
with self.conn.cursor(row_factory=dict_row) as cur:
return cur.execute(cast(Literal, q), [thing_uuid]).fetchone()
return cur.execute(cast(Literal, q), [qc_id]).fetchone()
def fetch_qaqc_tests(self, qaqc_id: int) -> list[ConfDB.QaqcTestT]:
q = "SELECT * FROM config_db.qaqc_test qt WHERE qt.qaqc_id = %s"
with self.conn.cursor(row_factory=dict_row) as cur:
return cur.execute(cast(Literal, q), [qaqc_id]).fetchall()
def fetch_project(self, thing_uuid: str) -> ConfDB.ProjectT:
def fetch_project(self, project_uuid: str) -> ConfDB.ProjectT:
"""Returns project UUID and project name for a given thing."""
q = (
"SELECT p.* FROM config_db.project p "
"JOIN config_db.thing t ON p.id = t.project_id "
"WHERE t.uuid = %s"
)
with self.conn.cursor(row_factory=dict_row) as cur:
return cur.execute(cast(Literal, q), [thing_uuid]).fetchone()
def fetch_schema(self, thing_uuid) -> str:
return self.conn.execute(
"SELECT schema FROM public.schema_thing_mapping WHERE thing_uuid = %s",
[thing_uuid],
).fetchone()[0]
def fetch_thing(self, thing_uuid: str):
q = sql.SQL(
'select "id", "name", "uuid" from {schema}.thing where "uuid" = %s'
).format(schema=sql.Identifier(self.schema))
q = "SELECT p.* FROM config_db.project p WHERE p.uuid:text = %s"
with self.conn.cursor(row_factory=dict_row) as cur:
return cur.execute(q, [thing_uuid]).fetchone()
return cur.execute(cast(Literal, q), [project_uuid]).fetchone()
def fetch_schema(self, proj_uuid) -> str:
query = (
"select d.schema from config_db.database d "
"join config_db.project p on d.id = p.database_id "
"where p.uuid::text = %s"
)
return self.conn.execute(cast(Literal, query), [proj_uuid]).fetchone()[0]
def fetch_thing_uuid_for_sta_stream(self, sta_stream_id: int):
q = (
"select thing_id as thing_uuid from public.datastream_link where "
"device_property_id = %s"
"select thing_id as thing_uuid from public.datastream_link "
"where device_property_id = %s"
)
row = self.conn.execute(cast(Literal, q), [sta_stream_id]).fetchone()
return row and row[0]
@@ -479,7 +503,7 @@ class QualityControl:
return None, None
return r[0][0], r[1][0]
def qaqc_legacy(self) -> tuple[saqc.SaQC, dict[str, pd.DataFrame]]:
def qaqc_legacy(self, thing_uuid) -> tuple[saqc.SaQC, dict[str, pd.DataFrame]]:
"""
Returns a tuple of data in saqc.SaQC and a metadata dict.
@@ -494,7 +518,7 @@
"""
def fetch_data(pos) -> tuple[pd.Series, pd.DataFrame]:
ds = self.fetch_datastream_by_pos(self.thing["uuid"], pos)
ds = self.fetch_datastream_by_pos(thing_uuid, pos)
earlier, later = self.fetch_unflagged_daterange_legacy(ds["id"])
obs = None
if earlier is not None: # else no unflagged data
@@ -514,7 +538,7 @@
meta.attrs = {
"repr_name": f"Datastream {ds['name']} of Thing {self.thing['name']}"
}
meta["thing_uuid"] = self.thing["uuid"]
meta["thing_uuid"] = thing_uuid
meta["datastream_id"] = ds["id"]
return data, meta
@@ -554,13 +578,21 @@
return qc, md
def qaqc_sta(self) -> tuple[saqc.SaQC, dict[str, pd.DataFrame]]:
def qaqc_sta(
self,
start_date: pd.Timestamp | None,
end_date: pd.Timestamp | None,
) -> tuple[saqc.SaQC, dict[str, pd.DataFrame]]:
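# An explicit start_date/end_date (user-forced QC run) takes precedence over
# the automatically determined unflagged date range below.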
def fetch_sta_data(thing_id: int, stream_id):
if start_date is None:
earlier, later = self.fetch_unflagged_daterange_sta(stream_id)
else:
earlier, later = start_date, end_date
thing_uuid = self.fetch_thing_uuid_for_sta_stream(stream_id)
earlier, later = self.fetch_unflagged_daterange_sta(stream_id)
obs = None
if earlier is not None: # else no unflagged data
if earlier is not None: # else: we have no unflagged data
obs = self.fetch_datastream_data_sta(
stream_id, earlier, later, self.window
)
@@ -618,32 +650,37 @@
return qc, md
def qacq_for_thing(self):
def run(self, start_date: str | None = None, end_date: str | None = None):
"""
Run QA/QC on data in the Observation-DB.
Returns the number of observations that were updated and/or created.
Run QA/QC on data in the Observation-DB and returns
the number of observations that were updated.
"""
logger.info(f"Execute qaqc config {self.conf['name']!r}")
if not self.tests:
raise NoDataWarning(
f"No quality functions present in config {self.conf['name']!r}",
)
start_date = timestamp_from_string(start_date)
end_date = timestamp_from_string(end_date)
qc, meta = self.qaqc_sta(start_date, end_date)
m = self._upload(qc, meta)
return m
if self.legacy:
qc, meta = self.qaqc_legacy()
else:
qc, meta = self.qaqc_sta()
def run_legacy(self, thing_uuid):
"""
Run QA/QC on data in the Observation-DB for a single Thing.
Returns the number of observations that were updated and/or created.
"""
logger.info(f"Execute qaqc config {self.conf['name']!r}")
qc, meta = self.qaqc_legacy(thing_uuid)
# ============= legacy dataproducts =============
# Data products must be created before quality labels are uploaded.
# If we first do the upload and an error occurs, we will not be able to
# recreate the same data for the dataproducts, because a second
# run ignores already flagged data.
# recreate the same data for the dataproducts, because a
# legacy qc run always just sees unflagged data.
n = 0
dp_columns = [c for c in qc.columns if c not in meta.keys()]
if self.legacy and dp_columns:
n += self._create_dataproducts_legacy(qc[dp_columns])
if dp_columns := [c for c in qc.columns if c not in meta.keys()]:
n += self._create_dataproducts_legacy(thing_uuid, qc[dp_columns])
m = self._upload(qc, meta)
return n + m
@@ -784,7 +821,7 @@
valid = df["data"].notna()
return df[valid].apply(compose_json, axis=1).dropna().to_list()
def _create_dataproducts_legacy(self, qc):
def _create_dataproducts_legacy(self, thing_uuid, qc):
total = 0
flags, data = qc.flags, qc.data # implicit flags translation ->KwargsScheme
for name in flags.columns:
@@ -794,7 +831,7 @@
logger.debug(f"no data for data product {name}")
continue
obs = self._create_dataproduct_legacy(df, name, self.conf["id"])
self._upload_dataproduct(self.thing["uuid"], obs)
self._upload_dataproduct(thing_uuid, obs)
logger.info(f"uploaded {len(obs)} data points for dataproduct {name!r}")
total += len(obs)
continue
@@ -54,6 +54,17 @@ class MqttPayload:
thing_sta_id: int | None
sta_stream_id: int | None
class DataParsedV1(_t.TypedDict):
version: _t.Literal[1] | None
thing_uuid: str
class DataParsedV2(_t.TypedDict):
version: _t.Literal[2]
project_uuid: str
qc_settings_name: str
start_date: str
end_date: str
class ConfDB:
@@ -138,6 +149,7 @@ class ConfDB:
arg_name: str
sta_thing_id: int | None
sta_stream_id: int | None
alias: str
class S3_StoreT(_t.TypedDict):
id: int
@@ -157,3 +169,9 @@
mqtt_id: int
ext_sftp_id: int | None
ext_api_id: int | None
def check_dict_by_TypedDict(value: dict, expected: type[_t.TypedDict], name: str):
missing = expected.__required_keys__ - value.keys()
if missing:
raise KeyError(f"{', '.join(missing)} are mandatory keys for {name!r}")
__version__ = "0.7.0"
__version__ = "0.8.0"