diff --git a/flyway/migrations/V2_10__fix_qaqc_trigger.sql b/flyway/migrations/V2_10__fix_qaqc_trigger.sql
new file mode 100644
index 0000000000000000000000000000000000000000..d4a8b4239f6b614301c831f5efa15f557b02571e
--- /dev/null
+++ b/flyway/migrations/V2_10__fix_qaqc_trigger.sql
@@ -0,0 +1,29 @@
+SET search_path TO config_db;
+
+/*
+  Define a function that sets the existing active qaqc setup to 'false'
+  if a different/new setup for the same project_id is set to 'true'.
+ */
+CREATE OR REPLACE FUNCTION unique_qaqc_per_project()
+RETURNS TRIGGER AS $$
+BEGIN
+    IF NEW."default" = TRUE THEN
+        UPDATE config_db.qaqc
+        SET "default" = FALSE
+        WHERE project_id = NEW.project_id AND "default" = TRUE AND id <> NEW.id;
+    END IF;
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+/*
+  Remove the existing trigger.
+ */
+DROP TRIGGER IF EXISTS enforce_unique_qaqc_setup ON config_db.qaqc;
+/*
+  Create a trigger that executes the function before insert/update on qaqc.
+ */
+CREATE TRIGGER enforce_unique_qaqc_setup
+BEFORE INSERT OR UPDATE ON config_db.qaqc
+FOR EACH ROW
+EXECUTE FUNCTION unique_qaqc_per_project();
diff --git a/flyway/migrations/V2_8__configdb_multiple_timestamp_columns.sql b/flyway/migrations/V2_8__configdb_multiple_timestamp_columns.sql
new file mode 100644
index 0000000000000000000000000000000000000000..390f448690aa5cd9bb72d455987891c1e19630e1
--- /dev/null
+++ b/flyway/migrations/V2_8__configdb_multiple_timestamp_columns.sql
@@ -0,0 +1,26 @@
+SET search_path TO config_db;
+
+CREATE TABLE file_parser_backup AS TABLE file_parser WITH DATA;
+
+UPDATE file_parser
+    SET params = jsonb_set(
+        params,
+        '{timestamp_columns}',
+        COALESCE(
+            params->'timestamp_columns',
+            jsonb_build_array(
+                jsonb_build_object(
+                    'column', (params->>'timestamp_column')::int,
+                    'format', params->>'timestamp_format'
+                )
+            )
+        )
+    )
+    WHERE params ? 'timestamp_column'
+        AND params ? 'timestamp_format'
+        AND NOT params ? 'timestamp_columns';
+
+UPDATE file_parser
+    SET params = params - 'timestamp_column' - 'timestamp_format'
+    WHERE params ? 'timestamp_column' OR params ? 'timestamp_format';
+
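Reviewer note: the V2_8 migration rewrites every stored parser configuration in place — a scalar `timestamp_column`/`timestamp_format` pair becomes a one-element `timestamp_columns` array, and the legacy keys are dropped in a second pass. A minimal Python sketch of the same rewrite (function name and dicts invented for illustration; the authoritative logic is the `jsonb_set`/`COALESCE` statement above):

```python
def migrate_params(params: dict) -> dict:
    # Mirrors jsonb_set(..., COALESCE(...)): keep an existing
    # 'timestamp_columns' array, otherwise build one from the legacy pair.
    if "timestamp_column" in params and "timestamp_format" in params:
        params.setdefault(
            "timestamp_columns",
            [{"column": int(params["timestamp_column"]),
              "format": params["timestamp_format"]}],
        )
    # Second UPDATE: remove the legacy keys.
    params.pop("timestamp_column", None)
    params.pop("timestamp_format", None)
    return params

assert migrate_params(
    {"timestamp_column": 1, "timestamp_format": "%Y/%m/%d %H:%M:%S"}
) == {"timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}]}
```

The `setdefault` plays the role of the `COALESCE`: rows that already carry a `timestamp_columns` array are left untouched.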
diff --git a/src/sql/sta/datastream.sql b/src/sql/sta/datastream.sql
index 0bc55ed51885a1a39d07432c0ee838ec0adfcc71..f4fc6d89147446cc0d366a1339977ff47030006e 100644
--- a/src/sql/sta/datastream.sql
+++ b/src/sql/sta/datastream.sql
@@ -8,7 +8,8 @@
 SELECT dsl.device_property_id AS "ID",
        concat(c.label, ':',
               d.short_name, ':',
-              dp.property_name
+              dp.property_name, ':',
+              dp.label
        ) as "NAME",
        concat(d.short_name, ' ',
               dp.property_name, ' ',
@@ -139,6 +140,6 @@ GROUP BY dsl.device_property_id, c.label, d.short_name, dp.property_name, dma.of
     dp.unit_name, dp.unit_uri, d.id, dp.id, cv_agg.definition, dp.aggregation_type_uri, cv_u.provenance,
     cv_u.term, dp.resolution, cv_ur.provenance, dp.resolution_unit_name, dp.resolution_unit_uri, dp.accuracy,
     cv_ua.provenance, dp.accuracy_unit_name, dp.accuracy_unit_uri, dp.measuring_range_min, dp.measuring_range_max,
     cv_l.term, cv_l.provenance_uri, cv_l.definition, c.id, dma.id, dma.label, dma.begin_description, dma.begin_date, dma.offset_x,
-    dma.offset_y, csl.x, csl.y, dp.property_uri, dma.end_date;
+    dma.offset_y, csl.x, csl.y, dp.property_uri, dma.end_date, dp.label;
 COMMIT;
\ No newline at end of file
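For context, the datastream.sql change above extends the SensorThings `NAME` field from three to four colon-separated parts by appending `dp.label` (which must then also appear in the `GROUP BY`). With invented example values, the new `concat` yields:

```python
# Invented example values; the real ones come from the joined config/device tables.
c_label, short_name, prop_name, dp_label = "demo-campaign", "logger01", "air_temp", "2m"
name = ":".join([c_label, short_name, prop_name, dp_label])
assert name == "demo-campaign:logger01:air_temp:2m"
```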
diff --git a/src/timeio/parser.py b/src/timeio/parser.py
index 84eb9968779069fdab3c66a81e1e620da3fd82bd..7b67f3baf81e554487c597e03e2c2a0b53f602ee 100644
--- a/src/timeio/parser.py
+++ b/src/timeio/parser.py
@@ -7,9 +7,11 @@ import logging
 import math
 import re
 import warnings
+
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from datetime import datetime
+from functools import reduce
 from io import StringIO
 from typing import Any, TypedDict, TypeVar, cast
 
@@ -129,22 +131,33 @@ class FileParser(Parser):
 
 
 class CsvParser(FileParser):
-    def _cast_index(self, index: pd.Series, fmt: str) -> pd.DatetimeIndex:
-        try:
-            index = index.str.strip()
-        except AttributeError:
-            pass
-        out = pd.to_datetime(index, format=fmt, errors="coerce")
-        if out.isna().any():
-            nat = out.isna()
+    def _set_index(self, df: pd.DataFrame, timestamp_columns: list[dict]) -> pd.DataFrame:
+
+        date_columns = [d["column"] for d in timestamp_columns]
+        date_format = " ".join([d["format"] for d in timestamp_columns])
+
+        for c in date_columns:
+            if c not in df.columns:
+                raise ParsingError(f"Timestamp column {c} does not exist.")
+
+        index = reduce(
+            lambda x, y: x + " " + y,
+            [df[c].fillna("").astype(str).str.strip() for c in date_columns],
+        )
+        df = df.drop(columns=date_columns)
+
+        index = pd.to_datetime(index, format=date_format, errors="coerce")
+        if index.isna().any():
+            nat = index.isna()
             warnings.warn(
-                f"Could not parse {nat.sum()} of {out.count()} timestamps "
-                f"with provided timestamp format {fmt!r}. First failing "
+                f"Could not parse {nat.sum()} of {len(index)} timestamps "
+                f"with provided timestamp format {date_format!r}. First failing "
                 f"timestamp: '{index[nat].iloc[0]}'",
                 ParsingWarning,
             )
-        out.name = None
-        return pd.DatetimeIndex(out)
+        index.name = None
+        df.index = index
+        return df
 
     def do_parse(self, rawdata: str) -> pd.DataFrame:
         """
@@ -152,10 +165,11 @@ class CsvParser(FileParser):
         rawdata: the unparsed content
 
         NOTE: we need to preserve the original column numbering
-        and check for the date index column
         """
         settings = self.settings.copy()
-        index_col = settings.pop("index_col")
+        self.logger.info(settings)
+
+        timestamp_columns = settings.pop("timestamp_columns")
 
         if "comment" in settings:
             rawdata = filter_lines(rawdata, settings.pop("comment"))
@@ -173,13 +187,7 @@ class CsvParser(FileParser):
         if df.empty:
             return pd.DataFrame(index=pd.DatetimeIndex([]))
 
-        if index_col not in df.columns:
-            raise ParsingError(
-                f"Could not get Timestamp-Column {index_col}. "
-                f"Data has only {len(df.columns)} columns."
-            )
-
-        df.index = self._cast_index(df.pop(index_col), settings["date_format"])
+        df = self._set_index(df, timestamp_columns)
 
         # remove rows with broken dates
         df = df.loc[df.index.notna()]
@@ -407,7 +415,5 @@ def get_parser(parser_type, settings) -> FileParser | MqttDataParser:
 
         kwargs = settings.pop("pandas_read_csv") or {}
         settings = {**default_settings, **kwargs, **settings}
-        settings["index_col"] = settings.pop("timestamp_column")
-        settings["date_format"] = settings.pop("timestamp_format")
         return klass(settings)
     return klass()
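The heart of the parser change is `_set_index`: instead of casting a single index column, it string-joins the values of all configured timestamp columns with spaces and parses the result against the space-joined formats. A self-contained sketch of that technique, assuming integer column labels as produced by `header=None` (sample data invented):

```python
from functools import reduce

import pandas as pd

# Invented sample frame; integer column labels as with header=None.
df = pd.DataFrame({0: ["02/11/22", "03/11/22"],
                   1: ["14:00:51", "15:00:51"],
                   2: [20.5, 20.4]})
specs = [{"column": 0, "format": "%d/%m/%y"},
         {"column": 1, "format": "%H:%M:%S"}]

# Join the string values of all timestamp columns with single spaces ...
parts = [df[s["column"]].fillna("").astype(str).str.strip() for s in specs]
joined = reduce(lambda x, y: x + " " + y, parts)   # e.g. "02/11/22 14:00:51"
# ... and parse them against the space-joined formats.
fmt = " ".join(s["format"] for s in specs)         # "%d/%m/%y %H:%M:%S"
index = pd.to_datetime(joined, format=fmt, errors="coerce")
df = df.drop(columns=[s["column"] for s in specs]).set_index(index)
```

Rows that fail to parse become `NaT` via `errors="coerce"` and are dropped later in `do_parse` by `df.loc[df.index.notna()]`.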
diff --git a/tests/test_timeio/test_csv.py b/tests/test_timeio/test_csv.py
index cdd71c8e2085cdee0dd2c5b1e88ba40a133678cb..5f200cc2a17bc41dd2d04b1257f06db5990a3769 100644
--- a/tests/test_timeio/test_csv.py
+++ b/tests/test_timeio/test_csv.py
@@ -17,17 +17,6 @@ RAWDATA = """
 1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9, 128.9, 15.8, 14.6, 14.5, 76.1,119.0, 89.5, 11.855, 165, 103, 900, 900, 18.1, 63.2, 17.9, 63.8, 0
 """
 
-DIRTYDATA = """
-//Hydroinnova CRS-1000 Data
-//CellSig=12
-
-//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
-
-1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
-1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
-1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
-"""
-
 def test_parsing():
     settings = {
@@ -35,8 +24,7 @@ def test_parsing():
         "delimiter": ",",
         "skiprows": 3,
         "skipfooter": 0,
-        "index_col": 1,
-        "date_format": "%Y/%m/%d %H:%M:%S",
+        "timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
     }
     parser = CsvParser(settings)
     df = parser.do_parse(RAWDATA)
@@ -45,14 +33,25 @@ def test_parsing():
     assert (df.columns == [0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]).all()  # fmt: skip
 
 
+DIRTYDATA = """
+//Hydroinnova CRS-1000 Data
+//CellSig=12
+
+//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
+
+1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
+1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
+1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
+"""
+
+
 def test_dirty_data_parsing():
     settings = {
         "decimal": ".",
         "delimiter": ",",
         "skiprows": 3,
         "skipfooter": 0,
-        "index_col": 1,
-        "date_format": "%Y/%m/%d %H:%M:%S",
+        "timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
     }
     parser = CsvParser(settings)
@@ -78,3 +77,44 @@ def test_dirty_data_parsing():
         "datastream_pos": "3",
         "parameters": '{"origin": "test", "column_header": "3"}',
     }
+
+
+MULTIDATECOLUMDATA = """
+============================================================================
+  Datum      Zeit     Temp  spezLeitf  Tiefe   Chl    Chl    ODO   ODOsat  Batterie
+  t/m/j    hh:mm:ss    C      uS/cm    Meter   ug/l   RFU    mg/l  %Lokal    Volt
+----------------------------------------------------------------------------
+02/11/22  14:00:51   20.52      3      0.151    9.1    2.2   9.10   100.5    12.5
+02/11/22  15:00:51   20.38      3      0.158  -23.5   -5.6   9.11   100.3    12.5
+02/11/22  16:00:51   20.19      3      0.161   -0.5   -0.1   9.15   100.3    12.4
+02/11/22  17:00:51   20.02      3      0.164    0.0    0.0   9.18   100.3    12.5
+"""
+
+
+def test_multi_date_column_parsing():
+    settings = {
+        "decimal": ".",
+        "delimiter": "\\s+",
+        "skiprows": 4,
+        "skipfooter": 0,
+        "header": None,
+        "timestamp_columns": [
+            {"column": 0, "format": "%d/%m/%y"},
+            {"column": 1, "format": "%H:%M:%S"},
+        ],
+    }
+    parser = CsvParser(settings)
+    df = parser.do_parse(MULTIDATECOLUMDATA.strip())
+    assert df.index.equals(
+        pd.to_datetime(
+            [
+                "2022-11-02 14:00:51",
+                "2022-11-02 15:00:51",
+                "2022-11-02 16:00:51",
+                "2022-11-02 17:00:51",
+            ]
+        )
+    )
+    assert df.columns.equals(pd.RangeIndex(2, 10))
+    assert (df[2] == [20.52, 20.38, 20.19, 20.02]).all()
+    assert (df[9] == [12.5, 12.5, 12.4, 12.5]).all()
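A quick note on the failure path exercised by `test_dirty_data_parsing`: with `errors="coerce"`, any timestamp that does not match the combined format becomes `NaT`, which is what feeds the `ParsingWarning` branch in `_set_index` before the broken rows are dropped. A hedged standalone example (values invented):

```python
import pandas as pd

raw = pd.Series(["02/11/22 14:00:51", "02/11/22 xW8"])  # second value is broken
out = pd.to_datetime(raw, format="%d/%m/%y %H:%M:%S", errors="coerce")
assert out.isna().sum() == 1  # one NaT -> one warning counted, one row dropped
```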