Commit cec31c66 authored by Bert Palm

Merge branch 'main' into fetaFolloUpAndFixes

parents d15aaf84 e169d863
1 merge request: !289 some generic fixes and cleanup
Pipeline #498126 failed
SET search_path TO config_db;

/*
Define a function that sets the existing active qaqc setup to 'false'
if a different/new setup for the same project_id is set to 'true'.
*/
CREATE OR REPLACE FUNCTION unique_qaqc_per_project()
RETURNS TRIGGER AS $$
BEGIN
    IF NEW."default" = TRUE THEN
        UPDATE config_db.qaqc
        SET "default" = FALSE
        WHERE project_id = NEW.project_id AND "default" = TRUE AND id <> NEW.id;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

/*
Remove the existing trigger.
*/
DROP TRIGGER IF EXISTS enforce_unique_qaqc_setup ON config_db.qaqc;

/*
Create a trigger to execute the function before insert/update on qaqc.
*/
CREATE TRIGGER enforce_unique_qaqc_setup
BEFORE INSERT OR UPDATE ON config_db.qaqc
FOR EACH ROW
EXECUTE FUNCTION unique_qaqc_per_project();
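For illustration, a minimal pure-Python sketch of the invariant the trigger enforces (at most one "default" = TRUE row per project_id); the QaqcRow dataclass and set_default helper are hypothetical stand-ins for the real table, not part of this migration:

from dataclasses import dataclass

@dataclass
class QaqcRow:
    id: int
    project_id: int
    default: bool

def set_default(rows: list[QaqcRow], new: QaqcRow) -> None:
    # Mimic the BEFORE INSERT/UPDATE trigger: demote any other default
    # row of the same project before the new default row lands.
    if new.default:
        for row in rows:
            if row.project_id == new.project_id and row.id != new.id:
                row.default = False
    rows.append(new)

rows: list[QaqcRow] = []
set_default(rows, QaqcRow(id=1, project_id=7, default=True))
set_default(rows, QaqcRow(id=2, project_id=7, default=True))
assert [r.default for r in rows] == [False, True]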
SET search_path TO config_db;

CREATE TABLE file_parser_backup AS TABLE file_parser WITH DATA;

UPDATE file_parser
SET params = jsonb_set(
    params,
    '{timestamp_columns}',
    COALESCE(
        params->'timestamp_columns',
        jsonb_build_array(
            jsonb_build_object(
                'column', (params->>'timestamp_column')::int,
                'format', params->>'timestamp_format'
            )
        )
    )
)
WHERE params ? 'timestamp_column'
  AND params ? 'timestamp_format'
  AND NOT params ? 'timestamp_columns';

UPDATE file_parser
SET params = params - 'timestamp_column' - 'timestamp_format'
WHERE params ? 'timestamp_column' OR params ? 'timestamp_format';
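At the dict level, the two UPDATE statements above amount to the following reshaping; migrate_params is a hypothetical Python mirror for illustration only (the real migration runs entirely in SQL):

def migrate_params(params: dict) -> dict:
    # Equivalent of the jsonb_set/COALESCE step: build timestamp_columns
    # from the legacy keys unless it already exists.
    if (
        "timestamp_column" in params
        and "timestamp_format" in params
        and "timestamp_columns" not in params
    ):
        params["timestamp_columns"] = [
            {
                "column": int(params["timestamp_column"]),
                "format": params["timestamp_format"],
            }
        ]
    # Equivalent of the second UPDATE: drop the legacy keys.
    params.pop("timestamp_column", None)
    params.pop("timestamp_format", None)
    return params

old = {"timestamp_column": "1", "timestamp_format": "%Y/%m/%d %H:%M:%S"}
assert migrate_params(old) == {
    "timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}]
}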
@@ -8,7 +8,8 @@ SELECT
     dsl.device_property_id AS "ID",
     concat(c.label, ':',
            d.short_name, ':',
-           dp.property_name
+           dp.property_name, ':',
+           dp.label
     ) as "NAME",
     concat(d.short_name, ' ',
            dp.property_name, ' ',
@@ -139,6 +140,6 @@ GROUP BY dsl.device_property_id, c.label, d.short_name, dp.property_name, dma.of
 dp.unit_name, dp.unit_uri, d.id, dp.id, cv_agg.definition, dp.aggregation_type_uri, cv_u.provenance, cv_u.term, dp.resolution, cv_ur.provenance,
 dp.resolution_unit_name, dp.resolution_unit_uri, dp.accuracy, cv_ua.provenance, dp.accuracy_unit_name, dp.accuracy_unit_uri, dp.measuring_range_min,
 dp.measuring_range_max, cv_l.term, cv_l.provenance_uri, cv_l.definition, c.id, dma.id, dma.label, dma.begin_description, dma.begin_date, dma.offset_x,
-dma.offset_y, csl.x, csl.y, dp.property_uri, dma.end_date;
+dma.offset_y, csl.x, csl.y, dp.property_uri, dma.end_date, dp.label;
 COMMIT;
\ No newline at end of file
@@ -7,9 +7,11 @@ import logging
 import math
 import re
 import warnings
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from datetime import datetime
+from functools import reduce
+from io import StringIO
 from typing import Any, TypedDict, TypeVar, cast
@@ -129,22 +131,33 @@ class FileParser(Parser):
 class CsvParser(FileParser):
-    def _cast_index(self, index: pd.Series, fmt: str) -> pd.DatetimeIndex:
-        try:
-            index = index.str.strip()
-        except AttributeError:
-            pass
-        out = pd.to_datetime(index, format=fmt, errors="coerce")
-        if out.isna().any():
-            nat = out.isna()
+    def _set_index(self, df: pd.DataFrame, timestamp_columns: dict) -> pd.DataFrame:
+        date_columns = [d["column"] for d in timestamp_columns]
+        date_format = " ".join([d["format"] for d in timestamp_columns])
+        for c in date_columns:
+            if c not in df.columns:
+                raise ParsingError(f"Timestamp column {c} does not exist. ")
+        index = reduce(
+            lambda x, y: x + " " + y,
+            [df[c].fillna("").astype(str).str.strip() for c in date_columns],
+        )
+        df = df.drop(columns=date_columns)
+        index = pd.to_datetime(index, format=date_format, errors="coerce")
+        if index.isna().any():
+            nat = index.isna()
             warnings.warn(
-                f"Could not parse {nat.sum()} of {out.count()} timestamps "
-                f"with provided timestamp format {fmt!r}. First failing "
+                f"Could not parse {nat.sum()} of {len(index)} timestamps "
+                f"with provided timestamp format {date_format!r}. First failing "
                 f"timestamp: '{index[nat].iloc[0]}'",
                 ParsingWarning,
             )
-        out.name = None
-        return pd.DatetimeIndex(out)
+        index.name = None
+        df.index = index
+        return df

     def do_parse(self, rawdata: str) -> pd.DataFrame:
         """
@@ -152,10 +165,11 @@ class CsvParser(FileParser):
         rawdata: the unparsed content

         NOTE:
             we need to preserve the original column numbering
             and check for the date index column
         """
         settings = self.settings.copy()
-        index_col = settings.pop("index_col")
+        self.logger.info(settings)
+        timestamp_columns = settings.pop("timestamp_columns")

         if "comment" in settings:
             rawdata = filter_lines(rawdata, settings.pop("comment"))
@@ -173,13 +187,7 @@
         if df.empty:
             return pd.DataFrame(index=pd.DatetimeIndex([]))

-        if index_col not in df.columns:
-            raise ParsingError(
-                f"Could not get Timestamp-Column {index_col}. "
-                f"Data has only {len(df.columns)} columns."
-            )
-
-        df.index = self._cast_index(df.pop(index_col), settings["date_format"])
+        df = self._set_index(df, timestamp_columns)

         # remove rows with broken dates
         df = df.loc[df.index.notna()]
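The strategy in _set_index can be seen in isolation: join the string values of all timestamp columns with single spaces, join their formats the same way, and let pandas parse the combined string once. A self-contained sketch with made-up sample data:

from functools import reduce

import pandas as pd

df = pd.DataFrame({0: ["02/11/22"], 1: ["14:00:51"], 2: [20.52]})
timestamp_columns = [
    {"column": 0, "format": "%d/%m/%y"},
    {"column": 1, "format": "%H:%M:%S"},
]
cols = [d["column"] for d in timestamp_columns]
fmt = " ".join(d["format"] for d in timestamp_columns)
# Concatenate the date columns into one string: "02/11/22 14:00:51"
index = reduce(
    lambda x, y: x + " " + y,
    [df[c].fillna("").astype(str).str.strip() for c in cols],
)
df = df.drop(columns=cols)
df.index = pd.to_datetime(index, format=fmt, errors="coerce")
assert str(df.index[0]) == "2022-11-02 14:00:51"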
@@ -407,7 +415,5 @@ def get_parser(parser_type, settings) -> FileParser | MqttDataParser:
     kwargs = settings.pop("pandas_read_csv") or {}
     settings = {**default_settings, **kwargs, **settings}

-    settings["index_col"] = settings.pop("timestamp_column")
-    settings["date_format"] = settings.pop("timestamp_format")
-
-    return klass(settings)
+    return klass()
@@ -17,17 +17,6 @@ RAWDATA = """
 1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9, 128.9, 15.8, 14.6, 14.5, 76.1,119.0, 89.5, 11.855, 165, 103, 900, 900, 18.1, 63.2, 17.9, 63.8, 0
 """

-DIRTYDATA = """
-//Hydroinnova CRS-1000 Data
-//CellSig=12
-//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
-1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
-1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
-1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
-"""
-
-
 def test_parsing():
     settings = {
@@ -35,8 +24,7 @@ def test_parsing():
         "delimiter": ",",
         "skiprows": 3,
         "skipfooter": 0,
-        "index_col": 1,
-        "date_format": "%Y/%m/%d %H:%M:%S",
+        "timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
     }
     parser = CsvParser(settings)
     df = parser.do_parse(RAWDATA)
@@ -45,14 +33,25 @@
     assert (df.columns == [0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]).all()  # fmt: skip

+DIRTYDATA = """
+//Hydroinnova CRS-1000 Data
+//CellSig=12
+//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
+1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
+1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
+1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
+"""
+
+
 def test_dirty_data_parsing():
     settings = {
         "decimal": ".",
         "delimiter": ",",
         "skiprows": 3,
         "skipfooter": 0,
-        "index_col": 1,
-        "date_format": "%Y/%m/%d %H:%M:%S",
+        "timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
     }
     parser = CsvParser(settings)
@@ -78,3 +77,44 @@ def test_dirty_data_parsing():
         "datastream_pos": "3",
         "parameters": '{"origin": "test", "column_header": "3"}',
     }
+
+
+MULTIDATECOLUMDATA = """
+============================================================================
+Datum Zeit Temp spezLeitf Tiefe Chl Chl ODO ODOsat Batterie
+t/m/j hh:mm:ss C uS/cm Meter ug/l RFU mg/l %Lokal Volt
+----------------------------------------------------------------------------
+02/11/22 14:00:51 20.52 3 0.151 9.1 2.2 9.10 100.5 12.5
+02/11/22 15:00:51 20.38 3 0.158 -23.5 -5.6 9.11 100.3 12.5
+02/11/22 16:00:51 20.19 3 0.161 -0.5 -0.1 9.15 100.3 12.4
+02/11/22 17:00:51 20.02 3 0.164 0.0 0.0 9.18 100.3 12.5
+"""
+
+
+def test_multi_date_column_parsing():
+    settings = {
+        "decimal": ".",
+        "delimiter": "\\s+",
+        "skiprows": 4,
+        "skipfooter": 0,
+        "header": None,
+        "timestamp_columns": [
+            {"column": 0, "format": "%d/%m/%y"},
+            {"column": 1, "format": "%H:%M:%S"},
+        ],
+    }
+    parser = CsvParser(settings)
+    df = parser.do_parse(MULTIDATECOLUMDATA.strip())
+    assert df.index.equals(
+        pd.to_datetime(
+            [
+                "2022-11-02 14:00:51",
+                "2022-11-02 15:00:51",
+                "2022-11-02 16:00:51",
+                "2022-11-02 17:00:51",
+            ]
+        )
+    )
+    assert df.columns.equals(pd.RangeIndex(2, 10))
+    assert (df[2] == [20.52, 20.38, 20.19, 20.02]).all()
+    assert (df[9] == [12.5, 12.5, 12.4, 12.5]).all()