# StaImporter.py — imports CSV point data into an OGC SensorThings API (STA) endpoint.
import csv

import requests

from data_import.lib.utils import is_row_included  # noqa
from data_import.models import (  # noqa
    STA_THING,
    CsvImportJob,
    CsvIncludeCriteria,
    CsvParser,
    CsvParserExtraColumn,
    ExtendedPointData,
    PointData,
)
from main.models import StaThingProperty  # noqa


class StaImporter:
    """Pushes the rows of a job's CSV file into a SensorThings API (STA) endpoint.

    For each included CSV row the importer creates (or reuses) the STA entity
    chain Thing (+Location) -> ObservedProperty -> Sensor -> Datastream and
    posts one Observation. Per-row failures are flagged via ``self.error`` and
    collected in ``self.logs``; both are reset after every row.
    """

    def __init__(self, job: CsvImportJob):
        # Per-row error state: boolean flag plus human-readable log lines.
        self.error = False
        self.logs = []
        self.import_job = job

        # STA connection details come from the job's bucket configuration.
        self.sta_endpoint = job.bucket.sta_endpoint
        self.sta_url = self.sta_endpoint.base_url.rstrip('/') + '/v1.1/'
        self.auth = (self.sta_endpoint.username, self.sta_endpoint.password)

    def import_csv_in_sta(self):
        """Read the job's CSV file and import every included row into STA.

        Returns:
            tuple[int, int]: ``(rows_succeeded, rows_failed)``. (These counters
            were previously computed but discarded; existing callers that
            ignore the return value are unaffected.)
        """
        # NOTE(review): assumes the file was already downloaded from S3 into
        # the working directory under its basename — confirm against caller.
        file_name = self.import_job.s3_file.split('/')[-1]

        with open(file_name, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            next(reader)  # skip header row

            thing_props = StaThingProperty.objects.filter(endpoint=self.import_job.bucket.sta_endpoint)
            extra_columns = CsvParserExtraColumn.objects.filter(parser=self.import_job.bucket.csv_parser)
            include_criteria = CsvIncludeCriteria.objects.filter(parser=self.import_job.bucket.csv_parser)

            rows_succeeded = 0
            rows_failed = 0
            for row in reader:
                if not is_row_included(row, include_criteria):
                    continue

                point_data = create_point_data(self.import_job, row)
                extended_data = create_extended_data(point_data, extra_columns, thing_props, row)

                self.import_point_data(point_data, extended_data)

                if self.error:
                    # Persist failed rows together with their error log for
                    # later inspection; successful rows are not stored locally.
                    point_data.validation_error = "\n".join(self.logs)
                    point_data.save()
                    for data in extended_data:
                        data.save()
                    rows_failed += 1
                else:
                    rows_succeeded += 1

                # Reset per-row error state before processing the next row.
                self.error = False
                self.logs = []

        return rows_succeeded, rows_failed

    def import_point_data(self, point_data: PointData, extended_point_data: list[ExtendedPointData]):
        """Create/reuse the STA entity chain for one row and post its Observation.

        Existing entities are looked up (by name and properties) and reused
        instead of duplicated. Any failure sets ``self.error``.
        """
        location = self.get_location_json(point_data)
        if self.error:
            return

        thing = self.get_thing_json(point_data, extended_point_data)

        thing_id = self.get_entity_id_if_exists('Things', thing)
        if not thing_id:
            self.create_entity('Things', thing)
            thing_id = self.get_entity_id_if_exists('Things', thing)

            # A Location is only created together with a brand-new Thing.
            location["Things"] = [{"@iot.id": thing_id}]
            self.create_entity('Locations', location)

        obs_property = self.get_property_json(point_data, extended_point_data)

        property_id = self.get_entity_id_if_exists('ObservedProperties', obs_property)
        if not property_id:
            self.create_entity('ObservedProperties', obs_property)
            property_id = self.get_entity_id_if_exists('ObservedProperties', obs_property)

        # A single placeholder sensor is shared by all datastreams.
        sensor = {
            "name": "tbd",
            "description": "sensor",
            "properties": {},
            "encodingType": "application/pdf",
            "metadata": ""
        }
        sensor_id = self.get_entity_id_if_exists('Sensors', sensor)
        if not sensor_id:
            self.create_entity('Sensors', sensor)
            sensor_id = self.get_entity_id_if_exists('Sensors', sensor)

        datastream_id = self.get_datastream_id_if_exists(thing_id, property_id, sensor_id, point_data.result_unit)

        if not datastream_id:
            datastream = {
                "name": point_data.property,
                "description": '',
                "observationType": "https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
                "unitOfMeasurement": {
                    "name": "",
                    "symbol": point_data.result_unit,
                    "definition": ""
                },
                "Thing": {"@iot.id": thing_id},
                "Sensor": {"@iot.id": sensor_id},
                "ObservedProperty": {"@iot.id": property_id}
            }
            self.create_entity('Datastreams', datastream)
            datastream_id = self.get_datastream_id_if_exists(thing_id, property_id, sensor_id, point_data.result_unit)

        observation = self.get_observation_json(point_data, extended_point_data)
        observation["Datastream"] = {"@iot.id": datastream_id}

        self.create_entity('Observations', observation)

    def get_entity_id_if_exists(self, route, entity_dict):
        """Return the ``@iot.id`` of an entity matching name + properties, or None."""
        url = "{}{}?$top=1&$count=true".format(self.sta_url, route)

        # Filter on the name and on every key/value pair in 'properties'.
        params = ["name eq '{}'".format(sanitize_str(entity_dict['name']))]
        for key, value in entity_dict['properties'].items():
            params.append("properties/{} eq '{}'".format(key, sanitize_str(value)))
        url += "&$filter=" + " and ".join(params)

        response = requests.get(url, auth=self.auth)
        if response.status_code == 200:
            if response.json()["@iot.count"] >= 1:
                return self.extract_id(response, route)
        # Non-200 or zero matches: treat as "does not exist yet".
        return None

    def get_datastream_id_if_exists(self, thing_id, obs_property_id, sensor_id, unit):
        """Return the ``@iot.id`` of the Datastream linking the given entities, or None."""
        url = "{}Datastreams?$top=1&$count=true&$filter=Thing/id eq {} and ObservedProperty/id eq {} and Sensor/id eq {} and unitOfMeasurement/symbol eq '{}'".format(self.sta_url, thing_id, obs_property_id, sensor_id, unit)
        response = requests.get(url, auth=self.auth)

        if response.status_code == 200:
            if response.json()["@iot.count"] >= 1:
                return self.extract_id(response, 'Datastreams')
        return None

    def create_entity(self, route, payload):
        """POST a new entity; on failure flag ``self.error`` and log the payload."""
        response = requests.post('{}{}'.format(self.sta_url, route), json=payload, auth=self.auth)

        if response.status_code != 201:
            self.error = True
            self.logs.append(str(payload))
        return None

    def extract_id(self, response, route):
        """Extract the ``@iot.id`` of the first record in a query response.

        Returns None (previously ``False`` — both falsy, callers only
        truth-test) and flags an error when the response has no records.
        """
        content = response.json()

        for record in content['value']:
            return record['@iot.id']

        self.error = True
        # str() so the log entry stays joinable with "\n".join() later on;
        # appending the raw dict used to crash the join in import_csv_in_sta.
        self.logs.append(str(content))
        print('error {}: {}'.format(route, content))
        return None

    def get_thing_json(self, point_data: PointData, extended_data: list[ExtendedPointData]):
        """Build the STA Thing payload, folding in 'thing'-related extra columns."""
        thing = {
            "name": point_data.thing_name,
            "description": "",
            "properties": {}
        }

        for data in extended_data:
            if data.related_entity == 'thing':
                thing["properties"][data.name] = data.value

        return thing

    def get_property_json(self, point_data: PointData, extended_data: list[ExtendedPointData]):
        """Build the STA ObservedProperty payload, folding in 'property'-related extras."""
        obs_property = {
            "name": point_data.property,
            "description": '',
            "properties": {},
            "definition": ''
        }

        for data in extended_data:
            if data.related_entity == 'property':
                obs_property["properties"][data.name] = data.value

        return obs_property

    def get_observation_json(self, point_data: PointData, extended_data: list[ExtendedPointData]):
        """Build the STA Observation payload; flags an error on unparsable values."""
        if point_data.result_value == '' or point_data.result_value is None:
            value = None
        else:
            try:
                value = float(point_data.result_value)
            except (TypeError, ValueError):
                self.error = True
                self.logs.append("Cannot read value: " + point_data.result_value)
                value = None

        timestamp = point_data.result_time

        # Date-only timestamps are promoted to midnight UTC.
        if "T" not in timestamp and not timestamp.endswith('Z'):
            timestamp += "T00:00:00Z"

        observation = {
          "result": value,
          "phenomenonTime": timestamp,
          "resultTime": timestamp,
          "parameters": {
              "import_job": self.import_job.id
          }
        }

        for data in extended_data:
            if data.related_entity == 'observation':
                observation["parameters"][data.name] = data.value

        return observation

    def get_location_json(self, point_data: PointData):
        """Build the STA Location (GeoJSON Point) payload.

        Flags ``self.error`` and returns None when the coordinates cannot be
        parsed. Any parsable pair is accepted — the previous truthiness check
        (``if lat and lon``) silently dropped legitimate 0.0 coordinates and
        returned None *without* setting the error flag, which crashed later.
        """
        try:
            lat = float(point_data.coord_lat)
            lon = float(point_data.coord_lon)
        except (TypeError, ValueError):
            self.error = True
            self.logs.append("Cannot extract coordinates.")
            return None

        return {
            "name": point_data.location_name,
            "description": '',
            "properties": {},
            "encodingType": "application/geo+json",
            "location": {
                "type": "Point",
                "coordinates": [lon, lat]  # GeoJSON order: [longitude, latitude]
            },
        }

def sanitize_str(text: str):
    """Escape a string for use inside an OData ``$filter`` literal.

    Single quotes are doubled (OData string escaping); '+' and '/' are
    percent-encoded so they survive embedding in the query URL.
    """
    # Single-pass mapping; replacements never contain each other's targets,
    # so this is equivalent to the chained .replace() form.
    escape_table = str.maketrans({"'": "''", "+": "%2b", "/": "%2F"})
    return text.translate(escape_table)


def create_point_data(job: CsvImportJob, row):
    """Map one CSV row onto an (unsaved) PointData record.

    Column positions come from the job's bucket parser; the station column
    feeds both the thing name and the location name.
    """
    parser: CsvParser = job.bucket.csv_parser

    station = row[parser.station_col_num]
    return PointData(
        import_job=job,
        thing_name=station,
        location_name=station,
        coord_lat=row[parser.lat_col_num],
        coord_lon=row[parser.lon_col_num],
        property=row[parser.property_col_num],
        result_value=row[parser.value_col_num],
        result_unit=row[parser.unit_col_num],
        result_time=row[parser.time_col_num],
    )


def create_extended_data(point_data: PointData, extra_columns: list[CsvParserExtraColumn], thing_props: list[StaThingProperty], row):
    """Build the (unsaved) ExtendedPointData records for one CSV row.

    Endpoint-wide thing properties come first, followed by the parser's
    extra CSV columns — order matches the original implementation.
    """
    from_thing_props = [
        ExtendedPointData(
            point_data=point_data,
            related_entity=STA_THING,
            name=prop.property_key,
            value=prop.property_value,
        )
        for prop in thing_props
    ]

    from_extra_columns = [
        ExtendedPointData(
            point_data=point_data,
            related_entity=column.related_entity,
            name=column.col_name,
            value=row[column.col_num],
        )
        for column in extra_columns
    ]

    return from_thing_props + from_extra_columns