Skip to content
Snippets Groups Projects
sync_uba_api.py 8.91 KiB
#! /usr/bin/env python3

# UBA API docs: https://www.umweltbundesamt.de/daten/luft/luftdaten/doc

import requests
import os
import logging
import json
import click

from datetime import datetime, timedelta
import timeio.mqtt as mqtt

from timeio.journaling import Journal

# Journal used below to record the outcome of a sync run for a thing
journal = Journal("CronJob")
# Base URL of the timeIO DB API; None when DB_API_BASE_URL is not set
api_base_url = os.environ.get("DB_API_BASE_URL")


def adjust_datetime(datetime_str: str) -> str:
    """Rewrite a UBA timestamp as midnight of the following day.

    The UBA API numbers hours from 1 to 24, so such timestamps have to
    be rewritten for the timeIO DB API. Only the date part (the first
    ten characters, ``YYYY-MM-DD``) of ``datetime_str`` is read; the
    time part is ignored and the result is always ``00:00:00`` of the
    next calendar day, formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    day_part = datetime_str[:10]
    next_day = datetime.strptime(day_part, "%Y-%m-%d") + timedelta(days=1)
    return f"{next_day:%Y-%m-%d %H:%M:%S}"


def get_timerange_parameters(now: "datetime | None" = None):
    """Build the UBA query window covering the last hour.

    The UBA API expects ``time_from``/``time_to`` in the range 1 to 24,
    so hour 0 of a day is expressed as hour 24 of the previous day.

    Args:
        now: Reference point in time for the end of the window. Defaults
            to ``datetime.now()`` (naive local time of the host).
            NOTE(review): confirm the host timezone matches what the
            UBA API expects.

    Returns:
        Tuple ``(date_from, time_from, date_to, time_to)`` where the
        dates are ``YYYY-MM-DD`` strings and the times are ints in 1..24.
    """
    if now is None:
        now = datetime.now()

    def _to_uba(moment):
        # Hour 0 does not exist in UBA notation: it is hour 24 of the day before
        if moment.hour == 0:
            return (moment - timedelta(days=1)).strftime("%Y-%m-%d"), 24
        return moment.strftime("%Y-%m-%d"), moment.hour

    date_from, time_from = _to_uba(now - timedelta(hours=1))
    date_to, time_to = _to_uba(now)
    return date_from, time_from, date_to, time_to


def get_components_and_scopes(timeout: float = 60):
    """Get components (i.e. measured quantities) and scopes
    (aggregation infos) for later mapping.

    Both UBA lookup endpoints are requested and reduced to a dict that
    maps the integer id (first field of each entry) to the second field
    of that entry. The bookkeeping keys ``count`` and ``indices`` are
    skipped.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up,
            so a stalled connection cannot block the job forever.

    Returns:
        Tuple ``(components, scopes)`` of ``{id: label}`` dicts.

    Raises:
        requests.HTTPError: If one of the endpoints answers with an
            error status.
        requests.Timeout: If a request exceeds ``timeout``.
    """
    base_url = "https://www.umweltbundesamt.de/api/air_data/v3"

    def _fetch_id_label_map(endpoint: str) -> dict:
        # Request one lookup endpoint and index its rows by numeric id
        response = requests.get(f"{base_url}/{endpoint}/json", timeout=timeout)
        response.raise_for_status()
        return {
            int(row[0]): row[1]
            for key, row in response.json().items()
            if key not in ("count", "indices")
        }

    components = _fetch_id_label_map("components")
    scopes = _fetch_id_label_map("scopes")
    return components, scopes


def get_station_info(station_id: str, timeout: float = 60) -> list:
    """Get all available component and scope combinations of a given
    station.

    The UBA ``measures/limits`` endpoint is requested and every row
    whose third field equals ``station_id`` is kept.

    Args:
        station_id: UBA station id. Compared as text, so an id passed as
            an int still matches.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        List of ``{"scope": int, "component": int}`` dicts, empty when
        the station has no entries.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    response = requests.get(
        "https://www.umweltbundesamt.de/api/air_data/v3/measures/limits",
        timeout=timeout,
    )
    response.raise_for_status()
    wanted = str(station_id)
    return [
        {"scope": int(row[0]), "component": int(row[1])}
        for row in response.json()["data"].values()
        if str(row[2]) == wanted
    ]


def request_measure_endpoint(
    station_id: str,
    component_id: int,
    scope_id: int,
    date_from: str,
    date_to: str,
    time_from: int,
    time_to: int,
    timeout: float = 60,
) -> dict:
    """Request the UBA API measures endpoint for a given component and
    scope and a given time range.

    Args:
        station_id: UBA station id.
        component_id: Id of the measured component.
        scope_id: Id of the aggregation scope.
        date_from: Start date, ``YYYY-MM-DD``.
        date_to: End date, ``YYYY-MM-DD``.
        time_from: Start hour in UBA notation (1..24).
        time_to: End hour in UBA notation (1..24).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The entries found under the station's key in the response
        ``data``, or an empty dict when the response holds no data.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    params = {
        "date_from": date_from,
        "date_to": date_to,
        "time_from": time_from,
        "time_to": time_to,
        "station": station_id,
        "component": component_id,
        "scope": scope_id,
    }
    response = requests.get(
        url="https://www.umweltbundesamt.de/api/air_data/v3/measures/json",
        params=params,
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()["data"]
    if not data:
        # Always hand back a dict, whatever empty container the API sent,
        # so callers can iterate with .items()/.values() unconditionally
        return {}
    # JSON object keys are always strings
    return data[str(station_id)]


def combine_measure_responses(
    station_id: str,
    date_from: str,
    date_to: str,
    time_from: int,
    time_to: int,
    components: dict,
    scopes: dict,
) -> list:
    """Merge the UBA measure responses of every component/scope
    combination of a station into one flat list.

    For each combination reported by ``get_station_info`` the measures
    endpoint is queried for the given time range. Every returned row
    becomes a dict with ``timestamp`` (fourth field of the row),
    ``value`` (third field) and ``measure``, a label built from the
    component and scope lookups.
    """
    combined = []
    for combo in get_station_info(station_id):
        component_id = combo["component"]
        scope_id = combo["scope"]
        rows = request_measure_endpoint(
            station_id,
            component_id,
            scope_id,
            date_from,
            date_to,
            time_from,
            time_to,
        )
        for row in rows.values():
            # The label is resolved per row so that a combination
            # without rows never touches the lookup tables
            label = f"{components[component_id]} {scopes[scope_id]}"
            combined.append(
                {"timestamp": row[3], "value": row[2], "measure": label}
            )
    return combined


def parse_measure_data(measure_data: list, station_id: str) -> list:
    """Create POST bodies from combined UBA measures data.

    Args:
        measure_data: Dicts with the keys ``timestamp``, ``value`` and
            ``measure``. The entries are not modified.
        station_id: UBA station id, recorded in each body's parameters.

    Returns:
        One observation body per entry that carries a value. Entries
        whose value is ``None`` or an empty string are skipped; a value
        of 0 is a valid measurement and is kept.
    """
    # Identical for every observation of this call, so serialise it once
    parameters = json.dumps(
        {
            "origin": "uba_data",
            "column_header": {
                "uba_station_id": station_id,
                "endpoint": "/measures",
            },
        }
    )
    bodies = []
    for entry in measure_data:
        value = entry["value"]
        # Only a missing value is skipped; a plain truthiness test would
        # also throw away real measurements of 0
        if value is None or value == "":
            continue
        timestamp = entry["timestamp"]
        # UBA hour 24 has to be rewritten as 00:00:00 of the next day
        if timestamp[11:13] == "24":
            timestamp = adjust_datetime(timestamp)
        bodies.append(
            {
                "result_time": timestamp,
                "result_type": 0,
                "result_number": value,
                "datastream_pos": entry["measure"],
                "parameters": parameters,
            }
        )
    return bodies


def get_airquality_data(
    station_id: str,
    date_from: str,
    date_to: str,
    time_from: int,
    time_to: int,
    components: dict,
    timeout: float = 60,
) -> list:
    """Request the UBA API airquality endpoint for a given station and
    time range.

    Args:
        station_id: UBA station id.
        date_from: Start date, ``YYYY-MM-DD``.
        date_to: End date, ``YYYY-MM-DD``.
        time_from: Start hour in UBA notation (1..24).
        time_to: End hour in UBA notation (1..24).
        components: Lookup from component id to its label.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One dict per returned row with the keys ``timestamp`` (first
        field), ``airquality_index`` (second field), ``data_complete``
        (third field) and ``pollutant_info`` (built from all remaining
        fields). Empty list when the response holds no data.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    params = {
        "date_from": date_from,
        "date_to": date_to,
        "time_from": time_from,
        "time_to": time_to,
        "station": station_id,
    }
    response = requests.get(
        "https://www.umweltbundesamt.de/api/air_data/v3/airquality/json",
        params=params,
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()["data"]
    if not data:
        return []

    aqi_data = []
    # JSON object keys are always strings
    for row in data[str(station_id)].values():
        # Everything after the first three fields is one pollutant each:
        # its first field is the component id, its third the index
        pollutant_info = [
            {"component": components[item[0]], "airquality_index": item[2]}
            for item in row[3:]
        ]
        aqi_data.append(
            {
                "timestamp": row[0],
                "airquality_index": row[1],
                "data_complete": row[2],
                "pollutant_info": pollutant_info,
            }
        )
    return aqi_data


def parse_aqi_data(aqi_data: list, station_id: str) -> list:
    """Create POST bodies from UBA air quality data.

    Args:
        aqi_data: Dicts with the keys ``timestamp``, ``airquality_index``
            and ``pollutant_info``. The entries are not modified.
        station_id: UBA station id, recorded in each body's parameters.

    Returns:
        One observation body (datastream ``AQI``) per entry that carries
        an index. Entries whose index is ``None`` or an empty string are
        skipped; an index of 0 is kept.
        NOTE(review): 0 is presumably the best class of the UBA index
        scale - confirm against the API documentation.
    """
    bodies = []
    for entry in aqi_data:
        index = entry["airquality_index"]
        # Only a missing index is skipped; a plain truthiness test would
        # also throw away every observation with index 0
        if index is None or index == "":
            continue
        timestamp = entry["timestamp"]
        # UBA hour 24 has to be rewritten as 00:00:00 of the next day
        if timestamp[11:13] == "24":
            timestamp = adjust_datetime(timestamp)
        source = {
            "uba_station_id": station_id,
            "endpoint": "/airquality",
            "pollutant_info": entry["pollutant_info"],
        }
        bodies.append(
            {
                "result_time": timestamp,
                "result_type": 0,
                "result_number": index,
                "datastream_pos": "AQI",
                "parameters": json.dumps(
                    {"origin": "uba_data", "column_header": source}
                ),
            }
        )
    return bodies


@click.command()
@click.argument("thing_uuid")
@click.argument("parameters")
@click.argument("target_uri")
def main(thing_uuid, parameters, target_uri):
    """Fetch the last hour of UBA data for a thing and upsert it.

    THING_UUID identifies the thing the observations belong to.
    PARAMETERS is a JSON object (single-quoted pseudo JSON is accepted
    as well) that must contain the key "station_id". TARGET_URI is
    accepted but not used.
    """
    logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper())
    logger = logging.getLogger("extApi_sync.uba")

    logger.info(f"Start fetching UBA data for thing {thing_uuid}")
    try:
        # Valid JSON is taken as is, so apostrophes inside values survive
        params = json.loads(parameters)
    except json.JSONDecodeError:
        # Fallback for single-quoted input such as "{'station_id': '1'}"
        params = json.loads(parameters.replace("'", '"'))
    station_id = params["station_id"]

    date_from, time_from, date_to, time_to = get_timerange_parameters()
    components, scopes = get_components_and_scopes()
    measure_data = combine_measure_responses(
        station_id, date_from, date_to, time_from, time_to, components, scopes
    )
    aqi_data = get_airquality_data(
        station_id, date_from, date_to, time_from, time_to, components
    )
    parsed_measure_data = parse_measure_data(measure_data, station_id)
    parsed_aqi_data = parse_aqi_data(aqi_data, station_id)
    parsed_observations = {"observations": parsed_measure_data + parsed_aqi_data}
    logger.info(f"Finished fetching UBA data for thing {thing_uuid}")

    resp = requests.post(
        f"{api_base_url}/observations/upsert/{thing_uuid}",
        json=parsed_observations,
        headers={"Content-type": "application/json"},
        # Do not let a stalled DB API connection block the job forever
        timeout=60,
    )
    # `ok` is false exactly when raise_for_status() raises, so a failed
    # request can never fall through to the success message below
    if not resp.ok:
        journal.error(
            f"Failed to insert UBA data into timeIO DB: {resp.text}", thing_uuid
        )
        resp.raise_for_status()

    journal.info(
        f"Successfully inserted {len(parsed_observations['observations'])} "
        f"observations for thing {thing_uuid} from UBA API into timeIO DB",
        thing_uuid,
    )
    mqtt.publish_single("data_parsed", json.dumps({"thing_uuid": thing_uuid}))


if __name__ == "__main__":
    main()