Skip to content
Snippets Groups Projects
Commit e169d863 authored by Bert Palm's avatar Bert Palm :bug:
Browse files

Merge branch 'multi-date-column' into 'main'

Multi date column

See merge request !281
parents e1609c3c 18032e1c
No related branches found
No related tags found
1 merge request!281Multi date column
Pipeline #498121 failed
-- Migration: replace the legacy single-column timestamp settings
-- ("timestamp_column" + "timestamp_format") in file_parser.params (JSONB)
-- with the new list-based "timestamp_columns" format:
--   [{"column": <int>, "format": <strftime-format>}, ...]
SET search_path TO config_db;
-- Keep a full copy of the table so the migration can be rolled back.
CREATE TABLE file_parser_backup AS TABLE file_parser WITH DATA;
-- Step 1: build "timestamp_columns" from the legacy pair.
-- COALESCE keeps an already-existing "timestamp_columns" value untouched
-- (belt and braces: the WHERE clause also excludes such rows).
UPDATE file_parser
SET params = jsonb_set(
params,
'{timestamp_columns}',
COALESCE(
params->'timestamp_columns',
jsonb_build_array(
jsonb_build_object(
'column', (params->>'timestamp_column')::int,
'format', params->>'timestamp_format'
)
)
)
)
-- Only rows that still use the legacy keys and do not yet have the new one.
WHERE params ? 'timestamp_column'
AND params ? 'timestamp_format'
AND NOT params ? 'timestamp_columns';
-- Step 2: drop the now-obsolete legacy keys wherever either one remains.
UPDATE file_parser
SET params = params - 'timestamp_column' - 'timestamp_format'
WHERE params ? 'timestamp_column' OR params ? 'timestamp_format';
......@@ -7,9 +7,11 @@ import logging
import math
import re
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from io import StringIO
from typing import Any, TypedDict, TypeVar, cast
......@@ -129,22 +131,33 @@ class FileParser(Parser):
class CsvParser(FileParser):
def _cast_index(self, index: pd.Series, fmt: str) -> pd.DatetimeIndex:
try:
index = index.str.strip()
except AttributeError:
pass
out = pd.to_datetime(index, format=fmt, errors="coerce")
if out.isna().any():
nat = out.isna()
def _set_index(self, df: pd.DataFrame, timestamp_columns: list[dict]) -> pd.DataFrame:
    """Combine one or more timestamp columns into the frame's DatetimeIndex.

    Parameters
    ----------
    df :
        Parsed raw data; must contain every configured timestamp column.
    timestamp_columns :
        List of ``{"column": <column>, "format": <strftime-format>}``
        mappings. The raw column values are concatenated (space separated)
        and parsed against the equally space-joined format strings.

    Returns
    -------
    pd.DataFrame
        ``df`` without the timestamp columns, indexed by the parsed
        datetimes. Unparsable timestamps become ``NaT`` and a
        ``ParsingWarning`` is emitted.

    Raises
    ------
    ParsingError
        If a configured timestamp column is missing from ``df``.
    """
    date_columns = [d["column"] for d in timestamp_columns]
    date_format = " ".join(d["format"] for d in timestamp_columns)
    for c in date_columns:
        if c not in df.columns:
            raise ParsingError(f"Timestamp column {c} does not exist. ")
    # Join the raw column values with single spaces so they line up with
    # the space-joined format string built above. NaNs become empty
    # strings and simply fail to parse (-> NaT) instead of raising.
    index = reduce(
        lambda x, y: x + " " + y,
        [df[c].fillna("").astype(str).str.strip() for c in date_columns],
    )
    df = df.drop(columns=date_columns)
    # errors="coerce": failures become NaT instead of raising; the caller
    # (do_parse) drops NaT rows afterwards.
    index = pd.to_datetime(index, format=date_format, errors="coerce")
    # Compute the NaT mask once and reuse it for the check and the warning.
    nat = index.isna()
    if nat.any():
        warnings.warn(
            f"Could not parse {nat.sum()} of {len(index)} timestamps "
            f"with provided timestamp format {date_format!r}. First failing "
            f"timestamp: '{index[nat].iloc[0]}'",
            ParsingWarning,
        )
    index.name = None
    df.index = index
    return df
def do_parse(self, rawdata: str) -> pd.DataFrame:
"""
......@@ -152,10 +165,11 @@ class CsvParser(FileParser):
rawdata: the unparsed content
NOTE:
we need to preserve the original column numbering
and check for the date index column
"""
settings = self.settings.copy()
index_col = settings.pop("index_col")
self.logger.info(settings)
timestamp_columns = settings.pop("timestamp_columns")
if "comment" in settings:
rawdata = filter_lines(rawdata, settings.pop("comment"))
......@@ -173,13 +187,7 @@ class CsvParser(FileParser):
if df.empty:
return pd.DataFrame(index=pd.DatetimeIndex([]))
if index_col not in df.columns:
raise ParsingError(
f"Could not get Timestamp-Column {index_col}. "
f"Data has only {len(df.columns)} columns."
)
df.index = self._cast_index(df.pop(index_col), settings["date_format"])
df = self._set_index(df, timestamp_columns)
# remove rows with broken dates
df = df.loc[df.index.notna()]
......@@ -407,7 +415,5 @@ def get_parser(parser_type, settings) -> FileParser | MqttDataParser:
kwargs = settings.pop("pandas_read_csv") or {}
settings = {**default_settings, **kwargs, **settings}
settings["index_col"] = settings.pop("timestamp_column")
settings["date_format"] = settings.pop("timestamp_format")
return klass(settings)
return klass()
......@@ -17,17 +17,6 @@ RAWDATA = """
1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9, 128.9, 15.8, 14.6, 14.5, 76.1,119.0, 89.5, 11.855, 165, 103, 900, 900, 18.1, 63.2, 17.9, 63.8, 0
"""
DIRTYDATA = """
//Hydroinnova CRS-1000 Data
//CellSig=12
//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
"""
def test_parsing():
settings = {
......@@ -35,8 +24,7 @@ def test_parsing():
"delimiter": ",",
"skiprows": 3,
"skipfooter": 0,
"index_col": 1,
"date_format": "%Y/%m/%d %H:%M:%S",
"timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
}
parser = CsvParser(settings)
df = parser.do_parse(RAWDATA)
......@@ -45,14 +33,25 @@ def test_parsing():
assert (df.columns == [0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]).all() # fmt: skip
DIRTYDATA = """
//Hydroinnova CRS-1000 Data
//CellSig=12
//RecordNum,Date Time(UTC),P1_mb,P3_mb,P4_mb,T1_C,
1418, 2021/09/09 05:45:00, 987.0, 989.70, 991.05, 15.9
1419, 2021/09/09 06:00:00, 987.0, xW8, 991.05, 15.9
1420, 2021/09/09 06:15:00, 987.1, 989.76, 991.12, 15.9
"""
def test_dirty_data_parsing():
settings = {
"decimal": ".",
"delimiter": ",",
"skiprows": 3,
"skipfooter": 0,
"index_col": 1,
"date_format": "%Y/%m/%d %H:%M:%S",
"timestamp_columns": [{"column": 1, "format": "%Y/%m/%d %H:%M:%S"}],
}
parser = CsvParser(settings)
......@@ -78,3 +77,44 @@ def test_dirty_data_parsing():
"datastream_pos": "3",
"parameters": '{"origin": "test", "column_header": "3"}',
}
MULTIDATECOLUMDATA = """
============================================================================
Datum Zeit Temp spezLeitf Tiefe Chl Chl ODO ODOsat Batterie
t/m/j hh:mm:ss C uS/cm Meter ug/l RFU mg/l %Lokal Volt
----------------------------------------------------------------------------
02/11/22 14:00:51 20.52 3 0.151 9.1 2.2 9.10 100.5 12.5
02/11/22 15:00:51 20.38 3 0.158 -23.5 -5.6 9.11 100.3 12.5
02/11/22 16:00:51 20.19 3 0.161 -0.5 -0.1 9.15 100.3 12.4
02/11/22 17:00:51 20.02 3 0.164 0.0 0.0 9.18 100.3 12.5
"""
def test_multi_date_column_parsing():
    """Date and time split over two columns are merged into a single index."""
    settings = {
        "decimal": ".",
        "delimiter": "\\s+",
        "skiprows": 4,
        "skipfooter": 0,
        "header": None,
        "timestamp_columns": [
            {"column": 0, "format": "%d/%m/%y"},
            {"column": 1, "format": "%H:%M:%S"},
        ],
    }
    df = CsvParser(settings).do_parse(MULTIDATECOLUMDATA.strip())

    # one row per hour from 14:00:51 through 17:00:51 on 2022-11-02
    expected_index = pd.to_datetime(
        [f"2022-11-02 {hour}:00:51" for hour in ("14", "15", "16", "17")]
    )
    assert df.index.equals(expected_index)
    # the two date columns are consumed; the remaining columns keep
    # their original positional numbering (2..9)
    assert df.columns.equals(pd.RangeIndex(2, 10))
    assert (df[2] == [20.52, 20.38, 20.19, 20.02]).all()
    assert (df[9] == [12.5, 12.5, 12.4, 12.5]).all()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment