
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: ufz-sdi/spatialio
Commits on Source (17)
Showing changes with 1141 additions and 161 deletions
@@ -74,6 +74,9 @@
#### VueJS 3
- https://v3.vuejs.org/guide/introduction.html
#### Pinia-Store
- https://pinia.vuejs.org/getting-started.html
#### Typescript
- https://www.typescriptlang.org/
- https://v3.vuejs.org/guide/typescript-support.html
@@ -107,6 +110,19 @@
### GeoServer
#### Setup
- https://docs.geoserver.org/main/en/user/production/index.html
- https://geoserver.geosolutionsgroup.com/edu/en/adv_gsconfig/gsproduction.html
#### WFS
- https://docs.geoserver.org/main/en/user/services/wfs/basics.html
- https://docs.geoserver.org/main/en/user/services/wfs/reference.html
- https://docs.geoserver.org/2.22.x/en/user/extensions/querylayer/index.html
- https://docs.ogc.org/is/09-025r2/09-025r2.html#50
- https://docs.geoserver.org/main/en/user/tutorials/cql/cql_tutorial.html#cql-tutorial
- https://gis.stackexchange.com/questions/132229/cql-filter-that-joins-two-feature-types-in-wfs
- https://docs.geoserver.org/latest/en/user/filter/function_reference.html#filter-function-reference
- Python library for using the api: https://pypi.org/project/geoserver-restconfig/
### Django / Backend
...
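Editorial note on the WFS/CQL references added above (not part of the change set): a typical GeoServer GetFeature request combines the standard OGC parameters with the vendor-specific cql_filter parameter. The host, workspace, layer and attribute names below are hypothetical.

    import requests

    params = {
        "service": "WFS",
        "version": "2.0.0",
        "request": "GetFeature",
        "typeNames": "test-bucket:chemicals_record",    # hypothetical layer
        "outputFormat": "application/json",
        "cql_filter": "country = 'DE' AND value > 10",  # hypothetical attributes
    }
    resp = requests.get("http://localhost:8080/geoserver/test-bucket/ows", params=params, timeout=10)
    resp.raise_for_status()
    print(len(resp.json()["features"]))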
@@ -9,6 +9,7 @@ class CsvImportJobAdmin(admin.ModelAdmin):
list_display = (
's3_file',
'bucket',
'target',
'is_processed',
'is_running',
'is_success',
@@ -19,7 +20,7 @@ class CsvImportJobAdmin(admin.ModelAdmin):
fieldsets = [
(None, {
-'fields': (('bucket'), ('s3_file', 'file_size', 'num_rows'), ('time_created', 'is_running', 'is_processed'), ('started_at', 'finished_at', 'execution_time' )),
+'fields': (('bucket'), ('s3_file', 'file_size', 'num_rows'), 'target', ('time_created', 'is_running', 'is_processed'), ('started_at', 'finished_at', 'execution_time' )),
}),
('Results', {
'fields': ('is_success', ('data_points_created', 'data_points_failed'), 'validation_error', ),
...
@@ -29,13 +29,18 @@ class CsvParserAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
'classes': ('tab-basic',),
}),
(None, {
-'fields': ('group', ),
+'fields': ('location_id_col_num', 'property_id_col_num', 'country_col_num', 'community_col_num' ),
'classes': ('tab-optional',),
}),
(None, {
'fields': ('target', 'table_prefix', 'group', ),
'classes': ('tab-group',),
}),
]
tabs = [
("Basic Information", ["tab-basic"]),
("Optional Information", ["tab-optional"]),
("Extra-Columns", ["tab-extra-columns-inline"]),
("Include-Criteria", ["tab-include-criteria-inline"]),
("Group", ["tab-group"]),
...
This diff is collapsed.
import requests
import csv
from data_import.models import CsvImportJob, ExtendedPointData, PointData # noqa
from data_import.models import CsvParser, CsvImportJob, CsvParserExtraColumn, CsvIncludeCriteria, PointData, ExtendedPointData # noqa
from main.models import StaThingProperty # noqa
from data_import.models import STA_THING # noqa
from data_import.lib.utils import is_row_included # noqa
-class StaApi:
+class StaImporter:
def __init__(self, job: CsvImportJob):
self.error = False
@@ -14,6 +19,42 @@ class StaApi:
self.sta_url = self.sta_endpoint.base_url.rstrip('/') + '/v1.1/'
self.auth = (self.sta_endpoint.username, self.sta_endpoint.password)
def import_csv_in_sta(self):
file_name = self.import_job.s3_file.split('/')[-1]
with open(file_name, newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
next(reader) # skip header
thing_props = StaThingProperty.objects.filter(endpoint=self.import_job.bucket.sta_endpoint)
extra_columns = CsvParserExtraColumn.objects.filter(parser=self.import_job.bucket.csv_parser)
include_criteria = CsvIncludeCriteria.objects.filter(parser=self.import_job.bucket.csv_parser)
rows_succeeded = 0
rows_failed = 0
for row in reader:
if is_row_included(row, include_criteria):
point_data = create_point_data(self.import_job, row)
extended_data = create_extended_data(point_data, extra_columns, thing_props, row)
self.import_point_data(point_data, extended_data)
if self.error is True:
point_data.validation_error = "\n".join(self.logs)
point_data.save()
for data in extended_data:
data.save()
rows_failed += 1
else:
rows_succeeded += 1
self.error = False
self.logs = []
def import_point_data(self, point_data: PointData, extended_point_data: list[ExtendedPointData]):
location = self.get_location_json(point_data)
@@ -125,7 +166,6 @@ class StaApi:
print('error {}: {}'.format(route, content))
return False
def get_thing_json(self, point_data: PointData, extended_data: list[ExtendedPointData]):
thing = {
"name": point_data.thing_name,
@@ -213,3 +253,46 @@ def sanitize_str(text: str):
result = text.replace("'", "''")
result = result.replace("+", "%2b")
return result.replace("/", "%2F")
def create_point_data(job: CsvImportJob, row):
p: CsvParser = job.bucket.csv_parser
point_data = PointData(
import_job = job,
thing_name = row[p.station_col_num],
location_name = row[p.station_col_num],
coord_lat = row[p.lat_col_num],
coord_lon = row[p.lon_col_num],
# geometry = '',
property = row[p.property_col_num],
# sensor = '',
result_value = row[p.value_col_num],
result_unit = row[p.unit_col_num],
result_time = row[p.time_col_num]
)
return point_data
def create_extended_data(point_data: PointData, extra_columns: list[CsvParserExtraColumn], thing_props: list[StaThingProperty], row):
result = []
for prop in thing_props:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = STA_THING,
name = prop.property_key,
value = prop.property_value
)
result.append(extended_data)
for column in extra_columns:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = column.related_entity,
name = column.col_name,
value = row[column.col_num]
)
result.append(extended_data)
return result
from data_import.models import CsvIncludeCriteria # noqa
def is_row_included(row, include_criteria: list[CsvIncludeCriteria]) -> bool:
if len(include_criteria) > 0:
for criteria in include_criteria:
if row[criteria.col_num] == criteria.text_value:
return True
return False
else:
return True
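Editorial sketch (not part of the change set): the include criteria act as an OR filter — a row is kept as soon as any single criterion matches, and every row is kept when no criteria are configured. The column index and values below are hypothetical; run from a configured Django shell.

    from data_import.models import CsvIncludeCriteria
    from data_import.lib.utils import is_row_included

    criteria = [CsvIncludeCriteria(col_num=1, text_value="TRUE")]
    is_row_included(["station-a", "TRUE", "42"], criteria)   # True, row[1] matches
    is_row_included(["station-b", "FALSE", "17"], criteria)  # False, no criterion matches
    is_row_included(["station-b", "FALSE", "17"], [])        # True, no criteria configured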
# Generated by Django 4.2.16 on 2024-11-15 10:25
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0003_csvimportjob_finished_at_csvimportjob_started_at_and_more'),
]
operations = [
migrations.CreateModel(
name='WfsImportRecord',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('location_id', models.CharField(blank=True, max_length=1000, null=True)),
('location_name', models.CharField(blank=True, max_length=1000, null=True)),
('type_id', models.CharField(blank=True, max_length=1000, null=True)),
('type_name', models.CharField(blank=True, max_length=1000, null=True)),
('period_start', models.CharField(blank=True, max_length=1000, null=True)),
('period_end', models.CharField(blank=True, max_length=1000, null=True)),
('lat', models.CharField(blank=True, max_length=1000, null=True)),
('lon', models.CharField(blank=True, max_length=1000, null=True)),
('srid', models.CharField(blank=True, max_length=1000, null=True)),
('geom', models.CharField(blank=True, max_length=4000, null=True)),
('gtype', models.CharField(blank=True, max_length=1000, null=True)),
('country', models.CharField(blank=True, max_length=1000, null=True)),
('community', models.CharField(blank=True, max_length=1000, null=True)),
('corine', models.CharField(blank=True, max_length=1000, null=True)),
('value', models.CharField(blank=True, max_length=1000, null=True)),
('date', models.CharField(blank=True, max_length=1000, null=True)),
('unit', models.CharField(blank=True, max_length=1000, null=True)),
('property_name', models.CharField(blank=True, max_length=1000, null=True)),
('property_id', models.CharField(blank=True, max_length=1000, null=True)),
('location_properties', models.JSONField(blank=True, null=True)),
('property_properties', models.JSONField(blank=True, null=True)),
('record_properties', models.JSONField(blank=True, null=True)),
('bucket_name', models.CharField(max_length=1000)),
('import_job', models.PositiveIntegerField()),
('created_at', models.DateTimeField(auto_now_add=True)),
],
),
migrations.AddField(
model_name='csvimportjob',
name='target',
field=models.CharField(blank=True, max_length=1000, null=True),
),
migrations.AddField(
model_name='csvparser',
name='community_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='country_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='location_id_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='property_id_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='target',
field=models.CharField(choices=[('WFS', 'Web Feature Service'), ('STA', 'Sensorthings API')], default='WFS', max_length=20),
),
]
# Generated by Django 4.2.16 on 2024-12-13 13:39
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0004_wfsimportrecord_csvimportjob_target_and_more'),
]
operations = [
migrations.AddField(
model_name='csvparserextracolumn',
name='type_in_db',
field=models.CharField(choices=[('varchar', 'string'), ('integer', 'integer'), ('double precision', 'float'), ('timestamp', 'date')], default='varchar', max_length=20),
),
]
# Generated by Django 4.2.16 on 2024-12-13 16:21
import data_import.models
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0005_csvparserextracolumn_type_in_db'),
]
operations = [
migrations.AddField(
model_name='csvparser',
name='table_prefix',
field=models.CharField(blank=True, max_length=20, null=True, unique=True, validators=[data_import.models.validate_prefix]),
),
]
# Generated by Django 4.2.16 on 2024-12-13 16:30
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('data_import', '0006_csvparser_table_prefix'),
]
operations = [
migrations.RemoveField(
model_name='wfsimportrecord',
name='type_id',
),
]
from django.db import models # noqa
from django.contrib.auth.models import Group # noqa
import re
from django.core.exceptions import ValidationError # noqa
STA_THING = 'thing'
STA_PROPERTY = 'property'
STA_OBSERVATION = 'observation'
-STA_COLUMN_TYPES = (
+CSV_ENTITIES = (
(STA_THING, 'thing'),
-('property', 'property'),
+(STA_PROPERTY, 'property'),
-('observation', 'observation'),
+(STA_OBSERVATION, 'observation'),
)
DB_COLUMN_TYPES = (
('varchar', 'string'),
('integer', 'integer'),
('double precision', 'float'),
('timestamp', 'date'),
)
WFS = 'WFS'
STA = 'STA'
TARGETS = (
(WFS, 'Web Feature Service'),
(STA, 'Sensorthings API'),
)
def validate_prefix(value):
if len(value) < 3 or len(value) > 20:
raise ValidationError("Length must be between 3 and 20 characters.")
if not re.match(r'^[a-z]*$', value):
raise ValidationError('Only lowercase-letters are allowed.')
class CsvParser(models.Model):
lat_col_num = models.IntegerField()
lon_col_num = models.IntegerField()
@@ -18,6 +44,12 @@ class CsvParser(models.Model):
value_col_num = models.IntegerField()
unit_col_num = models.IntegerField()
time_col_num = models.IntegerField()
location_id_col_num = models.IntegerField(blank=True, null=True)
property_id_col_num = models.IntegerField(blank=True, null=True)
country_col_num = models.IntegerField(blank=True, null=True)
community_col_num = models.IntegerField(blank=True, null=True)
table_prefix = models.CharField(max_length=20, blank=True, null=True, unique=True, validators=[validate_prefix])
target = models.CharField(max_length=20, choices=TARGETS, default=WFS)
group = models.ForeignKey(Group, on_delete=models.CASCADE, null=True)
class Meta:
@@ -25,13 +57,14 @@ class CsvParser(models.Model):
verbose_name_plural = "CSV-Parser"
def __str__(self):
-return 'Parser ' + str(self.id)
+return '{} Parser ({}) {}'.format(self.target, str(self.group), str(self.id))
class CsvParserExtraColumn(models.Model):
parser = models.ForeignKey(CsvParser, on_delete=models.CASCADE)
-related_entity = models.CharField(max_length=20, choices=STA_COLUMN_TYPES, default=STA_THING)
+related_entity = models.CharField(max_length=20, choices=CSV_ENTITIES, default=STA_THING)
col_num = models.IntegerField()
col_name = models.CharField(max_length=100)
type_in_db = models.CharField(max_length=20, choices=DB_COLUMN_TYPES, default='varchar')
class CsvIncludeCriteria(models.Model):
parser = models.ForeignKey(CsvParser, on_delete=models.CASCADE)
@@ -54,6 +87,7 @@ class CsvImportJob(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(blank=True, null=True)
finished_at = models.DateTimeField(blank=True, null=True)
target = models.CharField(max_length=1000, blank=True, null=True)
class Meta:
verbose_name = "CSV-Import-Job"
@@ -81,10 +115,37 @@ class PointData(models.Model):
class ExtendedPointData(models.Model):
point_data = models.ForeignKey(PointData, on_delete=models.CASCADE)
-related_entity = models.CharField(max_length=20, choices=STA_COLUMN_TYPES, default='thing')
+related_entity = models.CharField(max_length=20, choices=CSV_ENTITIES, default='thing')
name = models.CharField(max_length=100)
value = models.CharField(max_length=1000)
class Meta:
verbose_name = "Additional Value"
verbose_name_plural = "Additional Values"
class WfsImportRecord(models.Model):
location_id = models.CharField(max_length=1000, blank=True, null=True)
location_name = models.CharField(max_length=1000, blank=True, null=True)
type_name = models.CharField(max_length=1000, blank=True, null=True)
period_start = models.CharField(max_length=1000, blank=True, null=True)
period_end = models.CharField(max_length=1000, blank=True, null=True)
lat = models.CharField(max_length=1000, blank=True, null=True)
lon = models.CharField(max_length=1000, blank=True, null=True)
srid = models.CharField(max_length=1000, blank=True, null=True)
geom = models.CharField(max_length=4000, blank=True, null=True)
gtype = models.CharField(max_length=1000, blank=True, null=True)
country = models.CharField(max_length=1000, blank=True, null=True)
community = models.CharField(max_length=1000, blank=True, null=True)
corine = models.CharField(max_length=1000, blank=True, null=True)
value = models.CharField(max_length=1000, blank=True, null=True)
date = models.CharField(max_length=1000, blank=True, null=True)
unit = models.CharField(max_length=1000, blank=True, null=True)
property_name = models.CharField(max_length=1000, blank=True, null=True)
property_id = models.CharField(max_length=1000, blank=True, null=True)
location_properties = models.JSONField(blank=True, null=True)
property_properties = models.JSONField(blank=True, null=True)
record_properties = models.JSONField(blank=True, null=True)
bucket_name = models.CharField(max_length=1000)
import_job = models.PositiveIntegerField()
created_at = models.DateTimeField(auto_now_add=True)
\ No newline at end of file
@@ -100,7 +100,19 @@ DATABASES = {
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': 5432,
'OPTIONS': {'sslmode': os.environ.get('POSTGRES_SSLMODE')},
-}
+},
"geoserver": {
'ENGINE': 'django.db.backends.postgresql',
"NAME": os.environ.get('POSTGRES_DB'),
"USER": "admin",
"PASSWORD": "admin",
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': 5432,
'OPTIONS': {
'sslmode': os.environ.get('POSTGRES_SSLMODE'),
'options': '-c search_path=geoserver,postgis'
},
},
}
...
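Editorial sketch (not part of the change set): one way the new "geoserver" database alias could be used from Django. How PostgisImporter actually creates and fills the WFS tables is not visible here (that file's diff is collapsed above), so this is only an illustration of Django's multi-database API; the query itself is hypothetical.

    from django.db import connections

    # The connection OPTIONS above set search_path to "geoserver,postgis",
    # so unqualified table names resolve inside the geoserver schema.
    with connections["geoserver"].cursor() as cursor:
        cursor.execute(
            "SELECT count(*) FROM information_schema.tables WHERE table_schema = %s",
            ["geoserver"],
        )
        print(cursor.fetchone()[0])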
@@ -16,7 +16,7 @@ Including another URLconf
from django.contrib import admin
from django.urls import include, path, re_path
-from main.views import views, aggregation_requests, download_requests, timeseries_request, sta_requests
+from main.views import views, aggregation_requests, download_requests, timeseries_request, sta_requests, wfs_requests # noqa
from main.about import basic_site
from health_check.views import get_health
@@ -28,13 +28,13 @@ urlpatterns = [
path('gdi-backend/download/zip/<record_id>', download_requests.get_zip_download, name='zipDownload'),
path('gdi-backend/countries', aggregation_requests.get_countries, name='countries'),
re_path(r'^gdi-backend/proxy', views.proxy_geoserver_request, name='proxy'),
-path('gdi-backend/sta-thing-count/<int:sta_layer_id>', sta_requests.proxy_sta_thing_count, name='sta-thing-count'),
path('gdi-backend/sta-locations/<int:sta_layer_id>', sta_requests.proxy_sta_locations, name='sta-locations'),
path('gdi-backend/sta-obs-properties/<int:sta_layer_id>', sta_requests.proxy_sta_obs_properties, name='sta-obs-properties'),
path('gdi-backend/sta-thing/<int:sta_endpoint_id>/<int:thing_id>', sta_requests.proxy_sta_thing, name='sta-thing'),
path('gdi-backend/sta-obs-properties-by-thing/<int:sta_layer_id>/<int:thing_id>', sta_requests.proxy_sta_obs_properties_by_thing, name='sta-obs-properties-by-thing'),
path('gdi-backend/sta-datastream-properties/<int:sta_endpoint_id>/<int:thing_id>/<int:obs_property_id>', sta_requests.proxy_sta_datastream_properties, name='sta-datastream-properties'),
path('gdi-backend/sta-all-datastreams/<int:sta_endpoint_id>/<int:thing_id>/<int:obs_property_id>', sta_requests.proxy_sta_all_datastreams, name='sta-all-datastreams'),
path('gdi-backend/wfs-features/<str:workspace>/<str:prefix>', wfs_requests.wfs_features, name='wfs-features'),
re_path(r'^gdi-backend/timeseries/(?P<locale>[a-z]{2})', timeseries_request.request_all_time_series, name='request_time_series'),
re_path(r'^gdi-backend/process', aggregation_requests.call_process, name='aggregation'),
re_path(r'^gdi-backend/districts/(?P<country_id>[0-9][0-9])$', aggregation_requests.get_districts_by_country, name='districts'),
...
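Editorial sketch (not part of the change set): how a client might call the new wfs-features route. The host/port and the workspace/prefix values ("test-bucket", "chemicals") are hypothetical, and the response format is defined by wfs_requests.wfs_features, which is not part of this excerpt.

    import requests

    resp = requests.get(
        "http://localhost:5001/gdi-backend/wfs-features/test-bucket/chemicals",
        timeout=10,
    )
    resp.raise_for_status()
    print(resp.json())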
@@ -19,6 +19,7 @@
"connect_to_geoserver": true,
"connect_to_thredds": true,
"connect_to_sta": true,
"connect_to_wfs": false,
"sta_endpoint": 1,
"csv_parser": 1,
"public_folder": false,
@@ -38,12 +39,14 @@
"connect_to_geoserver": false,
"connect_to_thredds": false,
"connect_to_sta": false,
"connect_to_wfs": true,
"sta_endpoint": null,
-"csv_parser": null,
+"csv_parser": 2,
"public_folder": false,
"quota": 100,
"size_unit": "gi",
-"wms_file_suffix": null
+"wms_file_suffix": null,
"wfs_file_suffix": null
}
},
{
@@ -136,8 +139,8 @@
"model": "main.area",
"pk": 2,
"fields": {
-"name_de": "Zeitreihendaten",
+"name_de": "Punktdaten",
-"name_en": "Timeseries data",
+"name_en": "Point Data",
"project": 1,
"position": 2,
"is_active": true,
@@ -738,17 +741,17 @@
"model": "main.wfslayer",
"pk": 1,
"fields": {
-"name_de": "Köln",
+"name_de": "Chemikalien",
-"name_en": "Cologne",
+"name_en": "Chemicals",
"info_de": "",
"info_en": "",
"time_format": null,
"y_axis_min": null,
"y_axis_max": null,
-"bucket": 1,
+"bucket": 2,
-"wfs_url": "http://localhost:5001/gdi-backend/proxy/test-bucket/wfs",
+"workspace": "test-bucket",
-"layer_name": "test-bucket:cologne_polygon",
+"prefix": "test_bucket",
-"file_path": "shp/cologne_polygon_shp.zip"
+"file_path": "chemicals/*.csv"
}
},
{
@@ -763,7 +766,24 @@
"y_axis_min": null,
"y_axis_max": null,
"bucket": 1,
-"file_path": "public/bw.json"
+"file_path": "public/bw.json",
"url": "http://localhost:9000/test-bucket/public/bw.json"
}
},
{
"model": "main.geojsonlayer",
"pk": 2,
"fields": {
"name_de": "Köln",
"name_en": "Cologne",
"info_de": "",
"info_en": "",
"time_format": null,
"y_axis_min": null,
"y_axis_max": null,
"bucket": 1,
"file_path": "shp/cologne_polygon_shp.zip",
"url": "http://localhost:5001/gdi-backend/proxy/test-bucket/ows?service=WFS&version=2.0.0&request=GetFeature&typename=test-bucket:cologne_polygon&outputFormat=application/json&srsname=EPSG:3857"
}
},
{
@@ -836,6 +856,20 @@
"wfs_layer": 1
}
},
{
"model": "main.arealayer",
"pk": 6,
"fields": {
"area": 2,
"position": 1,
"is_active": true,
"is_default_on_start": false,
"wms_layer": null,
"sta_layer": null,
"geojson_layer": 2,
"wfs_layer": null
}
},
{
"model": "admin_interface.theme",
"pk": 1,
@@ -1229,15 +1263,15 @@
"record_id": "zyxwvmkcO9Ss26kiROhGIXQzvMr7L7gF",
"publisher": 1,
"version": "1.0",
-"language": "en",
+"language": null,
"format": 3,
"license": 1,
"title": "Drought Monitor",
"created_at": "2024-09-27T17:46:36.247Z",
-"updated_at": "2024-09-30T09:41:35.295Z",
+"updated_at": "2024-11-29T11:04:29.173Z",
"published_at": "2024-09-30",
-"abstract": "Test",
+"abstract": "test",
-"keywords": "[\"Drought\", \"Environment\", \"Soil\"]"
+"keywords": "[\"Drought\", \"Germany\"]"
}
},
{
@@ -1295,6 +1329,31 @@
"value_col_num": 3,
"unit_col_num": 5,
"time_col_num": 4,
"location_id_col_num": null,
"property_id_col_num": null,
"country_col_num": null,
"community_col_num": null,
"target": "STA",
"group": 1
}
},
{
"model": "data_import.csvparser",
"pk": 2,
"fields": {
"lat_col_num": 25,
"lon_col_num": 26,
"station_col_num": 27,
"property_col_num": 2,
"value_col_num": 17,
"unit_col_num": 18,
"time_col_num": 21,
"location_id_col_num": null,
"property_id_col_num": 1,
"country_col_num": 28,
"community_col_num": null,
"target": "WFS",
"table_prefix": "chemicals",
"group": 1 "group": 1
}
},
@@ -1308,6 +1367,95 @@
"col_name": "station-id"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 2,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 4,
"col_name": "river_km"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 3,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 15,
"col_name": "concentration_data"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 4,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 16,
"col_name": "concentration_individual"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 5,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 19,
"col_name": "sampling_depth_type"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 6,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 20,
"col_name": "sampling_depth"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 7,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 22,
"col_name": "IDX_l"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 8,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 29,
"col_name": "water_body"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 9,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 30,
"col_name": "river_basin"
}
},
{
"model": "data_import.csvincludecriteria",
"pk": 1,
"fields": {
"parser": 2,
"col_num": 31,
"text_value": "TRUE"
}
},
{
"model": "admin_interface.theme",
"pk": 1,
...
@@ -99,7 +99,7 @@ class BucketAdmin(admin.ModelAdmin):
model = Bucket
form = BucketForm
inlines = [BucketUserInline]
-list_display = ('name', 'group', 'connect_to_geoserver', 'connect_to_thredds', 'connect_to_sta', 'link_to_event_logs')
+list_display = ('name', 'group', 'connect_to_geoserver', 'connect_to_thredds', 'connect_to_sta', 'connect_to_wfs', 'link_to_event_logs')
ordering = ('name',)
fieldsets = [
@@ -107,7 +107,7 @@ class BucketAdmin(admin.ModelAdmin):
'fields': ('name', 'group', 'public_folder', ('quota', 'size_unit'), ),
}),
(None, {
-'fields': (('connect_to_geoserver', 'wms_file_suffix', 'wfs_file_suffix'), 'connect_to_thredds', ('connect_to_sta', 'sta_endpoint', 'csv_parser'), ),
+'fields': (('connect_to_geoserver', 'wms_file_suffix', 'wfs_file_suffix'), 'connect_to_thredds', ('connect_to_sta', 'sta_endpoint'), 'connect_to_wfs', 'csv_parser', ),
})
]
...
@@ -17,7 +17,7 @@ class GeojsonLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
fieldsets = [
(None, {
-'fields': (('name_de', 'name_en'), 'bucket', 'file_path' ),
+'fields': (('name_de', 'name_en'), 'bucket', 'url', 'file_path' ),
'classes': ('tab-basic',),
}),
(None, {
...
@@ -11,11 +11,11 @@ from django.forms import Textarea # noqa
class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
model = WfsLayer
-list_display = ('__str__', 'layer_name')
+list_display = ('__str__', 'workspace', 'prefix')
save_as = True
inlines = [FkAreaLayerInline ]
-search_fields = ["layer_name", "name_de", "name_en", "info_de", "info_en", "wfs_url"]
+search_fields = ["name_de", "name_en", "info_de", "info_en", "workspace", "prefix"]
formfield_overrides = {
models.TextField: {'widget': Textarea(
@@ -29,7 +29,7 @@ class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
'classes': ('tab-basic',),
}),
(None, {
-'fields': ('layer_name', 'wfs_url'),
+'fields': ('workspace', 'prefix'),
'classes': ('tab-wfs',),
}),
(None, {
@@ -46,7 +46,7 @@ class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
]
def get_readonly_fields(self, request, obj):
-fields = ['bucket', 'wfs_url', 'layer_name', 'file_path']
+fields = ['bucket', 'file_path']
if request.user.is_superuser:
return []
return fields
...
import os
import requests
from requests.adapters import HTTPAdapter
-from requests.packages.urllib3.util.retry import Retry
+from requests.packages.urllib3.util.retry import Retry # noqa
import json
-from geoserver.catalog import Catalog
+from geoserver.catalog import Catalog # noqa
-from owslib.wms import WebMapService
+from owslib.wms import WebMapService # noqa
GEOSERVER_MOSAIC_DIR = "/opt/geoserver/data_dir/mosaic"
@@ -25,9 +25,10 @@ class GeoServerApi:
self.logs = []
self.has_error = False
self.has_timeout = False
retry_strategy = Retry(
-total=10,
+total=3,
backoff_factor=2,
allowed_methods=['DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT', 'POST']
)
@@ -43,6 +44,7 @@ class GeoServerApi:
def reset(self):
self.logs = []
self.has_error = False
self.has_timeout = False
def create_workspace(self, workspace_name):
# check for existing workspace
@@ -58,7 +60,7 @@
else:
self.log("Workspace already exists. Existing Workspace will be used: {}".format(workspace_name))
-def create_datastore(self, workspace, data_store, object_url, type, store_type='coveragestores'):
+def create_datastore(self, workspace, data_store, object_url, data_type, store_type='coveragestores'):
self.create_workspace(workspace)
@@ -67,25 +69,28 @@
else:
headers = {"Content-type": "text/plain"}
url = '{0}workspaces/{1}/{2}/{3}/{4}'.format(
-self.url, workspace, store_type, data_store, type)
+self.url, workspace, store_type, data_store, data_type)
try:
-response = self.session.put(url, data=object_url, auth=(self.username, self.password), headers=headers, timeout=10)
+response = self.session.put(url, data=object_url, auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code != 201:
self.log("Create status not ok:")
self.log(response.content)
self.has_error = True
else:
self.log("Datastore {} created successfully".format(data_store))
except requests.exceptions.RetryError:
self.log("Timeout, datastore could not be created: {}".format(data_store))
self.has_timeout = True
except Exception as e:
-print("Datastore could not be created: {}".format(e))
+self.log("Error, datastore could not be created: {}".format(e))
self.has_error = True
-def _create_new_style(self, style_name, style_xml):
+def _create_new_style(self, style_xml):
try:
headers = {"Content-type": "application/vnd.ogc.sld+xml"}
url = '{0}styles?raw=true'.format(self.url)
-response = self.session.post(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=10)
+response = self.session.post(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=5)
if response.status_code != 201:
print('Style could not be created: {}'.format(response.content))
self.has_error = True
@@ -99,7 +104,7 @@ class GeoServerApi:
try:
headers = {"Content-type": "application/vnd.ogc.sld+xml"}
url = '{0}styles/{1}?raw=true'.format(self.url, style_name)
-response = self.session.put(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=10)
+response = self.session.put(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=5)
if response.status_code != 200:
print('Style could not be updated: {}'.format(response.content))
self.has_error = True
@@ -112,13 +117,13 @@ class GeoServerApi:
def create_style(self, style_name, style_xml):
try:
url = '{0}styles/{1}.sld'.format(self.url, style_name)
-response = self.session.get(url=url, auth=(self.username, self.password), timeout=10)
+response = self.session.get(url=url, auth=(self.username, self.password), timeout=5)
style_xml = style_xml.encode('utf-8')
if response.status_code != 200:
print('Style does not exist yet. Style will be added as new.')
-self._create_new_style(style_name, style_xml)
+self._create_new_style(style_xml)
else:
print('Update existing style')
self._update_style(style_name, style_xml)
@@ -139,7 +144,7 @@ class GeoServerApi:
}
try:
-response = self.session.post(url, data=json.dumps(body), auth=(self.username, self.password), headers=headers, timeout=10)
+response = self.session.post(url, data=json.dumps(body), auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code != 201:
print("Create status not ok: {}".format(response.content))
@@ -177,41 +182,43 @@ class GeoServerApi:
"</coverage>"
)
try:
-response = self.session.put(url, data=time_data, auth=(self.username, self.password), headers=headers, timeout=10)
+response = self.session.put(url, data=time_data, auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code not in [200, 201]:
self.log("Time dimension status for coverage {} not ok: {}".format(native_name, response.content))
self.has_error = True
else:
self.log("Time dimension for coverage {} enabled successfully".format(native_name))
except requests.exceptions.RetryError:
self.log("Timeout, coverage_store could not be configured: {}".format(coverage_store))
self.has_timeout = True
except Exception as e:
self.log("Time dimension for coverage {} could not be enabled: {}".format(native_name, e))
self.has_error = True
-def update_timesteps_and_save(self, layer, wms_name):
+def get_wms_timesteps(self, workspace, datastore_name):
-wms_url = os.environ.get("GEOSERVER_URL") + '/ows?service=wms&version=1.3.0&request=GetCapabilities'
+wms_url = '{}/{}/ows?service=wms&version=1.3.0&request=GetCapabilities'.format(os.environ.get("GEOSERVER_URL"), workspace)
try:
wms = WebMapService(wms_url, version='1.3.0')
-# iterate all available wms 'LAYERS' which exist in Geoserver
+# iterate all available wms 'LAYERS' which exist in the workspace
for wms_variable in list(wms.contents):
-if wms_name == wms_variable:
+if datastore_name == wms_variable:
-time_steps = wms.contents[wms_name].timepositions
+time_steps = wms.contents[datastore_name].timepositions
if time_steps is None:
-layer.time_steps = []
+return []
else:
-layer.time_steps = time_steps
+self.log("Layer-timesteps fetched")
-break
+return time_steps
-layer.save()
except Exception as e:
-print("Could not update layer {}: {}".format(layer.id, str(e)))
+self.log("Could not fetch timesteps {}: {}".format(datastore_name, str(e)))
self.has_error = True
def delete_datastore(self, workspace, data_store, store_type='coveragestores'):
url = "{0}workspaces/{1}/{2}/{3}?recurse=true".format(self.url, workspace, store_type, data_store)
try:
-response = self.session.delete(url, auth=(self.username, self.password), timeout=10)
+response = self.session.delete(url, auth=(self.username, self.password), timeout=5)
if response.status_code not in [200, 201]:
self.log("Error in deleting datastore {}: {}".format(data_store, response.content))
self.has_error = True
...
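Editorial sketch (not part of the change set): background on the new has_timeout handling. When a urllib3 Retry strategy is mounted on the requests session (the mounting code is outside this excerpt) and the retries are exhausted on retryable responses, requests surfaces this as requests.exceptions.RetryError, which create_datastore() and the coverage update above now log separately from other failures. The URL and status_forcelist below are illustrative only.

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        backoff_factor=2,
        allowed_methods=['DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT', 'POST'],
        status_forcelist=[502, 503, 504],  # retry on these statuses, then give up
    )
    session.mount("http://", HTTPAdapter(max_retries=retry_strategy))

    try:
        session.put("http://localhost:8080/geoserver/rest/workspaces/demo", timeout=5)
    except requests.exceptions.RetryError:
        print("retries exhausted, treating this as a timeout")
    except requests.exceptions.RequestException as exc:
        print("other request error: {}".format(exc))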
@@ -40,7 +40,7 @@ def is_wms_file(bucket: Bucket, file: File):
return True
-def is_wfs_file(bucket: Bucket, file: File):
+def is_zip_file_for_import(bucket: Bucket, file: File):
suffix = bucket.wfs_file_suffix
if suffix:
for file_type in ["zip"]:
...
import os
-import csv
import time
-import datetime
from django.utils import timezone # noqa
from django.core.management.base import BaseCommand # noqa
from django.db.models import Q # noqa
-from data_import.models import CsvParser, CsvImportJob, CsvParserExtraColumn, CsvIncludeCriteria, PointData, ExtendedPointData, STA_THING # noqa
+from data_import.models import CsvImportJob, PointData, CsvIncludeCriteria # noqa
-from main.models import StaThingProperty # noqa
+from data_import.models import WFS, STA # noqa
from main.lib.utils.s3_utils import download_minio_file # noqa
-from data_import.api.StaApi import StaApi # noqa
+from data_import.lib.StaImporter import StaImporter # noqa
from data_import.lib.PostgisImporter import PostgisImporter # noqa
SIZE_LIMIT = 30000000 # 30 MB
@@ -20,11 +21,12 @@ class Command(BaseCommand):
def handle(self, *args, **options):
-job = CsvImportJob.objects.filter(is_processed=False, is_running=False, bucket__connect_to_sta=True).select_related('bucket__sta_endpoint', 'bucket__csv_parser').order_by('created_at').first()
+job = CsvImportJob.objects.filter(Q(is_processed=False) & Q(is_running=False) & (Q(bucket__connect_to_sta=True) | Q(bucket__connect_to_wfs=True))).select_related('bucket__sta_endpoint', 'bucket__csv_parser').order_by('created_at').first()
if job:
start_time = time.time()
-job.started_at = datetime.datetime.now()
+job.started_at = timezone.now()
job.is_running = True
job.save()
@@ -38,51 +40,31 @@
job.save()
return
-with open(file_name, newline = '') as csvfile:
+if job.target == STA:
i = StaImporter(job)
-reader = csv.reader(csvfile, delimiter=',')
+i.import_csv_in_sta()
-next(reader) # skip header
-api = StaApi(job)
-thing_props = StaThingProperty.objects.filter(endpoint=job.bucket.sta_endpoint)
-extra_columns = CsvParserExtraColumn.objects.filter(parser=job.bucket.csv_parser)
-include_criteria = CsvIncludeCriteria.objects.filter(parser=job.bucket.csv_parser)
-rows_succeeded = 0
-rows_failed = 0
-for row in reader:
-if is_row_included(row, include_criteria):
-point_data = create_point_data(job, row)
-extended_data = create_extended_data(point_data, extra_columns, thing_props, row)
-api.import_point_data(point_data, extended_data)
-if api.error is True:
-point_data.validation_error = "\n".join(api.logs)
-point_data.save()
-for data in extended_data:
-data.save()
-rows_failed += 1
-else:
-rows_succeeded += 1
-api.error = False
-api.logs = []
if not PointData.objects.filter(import_job=job).exists():
job.is_success = True
-job.is_processed = True
+if job.target == WFS:
-job.is_running = False
+i = PostgisImporter(job)
-job.execution_time = str(round((time.time() - start_time), 2)) + "s"
+i.import_csv_to_wfs()
-job.finished_at = datetime.datetime.now()
+i.write_data()
-job.data_points_created = rows_succeeded
+job.is_success = not i.error
-job.data_points_failed = rows_failed
+if i.error is True:
-job.save()
+job.validation_error = "\n".join(i.logs)
job.is_processed = True
job.is_running = False
job.execution_time = str(round((time.time() - start_time), 2)) + "s"
job.finished_at = timezone.now()
#job.data_points_created = rows_succeeded
#job.data_points_failed = rows_failed
job.save()
os.remove(file_name)
def set_file_stats_and_validate(job: CsvImportJob, file_name: str):
job.file_size = os.path.getsize(file_name)
@@ -94,55 +76,3 @@ def set_file_stats_and_validate(job: CsvImportJob, file_name: str):
if job.num_rows > ROW_LIMIT:
job.validation_error = 'file exceeds maximum number of rows: {}'.format(ROW_LIMIT)
-def is_row_included(row, include_criteria: list[CsvIncludeCriteria]) -> bool:
-if len(include_criteria) > 0:
-for criteria in include_criteria:
-if row[criteria.col_num] == criteria.text_value:
-return True
-return False
-else:
-return True
-def create_point_data(job: CsvImportJob, row):
-p: CsvParser = job.bucket.csv_parser
-point_data = PointData(
-import_job = job,
-thing_name = row[p.station_col_num],
-location_name = row[p.station_col_num],
-coord_lat = row[p.lat_col_num],
-coord_lon = row[p.lon_col_num],
-# geometry = '',
-property = row[p.property_col_num],
-# sensor = '',
-result_value = row[p.value_col_num],
-result_unit = row[p.unit_col_num],
-result_time = row[p.time_col_num]
-)
-return point_data
-def create_extended_data(point_data: PointData, extra_columns: list[CsvParserExtraColumn], thing_props: list[StaThingProperty], row):
-result = []
-for prop in thing_props:
-extended_data = ExtendedPointData(
-point_data = point_data,
-related_entity = STA_THING,
-name = prop.property_key,
-value = prop.property_value
-)
-result.append(extended_data)
-for column in extra_columns:
-extended_data = ExtendedPointData(
-point_data = point_data,
-related_entity = column.related_entity,
-name = column.col_name,
-value = row[column.col_num]
-)
-result.append(extended_data)
-return result