Compare revisions

Target project: ufz-sdi/spatialio

Commits on Source (17)
Showing with 1141 additions and 161 deletions
......@@ -74,6 +74,9 @@
#### VueJS 3
- https://v3.vuejs.org/guide/introduction.html
#### Pinia-Store
- https://pinia.vuejs.org/getting-started.html
#### Typescript
- https://www.typescriptlang.org/
- https://v3.vuejs.org/guide/typescript-support.html
......@@ -107,6 +110,19 @@
### GeoServer
#### Setup
- https://docs.geoserver.org/main/en/user/production/index.html
- https://geoserver.geosolutionsgroup.com/edu/en/adv_gsconfig/gsproduction.html
#### WFS
- https://docs.geoserver.org/main/en/user/services/wfs/basics.html
- https://docs.geoserver.org/main/en/user/services/wfs/reference.html
- https://docs.geoserver.org/2.22.x/en/user/extensions/querylayer/index.html
- https://docs.ogc.org/is/09-025r2/09-025r2.html#50
- https://docs.geoserver.org/main/en/user/tutorials/cql/cql_tutorial.html#cql-tutorial
- https://gis.stackexchange.com/questions/132229/cql-filter-that-joins-two-feature-types-in-wfs
- https://docs.geoserver.org/latest/en/user/filter/function_reference.html#filter-function-reference
- Python library for using the API: https://pypi.org/project/geoserver-restconfig/
### Django / Backend
......
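As a quick orientation for the WFS/CQL references added above, here is a minimal sketch of a GetFeature request with a CQL filter through the backend's GeoServer proxy. The host, layer name, and filter expression are assumptions only, modelled on the test fixtures elsewhere in this comparison; real attribute names depend on the published layer.

```python
# Minimal sketch only: endpoint and layer name mirror the test-bucket fixtures,
# the cql_filter is a hypothetical attribute filter.
import requests

params = {
    "service": "WFS",
    "version": "2.0.0",
    "request": "GetFeature",
    "typeName": "test-bucket:cologne_polygon",  # example layer from the fixtures
    "outputFormat": "application/json",
    "cql_filter": "country = 'DE'",  # hypothetical; column names depend on the layer schema
}
resp = requests.get(
    "http://localhost:5001/gdi-backend/proxy/test-bucket/ows",
    params=params,
    timeout=10,
)
resp.raise_for_status()
features = resp.json()["features"]
```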
......@@ -9,6 +9,7 @@ class CsvImportJobAdmin(admin.ModelAdmin):
list_display = (
's3_file',
'bucket',
'target',
'is_processed',
'is_running',
'is_success',
......@@ -19,7 +20,7 @@ class CsvImportJobAdmin(admin.ModelAdmin):
fieldsets = [
(None, {
'fields': (('bucket'), ('s3_file', 'file_size', 'num_rows'), ('time_created', 'is_running', 'is_processed'), ('started_at', 'finished_at', 'execution_time' )),
'fields': (('bucket'), ('s3_file', 'file_size', 'num_rows'), 'target', ('time_created', 'is_running', 'is_processed'), ('started_at', 'finished_at', 'execution_time' )),
}),
('Results', {
'fields': ('is_success', ('data_points_created', 'data_points_failed'), 'validation_error', ),
......
......@@ -29,13 +29,18 @@ class CsvParserAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
'classes': ('tab-basic',),
}),
(None, {
'fields': ('group', ),
'fields': ('location_id_col_num', 'property_id_col_num', 'country_col_num', 'community_col_num' ),
'classes': ('tab-optional',),
}),
(None, {
'fields': ('target', 'table_prefix', 'group', ),
'classes': ('tab-group',),
}),
]
tabs = [
("Basic Information", ["tab-basic"]),
("Optional Information", ["tab-optional"]),
("Extra-Columns", ["tab-extra-columns-inline"]),
("Include-Criteria", ["tab-include-criteria-inline"]),
("Group", ["tab-group"]),
......
(One file's diff is collapsed and not shown here.)
import requests
import csv
from data_import.models import CsvImportJob, ExtendedPointData, PointData # noqa
from data_import.models import CsvParser, CsvImportJob, CsvParserExtraColumn, CsvIncludeCriteria, PointData, ExtendedPointData # noqa
from main.models import StaThingProperty # noqa
from data_import.models import STA_THING # noqa
from data_import.lib.utils import is_row_included # noqa
class StaApi:
class StaImporter:
def __init__(self, job: CsvImportJob):
self.error = False
......@@ -14,6 +19,42 @@ class StaApi:
self.sta_url = self.sta_endpoint.base_url.rstrip('/') + '/v1.1/'
self.auth = (self.sta_endpoint.username, self.sta_endpoint.password)
def import_csv_in_sta(self):
file_name = self.import_job.s3_file.split('/')[-1]
with open(file_name, newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
next(reader) # skip header
thing_props = StaThingProperty.objects.filter(endpoint=self.import_job.bucket.sta_endpoint)
extra_columns = CsvParserExtraColumn.objects.filter(parser=self.import_job.bucket.csv_parser)
include_criteria = CsvIncludeCriteria.objects.filter(parser=self.import_job.bucket.csv_parser)
rows_succeeded = 0
rows_failed = 0
for row in reader:
if is_row_included(row, include_criteria):
point_data = create_point_data(self.import_job, row)
extended_data = create_extended_data(point_data, extra_columns, thing_props, row)
self.import_point_data(point_data, extended_data)
if self.error is True:
point_data.validation_error = "\n".join(self.logs)
point_data.save()
for data in extended_data:
data.save()
rows_failed += 1
else:
rows_succeeded += 1
self.error = False
self.logs = []
def import_point_data(self, point_data: PointData, extended_point_data: list[ExtendedPointData]):
location = self.get_location_json(point_data)
......@@ -125,7 +166,6 @@ class StaApi:
print('error {}: {}'.format(route, content))
return False
def get_thing_json(self, point_data: PointData, extended_data: list[ExtendedPointData]):
thing = {
"name": point_data.thing_name,
......@@ -213,3 +253,46 @@ def sanitize_str(text: str):
result = text.replace("'", "''")
result = result.replace("+", "%2b")
return result.replace("/", "%2F")
def create_point_data(job: CsvImportJob, row):
p: CsvParser = job.bucket.csv_parser
point_data = PointData(
import_job = job,
thing_name = row[p.station_col_num],
location_name = row[p.station_col_num],
coord_lat = row[p.lat_col_num],
coord_lon = row[p.lon_col_num],
# geometry = '',
property = row[p.property_col_num],
# sensor = '',
result_value = row[p.value_col_num],
result_unit = row[p.unit_col_num],
result_time = row[p.time_col_num]
)
return point_data
def create_extended_data(point_data: PointData, extra_columns: list[CsvParserExtraColumn], thing_props: list[StaThingProperty], row):
result = []
for prop in thing_props:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = STA_THING,
name = prop.property_key,
value = prop.property_value
)
result.append(extended_data)
for column in extra_columns:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = column.related_entity,
name = column.col_name,
value = row[column.col_num]
)
result.append(extended_data)
return result
from data_import.models import CsvIncludeCriteria # noqa
def is_row_included(row, include_criteria: list[CsvIncludeCriteria]) -> bool:
if len(include_criteria) > 0:
for criteria in include_criteria:
if row[criteria.col_num] == criteria.text_value:
return True
return False
else:
return True
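For clarity: the include-criteria check above decides per row whether it is imported, and an empty criteria set lets every row through. A small illustrative check, where the SimpleNamespace object only stands in for a CsvIncludeCriteria row and is not part of this MR:

```python
from types import SimpleNamespace
from data_import.lib.utils import is_row_included  # module path as added in this MR

# Stand-in for a CsvIncludeCriteria instance with col_num=2, text_value="TRUE".
crit = SimpleNamespace(col_num=2, text_value="TRUE")

assert is_row_included(["a", "b", "TRUE"], [crit]) is True    # criterion matches -> row is imported
assert is_row_included(["a", "b", "FALSE"], [crit]) is False  # criterion does not match -> row is skipped
assert is_row_included(["a", "b", "FALSE"], []) is True       # no criteria configured -> everything is imported
```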
# Generated by Django 4.2.16 on 2024-11-15 10:25
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0003_csvimportjob_finished_at_csvimportjob_started_at_and_more'),
]
operations = [
migrations.CreateModel(
name='WfsImportRecord',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('location_id', models.CharField(blank=True, max_length=1000, null=True)),
('location_name', models.CharField(blank=True, max_length=1000, null=True)),
('type_id', models.CharField(blank=True, max_length=1000, null=True)),
('type_name', models.CharField(blank=True, max_length=1000, null=True)),
('period_start', models.CharField(blank=True, max_length=1000, null=True)),
('period_end', models.CharField(blank=True, max_length=1000, null=True)),
('lat', models.CharField(blank=True, max_length=1000, null=True)),
('lon', models.CharField(blank=True, max_length=1000, null=True)),
('srid', models.CharField(blank=True, max_length=1000, null=True)),
('geom', models.CharField(blank=True, max_length=4000, null=True)),
('gtype', models.CharField(blank=True, max_length=1000, null=True)),
('country', models.CharField(blank=True, max_length=1000, null=True)),
('community', models.CharField(blank=True, max_length=1000, null=True)),
('corine', models.CharField(blank=True, max_length=1000, null=True)),
('value', models.CharField(blank=True, max_length=1000, null=True)),
('date', models.CharField(blank=True, max_length=1000, null=True)),
('unit', models.CharField(blank=True, max_length=1000, null=True)),
('property_name', models.CharField(blank=True, max_length=1000, null=True)),
('property_id', models.CharField(blank=True, max_length=1000, null=True)),
('location_properties', models.JSONField(blank=True, null=True)),
('property_properties', models.JSONField(blank=True, null=True)),
('record_properties', models.JSONField(blank=True, null=True)),
('bucket_name', models.CharField(max_length=1000)),
('import_job', models.PositiveIntegerField()),
('created_at', models.DateTimeField(auto_now_add=True)),
],
),
migrations.AddField(
model_name='csvimportjob',
name='target',
field=models.CharField(blank=True, max_length=1000, null=True),
),
migrations.AddField(
model_name='csvparser',
name='community_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='country_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='location_id_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='property_id_col_num',
field=models.IntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='csvparser',
name='target',
field=models.CharField(choices=[('WFS', 'Web Feature Service'), ('STA', 'Sensorthings API')], default='WFS', max_length=20),
),
]
# Generated by Django 4.2.16 on 2024-12-13 13:39
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0004_wfsimportrecord_csvimportjob_target_and_more'),
]
operations = [
migrations.AddField(
model_name='csvparserextracolumn',
name='type_in_db',
field=models.CharField(choices=[('varchar', 'string'), ('integer', 'integer'), ('double precision', 'float'), ('timestamp', 'date')], default='varchar', max_length=20),
),
]
# Generated by Django 4.2.16 on 2024-12-13 16:21
import data_import.models
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data_import', '0005_csvparserextracolumn_type_in_db'),
]
operations = [
migrations.AddField(
model_name='csvparser',
name='table_prefix',
field=models.CharField(blank=True, max_length=20, null=True, unique=True, validators=[data_import.models.validate_prefix]),
),
]
# Generated by Django 4.2.16 on 2024-12-13 16:30
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('data_import', '0006_csvparser_table_prefix'),
]
operations = [
migrations.RemoveField(
model_name='wfsimportrecord',
name='type_id',
),
]
from django.db import models # noqa
from django.contrib.auth.models import Group # noqa
import re
from django.core.exceptions import ValidationError # noqa
STA_THING = 'thing'
STA_PROPERTY = 'property'
STA_OBSERVATION = 'observation'
STA_COLUMN_TYPES = (
CSV_ENTITIES = (
(STA_THING, 'thing'),
('property', 'property'),
('observation', 'observation'),
(STA_PROPERTY, 'property'),
(STA_OBSERVATION, 'observation'),
)
DB_COLUMN_TYPES = (
('varchar', 'string'),
('integer', 'integer'),
('double precision', 'float'),
('timestamp', 'date'),
)
WFS = 'WFS'
STA = 'STA'
TARGETS = (
(WFS, 'Web Feature Service'),
(STA, 'Sensorthings API'),
)
def validate_prefix(value):
if len(value) < 3 or len(value) > 20:
raise ValidationError("Length must be between 3 and 20 characters.")
if not re.match(r'^[a-z]*$', value):
raise ValidationError('Only lowercase letters are allowed.')
class CsvParser(models.Model):
lat_col_num = models.IntegerField()
lon_col_num = models.IntegerField()
......@@ -18,6 +44,12 @@ class CsvParser(models.Model):
value_col_num = models.IntegerField()
unit_col_num = models.IntegerField()
time_col_num = models.IntegerField()
location_id_col_num = models.IntegerField(blank=True, null=True)
property_id_col_num = models.IntegerField(blank=True, null=True)
country_col_num = models.IntegerField(blank=True, null=True)
community_col_num = models.IntegerField(blank=True, null=True)
table_prefix = models.CharField(max_length=20, blank=True, null=True, unique=True, validators=[validate_prefix])
target = models.CharField(max_length=20, choices=TARGETS, default=WFS)
group = models.ForeignKey(Group, on_delete=models.CASCADE, null=True)
class Meta:
......@@ -25,13 +57,14 @@ class CsvParser(models.Model):
verbose_name_plural = "CSV-Parser"
def __str__(self):
return 'Parser ' + str(self.id)
return '{} Parser ({}) {}'.format(self.target, str(self.group), str(self.id))
class CsvParserExtraColumn(models.Model):
parser = models.ForeignKey(CsvParser, on_delete=models.CASCADE)
related_entity = models.CharField(max_length=20, choices=STA_COLUMN_TYPES, default=STA_THING)
related_entity = models.CharField(max_length=20, choices=CSV_ENTITIES, default=STA_THING)
col_num = models.IntegerField()
col_name = models.CharField(max_length=100)
type_in_db = models.CharField(max_length=20, choices=DB_COLUMN_TYPES, default='varchar')
class CsvIncludeCriteria(models.Model):
parser = models.ForeignKey(CsvParser, on_delete=models.CASCADE)
......@@ -54,6 +87,7 @@ class CsvImportJob(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(blank=True, null=True)
finished_at = models.DateTimeField(blank=True, null=True)
target = models.CharField(max_length=1000, blank=True, null=True)
class Meta:
verbose_name = "CSV-Import-Job"
......@@ -81,10 +115,37 @@ class PointData(models.Model):
class ExtendedPointData(models.Model):
point_data = models.ForeignKey(PointData, on_delete=models.CASCADE)
related_entity = models.CharField(max_length=20, choices=STA_COLUMN_TYPES, default='thing')
related_entity = models.CharField(max_length=20, choices=CSV_ENTITIES, default='thing')
name = models.CharField(max_length=100)
value = models.CharField(max_length=1000)
class Meta:
verbose_name = "Additional Value"
verbose_name_plural = "Additional Values"
class WfsImportRecord(models.Model):
location_id = models.CharField(max_length=1000, blank=True, null=True)
location_name = models.CharField(max_length=1000, blank=True, null=True)
type_name = models.CharField(max_length=1000, blank=True, null=True)
period_start = models.CharField(max_length=1000, blank=True, null=True)
period_end = models.CharField(max_length=1000, blank=True, null=True)
lat = models.CharField(max_length=1000, blank=True, null=True)
lon = models.CharField(max_length=1000, blank=True, null=True)
srid = models.CharField(max_length=1000, blank=True, null=True)
geom = models.CharField(max_length=4000, blank=True, null=True)
gtype = models.CharField(max_length=1000, blank=True, null=True)
country = models.CharField(max_length=1000, blank=True, null=True)
community = models.CharField(max_length=1000, blank=True, null=True)
corine = models.CharField(max_length=1000, blank=True, null=True)
value = models.CharField(max_length=1000, blank=True, null=True)
date = models.CharField(max_length=1000, blank=True, null=True)
unit = models.CharField(max_length=1000, blank=True, null=True)
property_name = models.CharField(max_length=1000, blank=True, null=True)
property_id = models.CharField(max_length=1000, blank=True, null=True)
location_properties = models.JSONField(blank=True, null=True)
property_properties = models.JSONField(blank=True, null=True)
record_properties = models.JSONField(blank=True, null=True)
bucket_name = models.CharField(max_length=1000)
import_job = models.PositiveIntegerField()
created_at = models.DateTimeField(auto_now_add=True)
\ No newline at end of file
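The new WfsImportRecord model backs the WFS/PostGIS import path. The importer that fills it is not shown in full in this comparison, so the following is only a sketch of what creating a record through the ORM could look like; every value is invented, and the real PostgisImporter may just as well write via raw SQL on the geoserver connection instead.

```python
# Sketch only: field names come from the WfsImportRecord model above, the values are invented.
from data_import.models import WfsImportRecord

record = WfsImportRecord(
    location_name="Example station",
    lat="51.0", lon="12.0", srid="4326",
    property_name="Example property",
    value="0.42", unit="mg/l", date="2024-11-15",
    country="DE",
    bucket_name="test-bucket",
    import_job=1,  # plain integer reference, as defined in the model
)
record.save()
```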
......@@ -100,7 +100,19 @@ DATABASES = {
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': 5432,
'OPTIONS': {'sslmode': os.environ.get('POSTGRES_SSLMODE')},
}
},
"geoserver": {
'ENGINE': 'django.db.backends.postgresql',
"NAME": os.environ.get('POSTGRES_DB'),
"USER": "admin",
"PASSWORD": "admin",
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': 5432,
'OPTIONS': {
'sslmode': os.environ.get('POSTGRES_SSLMODE'),
'options': '-c search_path=geoserver,postgis'
},
},
}
......
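The added "geoserver" database entry gives Django a second connection whose search_path points at the geoserver and postgis schemas, presumably so the WFS import path can write tables that GeoServer then serves. A minimal sketch of using that connection; the table name is a hypothetical example of a prefix-based table, not something defined in this diff.

```python
# Sketch only: the connection alias "geoserver" is defined in settings.py above;
# "chemicals_records" is a hypothetical table name.
from django.db import connections

with connections["geoserver"].cursor() as cursor:
    # Unqualified names resolve via the configured search_path (geoserver, postgis).
    cursor.execute("SELECT count(*) FROM chemicals_records")
    print(cursor.fetchone()[0])
```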
......@@ -16,7 +16,7 @@ Including another URLconf
from django.contrib import admin
from django.urls import include, path, re_path
from main.views import views, aggregation_requests, download_requests, timeseries_request, sta_requests
from main.views import views, aggregation_requests, download_requests, timeseries_request, sta_requests, wfs_requests # noqa
from main.about import basic_site
from health_check.views import get_health
......@@ -28,13 +28,13 @@ urlpatterns = [
path('gdi-backend/download/zip/<record_id>', download_requests.get_zip_download, name='zipDownload'),
path('gdi-backend/countries', aggregation_requests.get_countries, name='countries'),
re_path(r'^gdi-backend/proxy', views.proxy_geoserver_request, name='proxy'),
path('gdi-backend/sta-thing-count/<int:sta_layer_id>', sta_requests.proxy_sta_thing_count, name='sta-thing-count'),
path('gdi-backend/sta-locations/<int:sta_layer_id>', sta_requests.proxy_sta_locations, name='sta-locations'),
path('gdi-backend/sta-obs-properties/<int:sta_layer_id>', sta_requests.proxy_sta_obs_properties, name='sta-obs-properties'),
path('gdi-backend/sta-thing/<int:sta_endpoint_id>/<int:thing_id>', sta_requests.proxy_sta_thing, name='sta-thing'),
path('gdi-backend/sta-obs-properties-by-thing/<int:sta_layer_id>/<int:thing_id>', sta_requests.proxy_sta_obs_properties_by_thing, name='sta-obs-properties-by-thing'),
path('gdi-backend/sta-datastream-properties/<int:sta_endpoint_id>/<int:thing_id>/<int:obs_property_id>', sta_requests.proxy_sta_datastream_properties, name='sta-datastream-properties'),
path('gdi-backend/sta-all-datastreams/<int:sta_endpoint_id>/<int:thing_id>/<int:obs_property_id>', sta_requests.proxy_sta_all_datastreams, name='sta-all-datastreams'),
path('gdi-backend/wfs-features/<str:workspace>/<str:prefix>', wfs_requests.wfs_features, name='wfs-features'),
re_path(r'^gdi-backend/timeseries/(?P<locale>[a-z]{2})', timeseries_request.request_all_time_series, name='request_time_series'),
re_path(r'^gdi-backend/process', aggregation_requests.call_process, name='aggregation'),
re_path(r'^gdi-backend/districts/(?P<country_id>[0-9][0-9])$', aggregation_requests.get_districts_by_country, name='districts'),
......
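The new wfs-features route above exposes per-layer WFS features through the backend. The view (main.views.wfs_requests.wfs_features) is not included in this excerpt, so the response shape is not specified here; a call would look roughly like this, with host, workspace, and prefix taken from the wfslayer test fixture further down.

```python
# Sketch only: host, workspace, and prefix come from the test fixtures;
# the response format depends on wfs_requests.wfs_features, which is not shown here.
import requests

resp = requests.get(
    "http://localhost:5001/gdi-backend/wfs-features/test-bucket/test_bucket",
    timeout=10,
)
resp.raise_for_status()
print(resp.status_code, resp.headers.get("Content-Type"))
```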
......@@ -19,6 +19,7 @@
"connect_to_geoserver": true,
"connect_to_thredds": true,
"connect_to_sta": true,
"connect_to_wfs": false,
"sta_endpoint": 1,
"csv_parser": 1,
"public_folder": false,
......@@ -38,12 +39,14 @@
"connect_to_geoserver": false,
"connect_to_thredds": false,
"connect_to_sta": false,
"connect_to_wfs": true,
"sta_endpoint": null,
"csv_parser": null,
"csv_parser": 2,
"public_folder": false,
"quota": 100,
"size_unit": "gi",
"wms_file_suffix": null
"wms_file_suffix": null,
"wfs_file_suffix": null
}
},
{
......@@ -136,8 +139,8 @@
"model": "main.area",
"pk": 2,
"fields": {
"name_de": "Zeitreihendaten",
"name_en": "Timeseries data",
"name_de": "Punktdaten",
"name_en": "Point Data",
"project": 1,
"position": 2,
"is_active": true,
......@@ -738,17 +741,17 @@
"model": "main.wfslayer",
"pk": 1,
"fields": {
"name_de": "Köln",
"name_en": "Cologne",
"name_de": "Chemikalien",
"name_en": "Chemicals",
"info_de": "",
"info_en": "",
"time_format": null,
"y_axis_min": null,
"y_axis_max": null,
"bucket": 1,
"wfs_url": "http://localhost:5001/gdi-backend/proxy/test-bucket/wfs",
"layer_name": "test-bucket:cologne_polygon",
"file_path": "shp/cologne_polygon_shp.zip"
"bucket": 2,
"workspace": "test-bucket",
"prefix": "test_bucket",
"file_path": "chemicals/*.csv"
}
},
{
......@@ -763,7 +766,24 @@
"y_axis_min": null,
"y_axis_max": null,
"bucket": 1,
"file_path": "public/bw.json"
"file_path": "public/bw.json",
"url": "http://localhost:9000/test-bucket/public/bw.json"
}
},
{
"model": "main.geojsonlayer",
"pk": 2,
"fields": {
"name_de": "Köln",
"name_en": "Cologne",
"info_de": "",
"info_en": "",
"time_format": null,
"y_axis_min": null,
"y_axis_max": null,
"bucket": 1,
"file_path": "shp/cologne_polygon_shp.zip",
"url": "http://localhost:5001/gdi-backend/proxy/test-bucket/ows?service=WFS&version=2.0.0&request=GetFeature&typename=test-bucket:cologne_polygon&outputFormat=application/json&srsname=EPSG:3857"
}
},
{
......@@ -836,6 +856,20 @@
"wfs_layer": 1
}
},
{
"model": "main.arealayer",
"pk": 6,
"fields": {
"area": 2,
"position": 1,
"is_active": true,
"is_default_on_start": false,
"wms_layer": null,
"sta_layer": null,
"geojson_layer": 2,
"wfs_layer": null
}
},
{
"model": "main.wmslayerlegend",
"pk": 1,
......@@ -1229,15 +1263,15 @@
"record_id": "zyxwvmkcO9Ss26kiROhGIXQzvMr7L7gF",
"publisher": 1,
"version": "1.0",
"language": "en",
"language": null,
"format": 3,
"license": 1,
"title": "Drought Monitor",
"created_at": "2024-09-27T17:46:36.247Z",
"updated_at": "2024-09-30T09:41:35.295Z",
"updated_at": "2024-11-29T11:04:29.173Z",
"published_at": "2024-09-30",
"abstract": "Test",
"keywords": "[\"Drought\", \"Environment\", \"Soil\"]"
"abstract": "test",
"keywords": "[\"Drought\", \"Germany\"]"
}
},
{
......@@ -1295,6 +1329,31 @@
"value_col_num": 3,
"unit_col_num": 5,
"time_col_num": 4,
"location_id_col_num": null,
"property_id_col_num": null,
"country_col_num": null,
"community_col_num": null,
"target": "STA",
"group": 1
}
},
{
"model": "data_import.csvparser",
"pk": 2,
"fields": {
"lat_col_num": 25,
"lon_col_num": 26,
"station_col_num": 27,
"property_col_num": 2,
"value_col_num": 17,
"unit_col_num": 18,
"time_col_num": 21,
"location_id_col_num": null,
"property_id_col_num": 1,
"country_col_num": 28,
"community_col_num": null,
"target": "WFS",
"table_prefix": "chemicals",
"group": 1
}
},
......@@ -1308,6 +1367,95 @@
"col_name": "station-id"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 2,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 4,
"col_name": "river_km"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 3,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 15,
"col_name": "concentration_data"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 4,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 16,
"col_name": "concentration_individual"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 5,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 19,
"col_name": "sampling_depth_type"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 6,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 20,
"col_name": "sampling_depth"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 7,
"fields": {
"parser": 2,
"related_entity": "observation",
"col_num": 22,
"col_name": "IDX_l"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 8,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 29,
"col_name": "water_body"
}
},
{
"model": "data_import.csvparserextracolumn",
"pk": 9,
"fields": {
"parser": 2,
"related_entity": "thing",
"col_num": 30,
"col_name": "river_basin"
}
},
{
"model": "data_import.csvincludecriteria",
"pk": 1,
"fields": {
"parser": 2,
"col_num": 31,
"text_value": "TRUE"
}
},
{
"model": "admin_interface.theme",
"pk": 1,
......
......@@ -99,7 +99,7 @@ class BucketAdmin(admin.ModelAdmin):
model = Bucket
form = BucketForm
inlines = [BucketUserInline]
list_display = ('name', 'group', 'connect_to_geoserver', 'connect_to_thredds', 'connect_to_sta', 'link_to_event_logs')
list_display = ('name', 'group', 'connect_to_geoserver', 'connect_to_thredds', 'connect_to_sta', 'connect_to_wfs', 'link_to_event_logs')
ordering = ('name',)
fieldsets = [
......@@ -107,7 +107,7 @@ class BucketAdmin(admin.ModelAdmin):
'fields': ('name', 'group', 'public_folder', ('quota', 'size_unit'), ),
}),
(None, {
'fields': (('connect_to_geoserver', 'wms_file_suffix', 'wfs_file_suffix'), 'connect_to_thredds', ('connect_to_sta', 'sta_endpoint', 'csv_parser'), ),
'fields': (('connect_to_geoserver', 'wms_file_suffix', 'wfs_file_suffix'), 'connect_to_thredds', ('connect_to_sta', 'sta_endpoint'), 'connect_to_wfs', 'csv_parser', ),
})
]
......
......@@ -17,7 +17,7 @@ class GeojsonLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
fieldsets = [
(None, {
'fields': (('name_de', 'name_en'), 'bucket', 'file_path' ),
'fields': (('name_de', 'name_en'), 'bucket', 'url', 'file_path' ),
'classes': ('tab-basic',),
}),
(None, {
......
......@@ -11,11 +11,11 @@ from django.forms import Textarea # noqa
class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
model = WfsLayer
list_display = ('__str__', 'layer_name')
list_display = ('__str__', 'workspace', 'prefix')
save_as = True
inlines = [FkAreaLayerInline ]
search_fields = ["layer_name", "name_de", "name_en", "info_de", "info_en", "wfs_url"]
search_fields = ["name_de", "name_en", "info_de", "info_en", "workspace", "prefix"]
formfield_overrides = {
models.TextField: {'widget': Textarea(
......@@ -29,7 +29,7 @@ class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
'classes': ('tab-basic',),
}),
(None, {
'fields': ('layer_name', 'wfs_url'),
'fields': ('workspace', 'prefix'),
'classes': ('tab-wfs',),
}),
(None, {
......@@ -46,7 +46,7 @@ class WfsLayerAdmin(DjangoTabbedChangeformAdmin, admin.ModelAdmin):
]
def get_readonly_fields(self, request, obj):
fields = ['bucket', 'wfs_url', 'layer_name', 'file_path']
fields = ['bucket', 'file_path']
if request.user.is_superuser:
return []
return fields
......
import os
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.packages.urllib3.util.retry import Retry # noqa
import json
from geoserver.catalog import Catalog
from owslib.wms import WebMapService
from geoserver.catalog import Catalog # noqa
from owslib.wms import WebMapService # noqa
GEOSERVER_MOSAIC_DIR = "/opt/geoserver/data_dir/mosaic"
......@@ -25,9 +25,10 @@ class GeoServerApi:
self.logs = []
self.has_error = False
self.has_timeout = False
retry_strategy = Retry(
total=10,
total=3,
backoff_factor=2,
allowed_methods=['DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT', 'POST']
)
......@@ -43,6 +44,7 @@ class GeoServerApi:
def reset(self):
self.logs = []
self.has_error = False
self.has_timeout = False
def create_workspace(self, workspace_name):
# check for existing workspace
......@@ -58,7 +60,7 @@ class GeoServerApi:
else:
self.log("Workspace already exists. Existing Workspace will be used: {}".format(workspace_name))
def create_datastore(self, workspace, data_store, object_url, type, store_type='coveragestores'):
def create_datastore(self, workspace, data_store, object_url, data_type, store_type='coveragestores'):
self.create_workspace(workspace)
......@@ -67,25 +69,28 @@ class GeoServerApi:
else:
headers = {"Content-type": "text/plain"}
url = '{0}workspaces/{1}/{2}/{3}/{4}'.format(
self.url, workspace, store_type, data_store, type)
self.url, workspace, store_type, data_store, data_type)
try:
response = self.session.put(url, data=object_url, auth=(self.username, self.password), headers=headers, timeout=10)
response = self.session.put(url, data=object_url, auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code != 201:
self.log("Create status not ok:")
self.log(response.content)
self.has_error = True
else:
self.log("Datastore {} created successfully".format(data_store))
except requests.exceptions.RetryError:
self.log("Timeout, datastore could not be created: {}".format(data_store))
self.has_timeout = True
except Exception as e:
print("Datastore could not be created: {}".format(e))
self.log("Error, datastore could not be created: {}".format(e))
self.has_error = True
def _create_new_style(self, style_name, style_xml):
def _create_new_style(self, style_xml):
try:
headers = {"Content-type": "application/vnd.ogc.sld+xml"}
url = '{0}styles?raw=true'.format(self.url)
response = self.session.post(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=10)
response = self.session.post(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=5)
if response.status_code != 201:
print('Style could not be created: {}'.format(response.content))
self.has_error = True
......@@ -99,7 +104,7 @@ class GeoServerApi:
try:
headers = {"Content-type": "application/vnd.ogc.sld+xml"}
url = '{0}styles/{1}?raw=true'.format(self.url, style_name)
response = self.session.put(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=10)
response = self.session.put(url=url, auth=(self.username, self.password), headers=headers, data=style_xml, timeout=5)
if response.status_code != 200:
print('Style could not be updated: {}'.format(response.content))
self.has_error = True
......@@ -112,13 +117,13 @@ class GeoServerApi:
def create_style(self, style_name, style_xml):
try:
url = '{0}styles/{1}.sld'.format(self.url, style_name)
response = self.session.get(url=url, auth=(self.username, self.password), timeout=10)
response = self.session.get(url=url, auth=(self.username, self.password), timeout=5)
style_xml = style_xml.encode('utf-8')
if response.status_code != 200:
print('Style does not exist yet. Style will be added as new.')
self._create_new_style(style_name, style_xml)
self._create_new_style(style_xml)
else:
print('Update existing style')
self._update_style(style_name, style_xml)
......@@ -139,7 +144,7 @@ class GeoServerApi:
}
try:
response = self.session.post(url, data=json.dumps(body), auth=(self.username, self.password), headers=headers, timeout=10)
response = self.session.post(url, data=json.dumps(body), auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code != 201:
print("Create status not ok: {}".format(response.content))
......@@ -177,41 +182,43 @@ class GeoServerApi:
"</coverage>"
)
try:
response = self.session.put(url, data=time_data, auth=(self.username, self.password), headers=headers, timeout=10)
response = self.session.put(url, data=time_data, auth=(self.username, self.password), headers=headers, timeout=5)
if response.status_code not in [200, 201]:
self.log("Time dimension status for coverage {} not ok: {}".format(native_name, response.content))
self.has_error = True
else:
self.log("Time dimension for coverage {} enabled successfully".format(native_name))
except requests.exceptions.RetryError:
self.log("Timeout, coverage_store could not be configured: {}".format(coverage_store))
self.has_timeout = True
except Exception as e:
self.log("Time dimension for coverage {} could not be enabled: {}".format(native_name, e))
self.has_error = True
def update_timesteps_and_save(self, layer, wms_name):
wms_url = os.environ.get("GEOSERVER_URL") + '/ows?service=wms&version=1.3.0&request=GetCapabilities'
def get_wms_timesteps(self, workspace, datastore_name):
wms_url = '{}/{}/ows?service=wms&version=1.3.0&request=GetCapabilities'.format(os.environ.get("GEOSERVER_URL"), workspace)
try:
wms = WebMapService(wms_url, version='1.3.0')
# iterate all available wms 'LAYERS' which exist in Geoserver
# iterate all available wms 'LAYERS' which exist in the workspace
for wms_variable in list(wms.contents):
if wms_name == wms_variable:
time_steps = wms.contents[wms_name].timepositions
if datastore_name == wms_variable:
time_steps = wms.contents[datastore_name].timepositions
if time_steps is None:
layer.time_steps = []
return []
else:
layer.time_steps = time_steps
break
layer.save()
self.log("Layer-timesteps fetched")
return time_steps
except Exception as e:
print("Could not update layer {}: {}".format(layer.id, str(e)))
self.log("Could not fetch timesteps {}: {}".format(datastore_name, str(e)))
self.has_error = True
def delete_datastore(self, workspace, data_store, store_type='coveragestores'):
url = "{0}workspaces/{1}/{2}/{3}?recurse=true".format(self.url, workspace, store_type, data_store)
try:
response = self.session.delete(url, auth=(self.username, self.password), timeout=10)
response = self.session.delete(url, auth=(self.username, self.password), timeout=5)
if response.status_code not in [200, 201]:
self.log("Error in deleting datastore {}: {}".format(data_store, response.content))
self.has_error = True
......
......@@ -40,7 +40,7 @@ def is_wms_file(bucket: Bucket, file: File):
return True
def is_wfs_file(bucket: Bucket, file: File):
def is_zip_file_for_import(bucket: Bucket, file: File):
suffix = bucket.wfs_file_suffix
if suffix:
for file_type in ["zip"]:
......
import os
import csv
import time
import datetime
from django.utils import timezone # noqa
from django.core.management.base import BaseCommand # noqa
from django.db.models import Q # noqa
from data_import.models import CsvParser, CsvImportJob, CsvParserExtraColumn, CsvIncludeCriteria, PointData, ExtendedPointData, STA_THING # noqa
from main.models import StaThingProperty # noqa
from data_import.models import CsvImportJob, PointData, CsvIncludeCriteria # noqa
from data_import.models import WFS, STA # noqa
from main.lib.utils.s3_utils import download_minio_file # noqa
from data_import.api.StaApi import StaApi # noqa
from data_import.lib.StaImporter import StaImporter # noqa
from data_import.lib.PostgisImporter import PostgisImporter # noqa
SIZE_LIMIT = 30000000 # 30 MB
......@@ -20,11 +21,12 @@ class Command(BaseCommand):
def handle(self, *args, **options):
job = CsvImportJob.objects.filter(is_processed=False, is_running=False, bucket__connect_to_sta=True).select_related('bucket__sta_endpoint', 'bucket__csv_parser').order_by('created_at').first()
job = CsvImportJob.objects.filter(Q(is_processed=False) & Q(is_running=False) & (Q(bucket__connect_to_sta=True) | Q(bucket__connect_to_wfs=True))).select_related('bucket__sta_endpoint', 'bucket__csv_parser').order_by('created_at').first()
if job:
start_time = time.time()
job.started_at = datetime.datetime.now()
job.started_at = timezone.now()
job.is_running = True
job.save()
......@@ -38,51 +40,31 @@ class Command(BaseCommand):
job.save()
return
with open(file_name, newline = '') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
next(reader) # skip header
api = StaApi(job)
thing_props = StaThingProperty.objects.filter(endpoint=job.bucket.sta_endpoint)
extra_columns = CsvParserExtraColumn.objects.filter(parser=job.bucket.csv_parser)
include_criteria = CsvIncludeCriteria.objects.filter(parser=job.bucket.csv_parser)
rows_succeeded = 0
rows_failed = 0
for row in reader:
if is_row_included(row, include_criteria):
point_data = create_point_data(job, row)
extended_data = create_extended_data(point_data, extra_columns, thing_props, row)
api.import_point_data(point_data, extended_data)
if api.error is True:
point_data.validation_error = "\n".join(api.logs)
point_data.save()
for data in extended_data:
data.save()
rows_failed += 1
else:
rows_succeeded += 1
api.error = False
api.logs = []
if job.target == STA:
i = StaImporter(job)
i.import_csv_in_sta()
if not PointData.objects.filter(import_job=job).exists():
job.is_success = True
job.is_processed = True
job.is_running = False
job.execution_time = str(round((time.time() - start_time), 2)) + "s"
job.finished_at = datetime.datetime.now()
job.data_points_created = rows_succeeded
job.data_points_failed = rows_failed
job.save()
if job.target == WFS:
i = PostgisImporter(job)
i.import_csv_to_wfs()
i.write_data()
job.is_success = not i.error
if i.error is True:
job.validation_error = "\n".join(i.logs)
job.is_processed = True
job.is_running = False
job.execution_time = str(round((time.time() - start_time), 2)) + "s"
job.finished_at = timezone.now()
#job.data_points_created = rows_succeeded
#job.data_points_failed = rows_failed
job.save()
os.remove(file_name)
os.remove(file_name)
def set_file_stats_and_validate(job: CsvImportJob, file_name: str):
job.file_size = os.path.getsize(file_name)
......@@ -94,55 +76,3 @@ def set_file_stats_and_validate(job: CsvImportJob, file_name: str):
if job.num_rows > ROW_LIMIT:
job.validation_error = 'file exceeds maximum number of rows: {}'.format(ROW_LIMIT)
def is_row_included(row, include_criteria: list[CsvIncludeCriteria]) -> bool:
if len(include_criteria) > 0:
for criteria in include_criteria:
if row[criteria.col_num] == criteria.text_value:
return True
return False
else:
return True
def create_point_data(job: CsvImportJob, row):
p: CsvParser = job.bucket.csv_parser
point_data = PointData(
import_job = job,
thing_name = row[p.station_col_num],
location_name = row[p.station_col_num],
coord_lat = row[p.lat_col_num],
coord_lon = row[p.lon_col_num],
# geometry = '',
property = row[p.property_col_num],
# sensor = '',
result_value = row[p.value_col_num],
result_unit = row[p.unit_col_num],
result_time = row[p.time_col_num]
)
return point_data
def create_extended_data(point_data: PointData, extra_columns: list[CsvParserExtraColumn], thing_props: list[StaThingProperty], row):
result = []
for prop in thing_props:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = STA_THING,
name = prop.property_key,
value = prop.property_value
)
result.append(extended_data)
for column in extra_columns:
extended_data = ExtendedPointData(
point_data = point_data,
related_entity = column.related_entity,
name = column.col_name,
value = row[column.col_num]
)
result.append(extended_data)
return result