Commit 96881df3 authored by Bert Palm

copied files

parent 337c536d
1 merge request: !252 Integrate dispatcher
Showing 2156 additions and 0 deletions
*
!.gitignore
<?xml version="1.0" encoding="UTF-8"?>
<Context path="/{schema}" docBase="/share/FROST-Server.war">
<Parameter override="false" name="ApiVersion" value="v1.0" description="The version tag of the API used in the URL."/>
<Parameter override="false" name="serviceRootUrl" value="{tomcat_proxy_url}{schema}" description="The base URL of the SensorThings Server without version."/>
<Parameter override="false" name="defaultCount" value="false" description="The default value for the $count query option."/>
<Parameter override="false" name="defaultTop" value="100" description="The default value for the $top query option."/>
<Parameter override="false" name="maxTop" value="1000" description="The maximum allowed value for the $top query option."/>
<Parameter override="false" name="maxDataSize" value="25000000" description="The maximum allowed estimated data size (in bytes) for responses."/>
<Parameter override="false" name="bus.busImplementationClass" value="de.fraunhofer.iosb.ilt.frostserver.messagebus.InternalMessageBus" description="The java class used for connecting to the message bus."/>
<Parameter override="false" name="persistence.persistenceManagerImplementationClass" value="de.fraunhofer.iosb.ilt.frostserver.persistence.pgjooq.imp.PostgresPersistenceManagerLong" description="The java class used for persistence (must implement PersistenceManaher interface)"/>
<Parameter override="false" name="persistence.idGenerationMode" value="ServerGeneratedOnly" description="Mode for id generation when using PostgresPersistenceManagerString."/>
<Parameter override="false" name="persistence.autoUpdateDatabase" value="false" description="Automatically apply database updates."/>
<Parameter override="false" name="persistence.alwaysOrderbyId" value="false" description="Always add an 'orderby=id asc' to queries to ensure consistent paging."/>
<Parameter override="false" name="persistence.db_jndi_datasource" value="jdbc/sensorThings" description="JNDI data source name"/>
<Resource
name="jdbc/sensorThings" auth="Container"
type="javax.sql.DataSource" driverClassName="org.postgresql.Driver"
url="jdbc:{db_url}"
username="{username}" password="{password}"
maxTotal="20" maxIdle="10" maxWaitMillis="-1"
defaultAutoCommit="false"
/>
</Context>
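The {schema}, {tomcat_proxy_url}, {db_url}, {username} and {password} tokens in this context file are placeholders that must be substituted before the file is handed to Tomcat. A minimal, hypothetical rendering sketch follows; file names, paths and values are illustrative, only the placeholder names come from the template above:

from pathlib import Path

# Hypothetical: read the context template shown above and substitute its placeholders.
template = Path("frost_context.xml.tpl").read_text()
rendered = template.format(
    schema="my_tsm_schema",
    tomcat_proxy_url="https://sta.example.org/",
    db_url="postgresql://db-host:5432/postgres",
    username="frost_reader",
    password="secret",
)
# Tomcat picks up per-context files from conf/Catalina/localhost/<context>.xml (illustrative path).
Path("/usr/local/tomcat/conf/Catalina/localhost/my_tsm_schema.xml").write_text(rendered)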
BEGIN;
--- CREATE VIEW "THINGS" ---
DROP VIEW IF EXISTS "THINGS" CASCADE;
CREATE OR REPLACE VIEW "THINGS" AS
SELECT DISTINCT c.id AS "ID",
c.description AS "DESCRIPTION",
c.label AS "NAME",
jsonb_build_object(
'url', %(sms_url)s || 'configurations/' || c.id,
'pid', c.persistent_identifier,
'status', c.status,
'mobile', CASE
WHEN MAX(cdl.begin_date) IS NULL THEN 'false'
ELSE 'true'
END,
'organizations', ARRAY_AGG(DISTINCT co.organization),
'projects', ARRAY_AGG(DISTINCT c.project),
'when_dynamic', ARRAY_AGG(
DISTINCT CASE
WHEN cdl.end_date IS NULL THEN
TO_CHAR(cdl.begin_date,
'YYYY-MM-DD HH24:MI:SS TZ')
ELSE
TO_CHAR(cdl.begin_date,
'YYYY-MM-DD HH24:MI:SS TZ') ||
'/' ||
TO_CHAR(cdl.end_date, 'YYYY-MM-DD HH24:MI:SS TZ')
END
),
'when_stationary', ARRAY_AGG(
DISTINCT CASE
WHEN csl.end_date IS NULL THEN
TO_CHAR(csl.begin_date,
'YYYY-MM-DD HH24:MI:SS TZ')
ELSE
TO_CHAR(csl.begin_date,
'YYYY-MM-DD HH24:MI:SS TZ') ||
'/' ||
TO_CHAR(csl.end_date, 'YYYY-MM-DD HH24:MI:SS TZ')
END
),
'contacts', ARRAY_AGG(
DISTINCT jsonb_build_object(
'name', CONCAT(co.given_name, ' ', co.family_name),
'email', co.email,
'organization', co.organization,
'orcid', co.orcid
))
) AS "PROPERTIES"
FROM public.sms_configuration c
JOIN public.sms_configuration_contact_role ccr ON c.id = ccr.configuration_id
JOIN public.sms_contact co ON ccr.contact_id = co.id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl
ON c.id = cdl.configuration_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl
ON c.id = csl.configuration_id
WHERE ((cdl.configuration_id IS NOT NULL) OR (csl.configuration_id IS NOT NULL))
AND dsl.datasource_id = %(tsm_schema)s
AND c.is_public
GROUP BY c.id, c.description, c.label, c.persistent_identifier, c.status, c.is_public,
cdl.configuration_id, csl.configuration_id, dsl.datasource_id
ORDER BY c.id ASC;
--- CREATE VIEW SENSORS ---
DROP VIEW IF EXISTS "SENSORS" CASCADE;
CREATE OR REPLACE VIEW "SENSORS" AS
SELECT DISTINCT d.id AS "ID",
d.short_name AS "NAME",
d.description AS "DESCRIPTION",
'html'::text AS "ENCODING_TYPE",
%(sms_url)s || 'backend/api/v1/devices/' || d.id ||
'/sensorml' AS "METADATA",
jsonb_build_object(
'url', %(sms_url)s || 'devices/' || d.id,
'pid', d.persistent_identifier,
'type', d.device_type_name,
'contacts', ARRAY_AGG(DISTINCT jsonb_build_object(
'email', co.email,
'organization', co.organization,
'name', CONCAT(co.given_name, ' ', co.family_name),
'orcid', co.orcid
)),
'manufacturer', d.manufacturer_name,
'model', d.model,
'serialNumber', d.serial_number
) AS "PROPERTIES"
FROM public.sms_device d
JOIN public.sms_device_mount_action dma ON d.id = dma.device_id
JOIN public.sms_configuration_contact_role ccr
ON dma.configuration_id = ccr.configuration_id
JOIN public.sms_contact co ON ccr.contact_id = co.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND d.is_public
GROUP BY d.id, d.short_name, d.description, d.persistent_identifier, d.device_type_name,
d.manufacturer_name, d.model, d.serial_number, d.is_public, dsl.datasource_id
ORDER BY d.id ASC;
--- CREATE VIEW LOCATIONS ---
DROP VIEW IF EXISTS "LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "LOCATIONS" AS
SELECT DISTINCT csl.id AS "ID",
csl.label AS "NAME",
csl.begin_description AS "DESCRIPTION",
'application/geo+json'::text AS "ENCODING_TYPE",
public.ST_ASGeoJSON(
public.ST_SetSRID(
public.ST_MakePoint(csl.x, csl.y),
4326
)
) AS "LOCATION",
jsonb_build_object() AS "PROPERTIES"
FROM public.sms_configuration_static_location_begin_action csl
JOIN public.sms_configuration c ON csl.configuration_id = c.id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public
ORDER BY csl.id;
--- CREATE VIEW THINGS_LOCATIONS ---
DROP VIEW IF EXISTS "THINGS_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "THINGS_LOCATIONS" AS
SELECT DISTINCT ON (c.id) c.id AS "THING_ID",
csl.id AS "LOCATION_ID"
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl
ON c.id = csl.configuration_id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public
ORDER BY c.id, csl.begin_date DESC;
--- CREATE VIEW LOCATIONS_HIST_LOCATIONS ---
DROP VIEW IF EXISTS "LOCATIONS_HIST_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "LOCATIONS_HIST_LOCATIONS" AS
--build cte that returns configuration_ids and location_ids (configuration_static_location_begin_action.id)
WITH config_locations AS (SELECT DISTINCT c.id AS c_id,
csl.id AS csl_id
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl
ON c.id = csl.configuration_id
JOIN public.sms_device_mount_action dma
on c.id = dma.configuration_id
JOIN public.sms_datastream_link dsl
on dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public),
-- return newest location (highest csl_id) for each configuration_id
-- might need to be decided based on timestamp.
-- for now, taking the highest id works fine
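-- e.g. if a configuration has static location actions with ids 4, 9 and 12,
-- id 12 is treated as the current LOCATION and 4 and 9 end up as HIST_LOCATIONS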
locations AS (SELECT c_id,
MAX(csl_id) AS max_csl_id
FROM config_locations
GROUP BY c_id)
-- join locations on newest location per configuration id
SELECT loc.max_csl_id AS "LOCATION_ID",
cl.csl_id AS "HIST_LOCATION_ID"
-- join locations on newest location on configuration id
FROM config_locations cl
JOIN locations loc ON cl.c_id = loc.c_id
-- leave out newest location in join
WHERE cl.csl_id <> loc.max_csl_id
ORDER BY loc.max_csl_id ASC, cl.csl_id ASC
-- returns hist_location_ids mapped to the current location_id for each configuration_id
;
--- CREATE VIEW HIST_LOCATIONS ---
DROP VIEW IF EXISTS "HIST_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "HIST_LOCATIONS" AS
WITH cte AS (SELECT c.id AS "THING_ID",
csl.id "ID",
csl.begin_date AS "TIME",
ROW_NUMBER()
OVER (PARTITION BY c.id ORDER BY csl.begin_date DESC) AS row_num
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl
ON c.id = csl.configuration_id
WHERE c.is_public)
SELECT DISTINCT "THING_ID",
"ID",
"TIME"
FROM cte
JOIN public.sms_device_mount_action dma
on cte."THING_ID" = dma.configuration_id
JOIN public.sms_datastream_link dsl on dma.id = dsl.device_mount_action_id
WHERE row_num > 1
AND dsl.datasource_id = %(tsm_schema)s;
--- CREATE TABLE FEATURES (FEATURE_OF_INTEREST) ---
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'FEATURES'
AND table_schema = %(tsm_schema)s
AND table_type = 'BASE TABLE')
THEN EXECUTE 'DROP TABLE "FEATURES" CASCADE';
ELSIF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'FEATURES'
AND table_schema = %(tsm_schema)s
AND table_type = 'VIEW'
)
THEN EXECUTE 'DROP VIEW "FEATURES" CASCADE';
END IF;
END $$;
CREATE TABLE "FEATURES" (
"ID" serial,
"NAME" text,
"DESCRIPTION" text,
"ENCODING_TYPE" text,
"FEATURE" jsonb,
"PROPERTIES" jsonb
);
--- CREATE VIEW OBSERVATIONS ---
DROP VIEW IF EXISTS "OBSERVATIONS" CASCADE;
CREATE OR REPLACE VIEW "OBSERVATIONS" AS
SELECT
o.result_boolean AS "RESULT_BOOLEAN",
o.result_quality AS "RESULT_QUALITY",
o.phenomenon_time_start AS "PHENOMENON_TIME_START",
jsonb_build_object() AS "PARAMETERS",
dsl.device_property_id AS "DATASTREAM_ID",
o.result_string AS "RESULT_STRING",
o.result_type AS "RESULT_TYPE",
o.valid_time_end AS "VALID_TIME_END",
o.phenomenon_time_end AS "PHENOMENON_TIME_END",
null AS "FEATURE_ID",
row_number() OVER () AS "ID",
o.result_json AS "RESULT_JSON",
o.result_time AS "RESULT_TIME",
o.result_number AS "RESULT_NUMBER",
o.valid_time_start AS "VALID_TIME_START"
FROM public.sms_datastream_link dsl
JOIN observation o ON o.datastream_id = dsl.datastream_id
JOIN public.sms_device_mount_action dma ON dma.id = dsl.device_mount_action_id
JOIN public.sms_device d ON d.id = dma.device_id
JOIN public.sms_configuration c ON c.id = dma.configuration_id
WHERE c.is_public AND d.is_public AND dsl.datasource_id = %(tsm_schema)s;
--- CREATE VIEW DATASTREAMS ---
DROP VIEW IF EXISTS "DATASTREAMS" CASCADE;
CREATE OR REPLACE VIEW "DATASTREAMS" AS SELECT
dsl.device_property_id AS "ID",
CONCAT(
c.label, '_',
d.short_name, '_',
dp.property_name, '_',
dma.offset_z, '_',
dp.aggregation_type_name) AS "NAME",
CONCAT(
d.short_name, '_',
dp.property_name, '_',
dma.offset_z, ' at site ',
c.label, ' with aggregation function ',
dp.aggregation_type_name) AS "DESCRIPTION",
c.id AS "THING_ID",
d.id AS "SENSOR_ID",
'OM_Observation' AS "OBSERVATION_TYPE",
dma.begin_date AS "PHENOMENON_TIME_START",
dma.begin_date AS "RESULT_TIME_START",
dma.end_date AS "PHENOMENON_TIME_END",
dma.end_date AS "RESULT_TIME_END",
-- we don't provide an observed area, as this is really expensive
null as "OBSERVED_AREA",
dp.unit_uri AS "UNIT_DEFINITION",
dp.property_name AS "UNIT_NAME",
dp.unit_name AS "UNIT_SYMBOL",
CASE
WHEN dp.property_uri = '' THEN NULL
ELSE reverse(split_part(reverse(dp.property_uri::text), '/'::text,
2))::integer
END as "OBS_PROPERTY_ID",
jsonb_build_object(
'sensorOutput', dp.property_name,
'offset', jsonb_build_object(
'z', dma.offset_z,
'unit', 'm'),
'aggregation', jsonb_build_object(
'period', dsl.aggregation_period,
'function', dp.aggregation_type_name),
'quality', jsonb_build_object(
'resolution', dp.resolution,
'resolution_unit', dp.resolution_unit_name,
'accuracy', dp.accuracy,
'measuring_range_min', dp.measuring_range_min,
'measuring_range_max', dp.measuring_range_max),
'license', '') AS "PROPERTIES"
FROM public.sms_datastream_link dsl
JOIN public.sms_device_mount_action dma ON dma.id = dsl.device_mount_action_id
JOIN public.sms_device d ON d.id = dma.device_id
JOIN public.sms_configuration c ON c.id = dma.configuration_id
JOIN public.sms_device_property dp ON dp.id = dsl.device_property_id
WHERE dsl.datasource_id = %(tsm_schema)s AND c.is_public AND d.is_public;
--- CREATE VIEW OBS_PROPERTIES ---
DROP VIEW IF EXISTS "OBS_PROPERTIES" CASCADE;
CREATE OR REPLACE VIEW "OBS_PROPERTIES" AS
SELECT DISTINCT mq.id as "ID",
mq.term as "NAME",
mq.provenance_uri "DEFINITION",
mq.definition as "DESCRIPTION",
jsonb_build_object('url', %(cv_url)s || 'api/v1/measuredquantities/' || mq.id)
as "PROPERTIES"
FROM public.sms_cv_measured_quantity mq
JOIN public.sms_device_property dp
ON mq.id = reverse(split_part(reverse(dp.property_uri), '/', 2))::int
JOIN public.sms_device_mount_action dma ON dp.device_id = dma.device_id
JOIN public.sms_configuration c ON dma.configuration_id = c.id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl
on dma.configuration_id = csl.configuration_id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl
on dma.configuration_id = cdl.configuration_id
WHERE (cdl.configuration_id IS NOT NULL OR csl.configuration_id IS NOT NULL)
AND c.is_public
AND d.is_public
AND dp.property_uri <> ''
AND dsl.datasource_id = %(tsm_schema)s;
COMMIT;
BEGIN;
--
-- Create model Thing
--
CREATE TABLE IF NOT EXISTS "thing"
(
"id" bigserial NOT NULL PRIMARY KEY,
"name" varchar(200) NOT NULL,
"uuid" uuid NOT NULL UNIQUE,
"description" text NULL,
"properties" jsonb NULL
);
--
-- Create model Journal
--
CREATE TABLE IF NOT EXISTS "journal"
(
"id" bigserial NOT NULL PRIMARY KEY,
"timestamp" timestamp NOT NULL,
-- Added below by ALTER TABLE
-- "timestamp" timestamp with time zone NOT NULL,
"level" varchar(30) NOT NULL,
"message" text NULL,
"extra" jsonb NULL,
"thing_id" bigint NOT NULL,
-- Added below by ALTER TABLE
-- "origin" varchar(200) NULL
CONSTRAINT "journal_thing_id_fk_thing_id" FOREIGN KEY ("thing_id") REFERENCES "thing" ("id") DEFERRABLE INITIALLY DEFERRED
);
ALTER TABLE "journal"
ADD COLUMN IF NOT EXISTS "origin" varchar(200) NULL;
ALTER TABLE "journal"
ALTER COLUMN "timestamp" TYPE timestamp with time zone;
-- Conditionally create the index if it does not exist
DO
$$
BEGIN
IF NOT EXISTS (SELECT 1
FROM pg_indexes
WHERE tablename = 'journal'
AND indexname = 'journal_thing_id') THEN
EXECUTE 'CREATE INDEX "journal_thing_id" ON "journal" ("thing_id")';
END IF;
END
$$;
--
-- Create model Datastream
--
CREATE TABLE IF NOT EXISTS "datastream"
(
"id" bigserial NOT NULL PRIMARY KEY,
"name" varchar(200) NOT NULL,
"description" text NULL,
"properties" jsonb NULL,
"position" varchar(200) NOT NULL,
"thing_id" bigint NOT NULL,
CONSTRAINT "datastream_thing_id_position_9f2cfe68_uniq" UNIQUE ("thing_id", "position"),
CONSTRAINT "datastream_thing_id_f55522a4_fk_thing_id" FOREIGN KEY ("thing_id") REFERENCES "thing" ("id") DEFERRABLE INITIALLY DEFERRED
);
-- Conditionally create the index if it does not exist
DO
$$
BEGIN
IF NOT EXISTS (SELECT 1
FROM pg_indexes
WHERE tablename = 'datastream'
AND indexname = 'datastream_thing_id_f55522a4') THEN
EXECUTE 'CREATE INDEX "datastream_thing_id_f55522a4" ON "datastream" ("thing_id")';
END IF;
END
$$;
--
-- Create model Observation
--
CREATE TABLE IF NOT EXISTS "observation"
(
-- "id" bigserial NOT NULL PRIMARY KEY,
"phenomenon_time_start" timestamp with time zone NULL,
"phenomenon_time_end" timestamp with time zone NULL,
"result_time" timestamp with time zone NOT NULL,
"result_type" smallint NOT NULL,
"result_number" double precision NULL,
"result_string" varchar(200) NULL,
"result_json" jsonb NULL,
"result_boolean" boolean NULL,
"result_latitude" double precision NULL,
"result_longitude" double precision NULL,
"result_altitude" double precision NULL,
"result_quality" jsonb NULL,
"valid_time_start" timestamp with time zone NULL,
"valid_time_end" timestamp with time zone NULL,
"parameters" jsonb NULL,
"datastream_id" bigint NOT NULL,
CONSTRAINT "observation_datastream_id_result_time_1d043396_uniq" UNIQUE ("datastream_id", "result_time"),
CONSTRAINT "observation_datastream_id_77f5c4fb_fk_datastream_id" FOREIGN KEY ("datastream_id") REFERENCES "datastream" ("id") DEFERRABLE INITIALLY DEFERRED
);
-- -- Conditionally make a TimescaleDB hypertable if not already done
-- DO
-- $$
-- BEGIN
-- IF NOT EXISTS (SELECT 1
-- FROM timescaledb_information.hypertables
-- WHERE hypertable_name = 'observation'
-- AND hypertable_schema = current_schema()) THEN
-- -- The table is not a hypertable, so create it
-- PERFORM public.create_hypertable('observation', 'result_time');
-- END IF;
-- END
-- $$;
-- Conditionally create the index if it does not exist
DO
$$
BEGIN
IF NOT EXISTS (SELECT 1
FROM pg_indexes
WHERE tablename = 'observation'
AND indexname = 'observation_datastream_id_77f5c4fb') THEN
EXECUTE 'CREATE INDEX "observation_datastream_id_77f5c4fb" ON "observation" ("datastream_id")';
END IF;
END
$$;
--
-- Create model Relation role
--
CREATE TABLE IF NOT EXISTS relation_role
(
id bigint GENERATED BY DEFAULT AS IDENTITY,
name varchar(200) NOT NULL,
definition text NULL,
inverse_name varchar(200) NOT NULL,
inverse_definition text NULL,
description text NULL,
properties jsonb NULL,
CONSTRAINT relation_role_pk PRIMARY KEY (id),
CONSTRAINT relation_role_pk_2 UNIQUE (name),
CONSTRAINT relation_role_pk_3 UNIQUE (inverse_name)
);
--
-- Create model Related datastreams
--
CREATE TABLE IF NOT EXISTS related_datastream
(
id bigint GENERATED ALWAYS AS IDENTITY,
datastream_id bigint NOT NULL,
role_id bigint NOT NULL,
target_id bigint NOT NULL,
CONSTRAINT related_datastream_pk PRIMARY KEY (id),
CONSTRAINT related_datastream_pk_2 UNIQUE (datastream_id, role_id, target_id),
CONSTRAINT related_datastream_datastream_id_fk FOREIGN KEY (datastream_id) REFERENCES datastream (id) DEFERRABLE INITIALLY DEFERRED,
CONSTRAINT related_datastream_datastream_id_fk_2 FOREIGN KEY (target_id) REFERENCES datastream (id) DEFERRABLE INITIALLY DEFERRED,
CONSTRAINT related_datastream_relation_role_id_fk FOREIGN KEY (role_id) REFERENCES relation_role (id) DEFERRABLE INITIALLY DEFERRED
);
-- cannot be done inside the CREATE TABLE statement but is idempotent
COMMENT ON COLUMN related_datastream.datastream_id IS 'Domain, source or set of departure of the relation.';
COMMENT ON COLUMN related_datastream.role_id IS 'The type of the relation.';
COMMENT ON COLUMN related_datastream.target_id IS 'Codomain, target or set of destination of the relation.';
COMMIT;
BEGIN;
INSERT INTO relation_role
(id, name, definition, inverse_name, inverse_definition, description, properties)
VALUES
(1, 'created_by', 'This was created by other(s)', 'created', 'Other(s) created this', 'A derived product', null)
ON CONFLICT (id) DO UPDATE SET
name = excluded.name,
definition = excluded.definition,
inverse_name = excluded.inverse_name,
inverse_definition = excluded.inverse_definition,
description = excluded.description,
properties = excluded.properties;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "DATASTREAMS" CASCADE;
CREATE OR REPLACE VIEW "DATASTREAMS" AS
SELECT
dsl.device_property_id AS "ID",
concat(c.label, ':',
d.short_name, ':',
dp.property_name
) as "NAME",
concat(d.short_name, ' ',
dp.property_name, ' ',
dma.offset_z, 'm at site ',
c.label, ' with aggregation function ',
dp.aggregation_type_name, ' and period ',
dsl.aggregation_period , 's'
) as "DESCRIPTION",
c.id as "THING_ID",
d.id as "SENSOR_ID",
CASE
WHEN dp.property_uri = '' THEN NULL
ELSE reverse(split_part(reverse(dp.property_uri::text), '/'::text,
2))::integer
END as "OBS_PROPERTY_ID",
dp.unit_uri AS "UNIT_DEFINITION",
dp.property_name AS "UNIT_NAME",
dp.unit_name AS "UNIT_SYMBOL",
'OM_Observation' as "OBSERVATION_TYPE",
jsonb_build_object(
'name', '',
'symbol', dp.unit_name,
'definition', dp.unit_uri
) as "UNIT_OF_MEASUREMENT",
public.st_asgeojson(
public.st_convexhull(
public.ST_SetSRID(
public.ST_MakePoint(csl.x, csl.y),
4326
)
)
) as "OBSERVED_AREA",
null as "RESULT_TIME",
null as "PHENOMENON_TIME",
dma.begin_date AS "PHENOMENON_TIME_START",
dma.begin_date AS "RESULT_TIME_START",
dma.end_date AS "PHENOMENON_TIME_END",
dma.end_date AS "RESULT_TIME_END",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.id', %(sms_url)s || 'datastream-links/' || MAX(dsl.id),
'jsonld.type', 'DatastreamProperties',
'observingProcedure', jsonb_build_object(
'jsonld.type', 'ObservingProcedure',
'name', dp.aggregation_type_name,
'description', cv_agg.definition,
'definition', dp.aggregation_type_uri,
'properties', jsonb_build_object(
'period', dsl.aggregation_period,
'unitOfPeriod', jsonb_build_object(
'jsonld.type', 'Unit',
'name', cv_u.provenance,
'symbol', cv_u.term,
'definition', dp.unit_uri
)
)
),
'measurementProperties', jsonb_build_object(
'jsonld.type', 'MeasurementProperties',
'measurementResolution', dp.resolution ,
'unitOfMeasurementResolution', jsonb_build_object(
'jsonld.type', 'Unit',
'name', cv_ur.provenance,
'symbol', dp.resolution_unit_name,
'definition', dp.resolution_unit_uri
),
'measurementAccuracy', dp.accuracy,
'unitOfMeasurementAccuracy', jsonb_build_object(
'jsonld.type', 'Unit',
'name', cv_ua.provenance,
'symbol', dp.accuracy_unit_name ,
'definition', dp.accuracy_unit_uri
),
'operationRange', array[dp.measuring_range_min,dp.measuring_range_max],
'unitOfOperationRange', jsonb_build_object(
'jsonld.type', 'Unit',
'name', cv_ua.provenance,
'symbol', dp.accuracy_unit_name ,
'definition', dp.accuracy_unit_uri
)
),
'license', jsonb_build_object(
'jsonld.type', 'CreativeWork',
'name', cv_l.term,
'url', cv_l.provenance_uri,
'provider', cv_l.definition
),
'providerMobility', CASE WHEN MAX(cdl.begin_date) IS NULL THEN 'static' ELSE 'dynamic' end,
'deployment', jsonb_build_object(
'jsonld.id', %(sms_url)s || 'configurations/' || c.id || '/platforms-and-devices?deviceMountAction=' || dma.id,
'jsonld.type', 'Deployment',
'name', dma."label",
'description', dma.begin_description,
'deploymentTime', dma.begin_date,
'properties', jsonb_build_object(
'jsonld.type', 'DeploymentProperties',
'offsets', jsonb_build_object(
'jsonld.type', 'Offset',
'x', dma.offset_x,
'y', dma.offset_y,
'z', dma.offset_z
),
'unitOfOffsets', jsonb_build_object(
'jsonld.type', 'Unit',
'name', 'meters',
'symbol', 'm',
-- this should be generated automatically. we need to find the unit id for meter
'definition', 'https://sms-cv.helmholtz.cloud/sms/cv/api/v1/units/63'
)
)
),
'dataSource', ''
) as "PROPERTIES"
FROM public.sms_datastream_link dsl
JOIN public.sms_device_mount_action dma ON dma.id = dsl.device_mount_action_id
JOIN public.sms_device d ON d.id = dma.device_id
JOIN public.sms_configuration c ON c.id = dma.configuration_id
JOIN public.sms_device_property dp ON dp.id = dsl.device_property_id
LEFT JOIN public.sms_cv_aggregation_type cv_agg ON coalesce(nullif(split_part(dp.aggregation_type_uri,'/',9),'')::integer) =cv_agg.id
LEFT JOIN public.sms_cv_unit cv_u ON coalesce(nullif(split_part(dp.unit_uri ,'/',9),'')::integer) =cv_u.id
LEFT JOIN public.sms_cv_unit cv_ur ON coalesce(nullif(split_part(dp.resolution_unit_uri ,'/',9),'')::integer) =cv_ur.id
LEFT JOIN public.sms_cv_unit cv_ua ON coalesce(nullif(split_part(dp.accuracy_unit_uri ,'/',9),'')::integer) =cv_ua.id
LEFT JOIN public.sms_cv_license cv_l ON coalesce(nullif(split_part(dsl.license_uri,'/',9),'')::integer) =cv_l.id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl ON c.id = cdl.configuration_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl ON c.id = csl.configuration_id
WHERE c.is_public AND d.is_public AND dsl.datasource_id = %(tsm_schema)s
GROUP BY dsl.device_property_id, c.label, d.short_name, dp.property_name, dma.offset_z, dp.aggregation_type_name, dsl.aggregation_period,
dp.unit_name, dp.unit_uri, d.id, dp.id, cv_agg.definition, dp.aggregation_type_uri, cv_u.provenance, cv_u.term, dp.resolution, cv_ur.provenance,
dp.resolution_unit_name, dp.resolution_unit_uri, dp.accuracy, cv_ua.provenance, dp.accuracy_unit_name, dp.accuracy_unit_uri, dp.measuring_range_min,
dp.measuring_range_max, cv_l.term, cv_l.provenance_uri, cv_l.definition, c.id, dma.id, dma.label, dma.begin_description, dma.begin_date, dma.offset_x,
dma.offset_y, csl.x, csl.y, dp.property_uri, dma.end_date;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'FEATURES'
AND table_schema = %(tsm_schema)s
AND table_type = 'BASE TABLE')
THEN EXECUTE 'DROP TABLE "FEATURES" CASCADE';
ELSIF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'FEATURES'
AND table_schema = %(tsm_schema)s
AND table_type = 'VIEW'
)
THEN EXECUTE 'DROP VIEW "FEATURES" CASCADE';
END IF;
END $$;
CREATE TABLE "FEATURES" (
"ID" serial,
"NAME" text,
"DESCRIPTION" text,
"ENCODING_TYPE" text,
"FEATURE" jsonb,
"PROPERTIES" jsonb
);
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "LOCATIONS" AS
SELECT DISTINCT
csl.id AS "ID",
csl.label AS "NAME",
csl.begin_description AS "DESCRIPTION",
'application/geo+json'::text AS "ENCODING_TYPE",
public.ST_ASGeoJSON(
public.ST_SetSRID(
public.ST_MakePoint(csl.x, csl.y),
4326
)
) AS "LOCATION",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.id', %(sms_url)s || 'configurations/' || c.id || 'locations/static-location-actions/' || csl.id,
'jsonld.type', 'LocationProperties'
) AS "PROPERTIES"
FROM public.sms_configuration_static_location_begin_action csl
JOIN public.sms_configuration c ON csl.configuration_id = c.id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public AND d.is_public
ORDER BY csl.id;
--- CREATE VIEW THINGS_LOCATIONS ---
DROP VIEW IF EXISTS "THINGS_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "THINGS_LOCATIONS" AS
SELECT DISTINCT ON (c.id)
c.id AS "THING_ID",
csl.id AS "LOCATION_ID"
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl
ON c.id = csl.configuration_id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
JOIN public.sms_device d ON dma.device_id = d.id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public AND d.is_public
ORDER BY c.id, csl.begin_date DESC;
DROP VIEW IF EXISTS "LOCATIONS_HIST_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "LOCATIONS_HIST_LOCATIONS" AS
--build cte that returns configuration_ids and location_ids (configuration_static_location_begin_action.id)
WITH locations AS (
SELECT DISTINCT
c.id AS c_id,
csl.id AS csl_id
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl ON c.id = csl.configuration_id
JOIN public.sms_device_mount_action dma on c.id = dma.configuration_id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE dsl.datasource_id = %(tsm_schema)s
AND c.is_public AND d.is_public
),
current_locations AS (
SELECT
c_id,
MAX(csl_id) AS max_csl_id
FROM locations
GROUP BY c_id
)
SELECT
cl.max_csl_id AS "LOCATION_ID",
loc.csl_id AS "HIST_LOCATION_ID"
FROM locations loc
-- join locations on current configuration location
JOIN current_locations cl ON loc.c_id = cl.c_id
ORDER BY cl.max_csl_id ASC, loc.csl_id ASC
--returns hist_location_id mapped to current location_id for each configuration_id
;
DROP VIEW IF EXISTS "HIST_LOCATIONS" CASCADE;
CREATE OR REPLACE VIEW "HIST_LOCATIONS" AS
WITH cte AS (
SELECT
c.id AS "THING_ID",
csl.id "ID",
csl.begin_date AS "TIME",
ROW_NUMBER() OVER (PARTITION BY c.id ORDER BY csl.begin_date DESC) AS row_num
FROM public.sms_configuration c
JOIN public.sms_configuration_static_location_begin_action csl ON c.id = csl.configuration_id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
WHERE c.is_public AND d.is_public
AND dsl.datasource_id = %(tsm_schema)s
)
SELECT DISTINCT "THING_ID",
"ID",
"TIME"
FROM cte
WHERE row_num > 1;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "OBSERVATIONS" CASCADE;
CREATE OR REPLACE VIEW "OBSERVATIONS" AS
SELECT
o.result_boolean AS "RESULT_BOOLEAN",
o.result_quality AS "RESULT_QUALITY",
o.result_time AS "PHENOMENON_TIME_START",
jsonb_build_object() AS "PARAMETERS",
dsl.device_property_id AS "DATASTREAM_ID",
o.result_string AS "RESULT_STRING",
o.result_type AS "RESULT_TYPE",
o.valid_time_end AS "VALID_TIME_END",
o.result_time AS "PHENOMENON_TIME_END",
null AS "FEATURE_ID",
row_number() OVER () AS "ID",
o.result_json AS "RESULT_JSON",
o.result_time AS "RESULT_TIME",
o.result_number AS "RESULT_NUMBER",
o.valid_time_start AS "VALID_TIME_START",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.type', 'ObservationProperties',
'dataSource', NULL
) AS "PROPERTIES"
FROM public.sms_datastream_link dsl
JOIN observation o ON o.datastream_id = dsl.datastream_id
JOIN public.sms_device_mount_action dma ON dma.id = dsl.device_mount_action_id
JOIN public.sms_device d ON d.id = dma.device_id
JOIN public.sms_configuration c ON c.id = dma.configuration_id
WHERE c.is_public AND d.is_public AND dsl.datasource_id = %(tsm_schema)s
AND o.result_time BETWEEN dsl.begin_date AND COALESCE(dsl.end_date, 'infinity'::timestamp);
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "OBS_PROPERTIES" CASCADE;
CREATE OR REPLACE VIEW "OBS_PROPERTIES" AS
SELECT DISTINCT
mq.id as "ID",
mq.term as "NAME",
mq.provenance_uri "DEFINITION",
mq.definition as "DESCRIPTION",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.id', %(cv_url)s || 'api/v1/measuredquantities/' || mq.id,
'jsonld.type', 'ObservedPropertyProperties'
) as "PROPERTIES"
FROM public.sms_cv_measured_quantity mq
JOIN public.sms_device_property dp ON mq.id = reverse(split_part(reverse(dp.property_uri), '/', 2))::int
JOIN public.sms_device_mount_action dma ON dp.device_id = dma.device_id
JOIN public.sms_configuration c ON dma.configuration_id = c.id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl ON dma.configuration_id = csl.configuration_id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl ON dma.configuration_id = cdl.configuration_id
WHERE (cdl.configuration_id IS NOT NULL OR csl.configuration_id IS NOT NULL)
AND c.is_public AND d.is_public AND dp.property_uri <> '' AND dsl.datasource_id = %(tsm_schema)s;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
CREATE OR REPLACE function public.get_schema_org_context() RETURNS jsonb AS
$$
BEGIN
RETURN
'{
"@version": "1.1",
"@import": "stamplate.jsonld",
"@vocab": "http://schema.org/"
}'::jsonb;
END;
$$
language plpgsql;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "SENSORS" CASCADE;
CREATE OR REPLACE VIEW "SENSORS" AS WITH
device_role_responsible_persons AS (
SELECT DISTINCT
d.id AS "device_id",
dcr.role_name AS "role_name",
dcr.role_uri AS "role_uri",
array_agg(DISTINCT jsonb_build_object(
'jsonld.id', %(sms_url)s || 'contacts/' || co.id,
'jsonld.type', 'Person',
'givenName', co.given_name,
'familyName', co.family_name,
'email', co.email,
'affiliation', jsonb_build_object(
'jsonld.type', 'Organization',
'name', co.organization,
'identifier', NULL
),
'identifier', co.orcid
)) AS "responsible_persons"
FROM public.sms_device d
JOIN public.sms_device_contact_role dcr ON dcr.device_id = d.id
JOIN public.sms_contact co ON dcr.contact_id = co.id
GROUP BY d.id, dcr.role_name, dcr.role_uri
),
device_properties AS (
SELECT DISTINCT
d.id AS "device_id",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.id', %(sms_url)s || 'devices/' || d.id,
'jsonld.type', 'SensorProperties',
'identifier', d.persistent_identifier,
'isVariantOf', jsonb_build_object(
'jsonld.type', 'ProductGroup',
'name', d.device_type_name,
'definition', d.device_type_uri
),
'isVirtual', false,
'model', d.model,
'manufacturer', jsonb_build_object(
'jsonld.type', 'Organization',
'name', d.manufacturer_name,
'definition', d.manufacturer_uri
),
'serialNumber', d.serial_number,
'responsiblePersons', array_agg(DISTINCT jsonb_build_object(
'jsonld.type', 'Person',
'roleName', drrp.role_name,
'definition', drrp.role_uri,
'resonsiblePersons', drrp.responsible_persons
))
) AS "properties"
FROM public.sms_device d
JOIN device_role_responsible_persons drrp ON d.id = drrp.device_id
JOIN public.sms_device_mount_action dma ON d.id = dma.device_id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
JOIN public.sms_configuration c ON dma.configuration_id = c.id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl ON c.id = cdl.configuration_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl ON c.id = csl.configuration_id
GROUP BY d.id, d.persistent_identifier, d.device_type_name, d.device_type_uri, d.model,
d.manufacturer_name, d.manufacturer_uri, d.serial_number, c.is_public, d.is_public,
cdl.configuration_id, csl.configuration_id, dsl.datasource_id
HAVING ((cdl.configuration_id IS NOT NULL) OR (csl.configuration_id IS NOT NULL))
AND d.is_public
AND c.is_public
AND dsl.datasource_id = %(tsm_schema)s
)
SELECT
d.id AS "ID",
d.short_name AS "NAME",
d.description AS "DESCRIPTION",
'html' AS "ENCODING_TYPE",
%(sms_url)s || 'backend/api/v1/devices/' || d.id || '/sensorml' AS "METADATA",
dp.properties AS "PROPERTIES"
FROM public.sms_device d
JOIN device_properties dp ON d.id = dp.device_id
ORDER BY d.id ASC;
COMMIT;
BEGIN;
SET search_path TO %(tsm_schema)s;
DROP VIEW IF EXISTS "THINGS" CASCADE;
CREATE OR REPLACE VIEW "THINGS" AS
WITH
configuration_role_responsible_persons AS (
SELECT
c.id AS "configuration_id",
ccr.role_name AS "role_name",
ccr.role_uri AS "role_uri",
array_agg(DISTINCT jsonb_build_object(
'jsonld.id', %(sms_url)s || 'contacts/' || co.id,
'jsonld.type', 'Person',
'givenName', co.given_name,
'familyName', co.family_name,
'email', co.email,
'affiliation', jsonb_build_object(
'jsonld.type', 'Organization',
'name', co.organization,
'identifier', NULL
),
'identifier', co.orcid
)
) AS "responsible_persons"
FROM public.sms_configuration c
JOIN public.sms_configuration_contact_role ccr ON c.id = ccr.configuration_id
JOIN public.sms_contact co ON ccr.contact_id = co.id
GROUP BY c.id, ccr.role_name, ccr.role_uri
),
configuration_properties AS (
SELECT
c.id AS "configuration_id",
jsonb_build_object(
'@context', public.get_schema_org_context(),
'jsonld.id', %(sms_url)s || 'configurations/' || c.id,
'jsonld.type', 'ThingProperties',
'identifier', c.persistent_identifier,
'responsiblePersons', array_agg(DISTINCT jsonb_build_object(
'jsonld.type', 'Role',
'roleName', crrp.role_name,
'definition', crrp.role_uri,
'resonsiblePersons', crrp.responsible_persons
)),
'partOfProjects', array_agg(DISTINCT jsonb_build_object(
'jsonld.type', 'Project',
'name', c.project
)),
'metadata', jsonb_build_object('jsonld.type', 'Dataset',
'encodingType', 'http://www.opengis.net/doc/IS/SensorML/2.0',
'distribution', jsonb_build_object(
'jsonld.type', 'DataDistributionService',
'url', %(sms_url)s || 'backend/api/v1/configurations/' || c.id || '/sensorml'
)
),
'isVirtual', false
) AS "properties"
FROM public.sms_configuration c
JOIN public.sms_configuration_contact_role ccr ON c.id = ccr.configuration_id
JOIN public.sms_contact co ON ccr.contact_id = co.id
JOIN public.sms_device_mount_action dma ON c.id = dma.configuration_id
JOIN public.sms_device d ON dma.device_id = d.id
JOIN public.sms_datastream_link dsl ON dma.id = dsl.device_mount_action_id
JOIN configuration_role_responsible_persons crrp ON c.id = crrp.configuration_id
LEFT JOIN public.sms_configuration_dynamic_location_begin_action cdl ON c.id = cdl.configuration_id
LEFT JOIN public.sms_configuration_static_location_begin_action csl ON c.id = csl.configuration_id
GROUP BY
c.id, c.persistent_identifier, c.project, cdl.configuration_id, csl.configuration_id,
c.is_public, d.is_public, dsl.datasource_id
HAVING
((cdl.configuration_id IS NOT NULL) OR (csl.configuration_id IS NOT NULL))
AND c.is_public AND d.is_public AND dsl.datasource_id = %(tsm_schema)s
)
SELECT DISTINCT
c.id AS "ID",
c.description AS "DESCRIPTION",
c.label AS "NAME",
cp.properties AS "PROPERTIES"
FROM public.sms_configuration c
JOIN configuration_properties cp ON c.id = cp.configuration_id
ORDER BY c.id ASC;
COMMIT;
#!/usr/bin/env python3
from __future__ import annotations
import json
import logging
import sys
import traceback
import typing
from abc import ABC, abstractmethod
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTMessage
from utils.errors import (
UserInputError,
DataNotFoundError,
ParsingError,
NoDataWarning,
ProcessingError,
)
logger = logging.getLogger("mqtt-handler")
class AbstractHandler(ABC):
def __init__(
self,
topic: str,
mqtt_broker: str,
mqtt_user: str,
mqtt_password: str,
mqtt_client_id: str,
mqtt_qos: int,
mqtt_clean_session: bool,
):
self.topic = topic
self.mqtt_broker = mqtt_broker
self.mqtt_user = mqtt_user
self.mqtt_password = mqtt_password
self.mqtt_client_id = mqtt_client_id
self.mqtt_qos = mqtt_qos
self.mqtt_clean_session = mqtt_clean_session
self.mqtt_host = mqtt_broker.split(":")[0]
self.mqtt_port = int(mqtt_broker.split(":")[1])
self.mqtt_client = mqtt.Client(
client_id=mqtt_client_id,
clean_session=self.mqtt_clean_session,
)
self.mqtt_client.suppress_exceptions = False
self.mqtt_client.username_pw_set(self.mqtt_user, self.mqtt_password)
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_subscribe = self.on_subscribe
self.mqtt_client.on_message = self.on_message
self.mqtt_client.on_log = self.on_log
def run_loop(self) -> typing.NoReturn:
logger.info(f"Setup ok, starting listening loop")
self.mqtt_client.connect(self.mqtt_host, self.mqtt_port)
self.mqtt_client.subscribe(self.topic, self.mqtt_qos)
self.mqtt_client.loop_forever()
def on_log(self, client: mqtt.Client, userdata, level, buf):
logger.debug(f"%s: %s", level, buf)
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
if rc == 0:
logger.info(
f"Connected to %r with client ID: %s",
self.mqtt_broker,
self.mqtt_client._client_id.decode(),
)
return
logger.error(f"Failed to connect to %r, return code: %s", self.mqtt_broker, rc)
def on_subscribe(self, client: mqtt.Client, userdata, mid, granted_qos):
logger.info(f"Subscribed to topic {self.topic} with QoS {granted_qos[0]}")
def on_message(self, client: mqtt.Client, userdata, message: MQTTMessage):
logger.info(
"\n\n======================= NEW MESSAGE ========================\n"
f"Topic: %r, QoS: %s, Timestamp: %s",
message.topic,
message.qos,
message.timestamp,
)
try:
content = self._decode(message)
except Exception:
logger.critical(
f"\n====================== CRITICAL ERROR ======================\n"
f"Status: PARSING ERROR (Decoding/parsing of payload failed)\n"
f"Payload:\n{message.payload!r}\n"
f"{traceback.format_exc()}"
f"========================= SYS EXIT =========================\n",
)
# We exit now, because otherwise the client.on_log would print
# the exception again (with unnecessary clutter)
sys.exit(1)
try:
logger.debug(f"calling %s.act()", self.__class__.__qualname__)
self.act(content, message)
except (UserInputError, ParsingError):
logger.error(
f"\n======================== USER ERROR ========================\n"
f"Status: ERROR (An error because of user data or input)\n"
f"Content:\n{content!r}\n"
f"{traceback.format_exc()}"
f"======================== USER ERROR ========================\n",
)
return
except (DataNotFoundError, NoDataWarning):
logger.error(
f"\n======================== DATA ERROR ========================\n"
f"Status: ERROR (Data is missing)\n"
f"Content:\n{content!r}\n"
f"{traceback.format_exc()}"
f"======================== DATA ERROR ========================\n",
)
return
except Exception:
logger.critical(
f"\n====================== CRITICAL ERROR ======================\n"
f"Status: UNHANDLED ERROR (See exception and traceback below)\n"
f"Content:\n{content!r}\n"
f"{traceback.format_exc()}"
f"========================= SYS EXIT =========================\n",
)
# We exit now, because otherwise the client.on_log would print
# the exception again (with unnecessary clutter)
sys.exit(1)
logger.info(
f"\n===================== PROCESSING DONE ======================\n"
f"Status: Success (Message was processed successfully)\n"
f"===================== PROCESSING DONE ======================\n",
)
def _decode(self, message: MQTTMessage) -> typing.Any:
"""
This decodes the message payload from UTF-8 and also tries to decode JSON into
Python objects. If the content is not valid JSON (e.g. a plain string or a
datetime-like value such as 2022-12-22T11:11:11), the decoded string itself is
returned instead.
Parameters
----------
message : MQTTMessage
Message to decode.
Returns
-------
content:
The decoded content
Raises
------
UnicodeDecodeError
If the raw message is not 'utf-8' encoded.
"""
# Hint: json.loads also decodes single numeric values,
# the constants `null`, +/-`Infinity` and `NaN`.
decoded: str = message.payload.decode("utf-8")
try:
decoded = json.loads(decoded)
except json.JSONDecodeError:
logger.warning(
f"Message content is not valid json. (That's ok, but unusual)"
)
return decoded
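# Illustrative payloads (hypothetical values):
#   b'{"value": 3.5}'        -> {'value': 3.5}
#   b'42'                    -> 42  (json.loads also parses bare numbers)
#   b'2022-12-22T11:11:11'   -> '2022-12-22T11:11:11'  (not JSON, returned as plain string)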
@abstractmethod
def act(self, content: typing.Any, message: MQTTMessage):
"""
Subclasses must override this method.
The calling function will handle the following exceptions:
- utils.errors.ParsingError
- utils.errors.UserInputError
- utils.errors.DataNotFoundError
- utils.errors.NoDataWarning
Other exceptions will lead to a system exit.
"""
raise NotImplementedError
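A minimal, hypothetical subclass is sketched below to illustrate the contract described in the docstring above: only act() is implemented, and raising one of the handled error types (here ParsingError) is logged as a user error while the loop keeps running; any other exception terminates the process. All names besides AbstractHandler, MQTTMessage, get_envvar, setup_logging and the utils.errors classes are illustrative.

#!/usr/bin/env python3
# Hypothetical example handler (not part of this commit); it only shows how
# AbstractHandler is meant to be subclassed.
import logging

from base_handler import AbstractHandler, MQTTMessage
from utils import get_envvar, setup_logging
from utils.errors import ParsingError

logger = logging.getLogger("example-handler")


class ExampleHandler(AbstractHandler):
    def __init__(self):
        super().__init__(
            topic=get_envvar("TOPIC"),
            mqtt_broker=get_envvar("MQTT_BROKER"),
            mqtt_user=get_envvar("MQTT_USER"),
            mqtt_password=get_envvar("MQTT_PASSWORD"),
            mqtt_client_id=get_envvar("MQTT_CLIENT_ID"),
            mqtt_qos=get_envvar("MQTT_QOS", cast_to=int),
            mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool),
        )

    def act(self, content, message: MQTTMessage):
        # 'content' is whatever _decode() produced: dict, list, number or plain string
        if not isinstance(content, dict):
            # handled by on_message as a USER ERROR; the listening loop keeps running
            raise ParsingError(f"expected a JSON object, got {type(content).__name__}")
        logger.info("received message with keys: %s", sorted(content))


if __name__ == "__main__":
    setup_logging(get_envvar("LOG_LEVEL", "INFO"))
    ExampleHandler().run_loop()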
#!/usr/bin/env python3
# test if we can import everything without
# errors, but we ignore warnings
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from base_handler import * # noqa
from crontab_setup import * # noqa
from databases import * # noqa
from db_setup import * # noqa
from file_ingest import * # noqa
from frost import * # noqa
from frost_setup import * # noqa
from grafana_dashboard_setup import * # noqa
from grafana_user_setup import * # noqa
from minio_setup import * # noqa
from mqtt_ingest import * # noqa
from mqtt_user_setup import * # noqa
from parsing import * # noqa
from thing import * # noqa
from version import __version__
if __name__ == "__main__":
print(__version__)
from __future__ import annotations
import logging
from datetime import datetime
from random import randint
from crontab import CronItem, CronTab
from base_handler import AbstractHandler, MQTTMessage
from thing import Thing
from utils import get_envvar, setup_logging
from utils.journaling import Journal
logger = logging.getLogger("crontab-setup")
journal = Journal("Cron")
class CreateThingInCrontabHandler(AbstractHandler):
def __init__(self):
super().__init__(
topic=get_envvar("TOPIC"),
mqtt_broker=get_envvar("MQTT_BROKER"),
mqtt_user=get_envvar("MQTT_USER"),
mqtt_password=get_envvar("MQTT_PASSWORD"),
mqtt_client_id=get_envvar("MQTT_CLIENT_ID"),
mqtt_qos=get_envvar("MQTT_QOS", cast_to=int),
mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool),
)
self.tabfile = "/tmp/cron/crontab.txt"
def act(self, content: dict, message: MQTTMessage):
thing = Thing.get_instance(content)
with CronTab(tabfile=self.tabfile) as crontab:
for job in crontab:
if self.job_belongs_to_thing(job, thing):
logger.info(f"Updating cronjob for thing {thing.name}")
self.update_job(job, thing)
journal.info(f"Updated cronjob", thing.uuid)
return
# if no job was found, create a new one
job = crontab.new()
logger.info(f"Creating job for thing {thing.name}")
info = self.make_job(job, thing)
if not info:
logger.warning(
"no Cronjob was created, because neither extAPI, "
"nor extSFTP is present"
)
return
crontab.append(job)
journal.info(f"Created cronjob to sync {info}", thing.uuid)
@classmethod
def make_job(cls, job: CronItem, thing: Thing) -> str:
info = ""
comment = cls.mk_comment(thing)
uuid = thing.uuid
if thing.external_sftp is not None:
interval = int(thing.external_sftp.sync_interval)
schedule = cls.get_schedule(interval)
script = "/scripts/sftp_sync/sftp_sync.py"
keyfile = thing.external_sftp.private_key_path
command = f"{script} {uuid} {keyfile} > $STDOUT 2> $STDERR"
job.enable(enabled=thing.external_sftp.enabled)
job.set_comment(comment, pre_comment=True)
job.setall(schedule)
job.set_command(command)
info = f"sFTP {thing.external_sftp.uri} @ {interval}s"
if thing.external_api is not None:
interval = int(thing.external_api.sync_interval)
schedule = cls.get_schedule(interval)
script = f"/scripts/ext_api_sync/{thing.external_api.api_type}_api_sync.py"
target_uri = thing.database.url
command = f"""{script} {uuid} "{thing.external_api.settings}" {target_uri} > $STDOUT 2> $STDERR"""
job.enable(enabled=thing.external_api.enabled)
job.set_comment(comment, pre_comment=True)
job.setall(schedule)
job.set_command(command)
info = f"{thing.external_api.api_type}-API @ {interval}s"
return info
# alias
update_job = make_job
@staticmethod
def job_belongs_to_thing(job: CronItem, thing: Thing) -> bool:
"""Check if job belongs to thing."""
return job.comment.split(" | ")[-1] == thing.uuid
@staticmethod
def mk_comment(thing: Thing) -> str:
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return f"{now_str} | {thing.project.name} | {thing.name} | {thing.uuid}"
@staticmethod
def get_schedule(interval: int) -> str:
# set a random delay to avoid all jobs running at the same time
# maximum delay is the interval or 59, whichever is smaller
delay_m = randint(0, min(interval - 1, 59))
# interval is smaller than an hour
if interval < 60:
return f"{delay_m}-59/{interval} * * * *"
# interval is smaller than a day
elif interval < 1440:
delay_h = randint(0, min(interval // 60 - 1, 23))
return f"{delay_m} {delay_h}-23/{interval//60} * * *"
else:
delay_h = randint(0, min(interval // 60 - 1, 23))
delay_wd = randint(0, min(interval // 1440 - 1, 6))
return f"{delay_m} {delay_h} * * {delay_wd}-6/{interval//1440}"
if __name__ == "__main__":
setup_logging(get_envvar("LOG_LEVEL", "INFO"))
CreateThingInCrontabHandler().run_loop()
#!/usr/bin/env python3
from __future__ import annotations
import logging
import threading
import urllib.request
from functools import partial
from typing import Any, Callable, Literal
import psycopg
import psycopg2
import psycopg2.extensions
import requests
from psycopg import Connection, conninfo
from psycopg.rows import dict_row
import parsing
from utils.errors import DataNotFoundError
class Database:
name = "database"
def __init__(self, dsn: str):
self.info = conninfo.conninfo_to_dict(dsn)
self.info.pop("password")
self.__dsn = dsn
self.ping()
@property
def connection(self) -> Callable[[], psycopg.Connection]:
return partial(psycopg.connect, self.__dsn)
def ping(self, conn: Connection | None = None):
try:
if conn is not None:
conn.execute("")
else:
with self.connection() as conn:
conn.execute("")
except psycopg.errors.DatabaseError as e:
raise ConnectionError(f"Ping to {self.name} failed. ({self.info})") from e
class ConfigDB(Database):
name = "configDB"
def get_parser(self, thing_uuid) -> parsing.FileParser:
"""Returns parser-type-name and parser-parameter"""
query = (
"select fpt.name, fp.params from thing t "
"join s3_store s3 on t.s3_store_id = s3.id "
"join file_parser fp on s3.file_parser_id = fp.id "
"join file_parser_type fpt on fp.file_parser_type_id = fpt.id "
"where t.uuid = %s"
)
with self.connection() as conn:
p_type, p_params = conn.execute(query, [thing_uuid]).fetchone() # noqa
return parsing.get_parser(p_type, p_params)
def get_mqtt_parser(self, thing_uuid) -> parsing.MqttDataParser:
query = (
"select mdt.name from thing t join mqtt m on t.mqtt_id = m.id "
"join mqtt_device_type mdt on m.mqtt_device_type_id = mdt.id "
"where t.uuid = %s"
)
with self.connection() as conn:
dev_type = conn.execute(query, [thing_uuid]).fetchone() # noqa
return parsing.get_parser(dev_type, None)
def get_thing_uuid(self, by: Literal["bucket", "mqtt_user"], value) -> str | None:
# fmt: off
by_map = {
"bucket": "select t.uuid from thing t join s3_store s3 on "
"t.s3_store_id = s3.id where s3.bucket = %s",
"mqtt_user": 'select t.uuid from mqtt m join thing t on '
'm.id = t.mqtt_id where m."user" = %s',
}
# fmt: on
logging.debug(f"get thing uuid for {by}={value}")
if query := by_map.get(by):
with self.connection() as conn:
res = conn.execute(query, [value]).fetchone()
if res is None:
raise DataNotFoundError(f"No thing for {by}: {value}")
uuid = res[0]
logging.debug(f"got thing {uuid}")
return uuid
raise ValueError("Argument 'by' must be one of 'bucket' or 'mqtt_user'")
def get_s3_store(self, thing_uuid):
query = (
"select s3s.* from config_db.s3_store s3s join "
"thing t on s3s.id = t.s3_store_id where t.uuid = %s"
)
with self.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
return cur.execute(query, [thing_uuid]).fetchone()
class DBapi:
def __init__(self, base_url):
self.base_url = base_url
self.ping_dbapi()
def ping_dbapi(self):
"""
Test the health endpoint of the given url.
Added in version 0.4.0
"""
with urllib.request.urlopen(f"{self.base_url}/health") as resp:
if not resp.status == 200:
raise ConnectionError(
f"Failed to ping. HTTP status code: {resp.status}"
)
def upsert_observations(self, thing_uuid: str, observations: list[dict[str, Any]]):
url = f"{self.base_url}/observations/upsert/{thing_uuid}"
response = requests.post(url, json={"observations": observations})
if response.status_code not in (200, 201):
raise RuntimeError(
f"upload to {thing_uuid} failed with "
f"{response.reason} and {response.text}"
)
class ReentrantConnection:
"""
Workaround for stale connections.
Stale connections can happen for different reasons, for example when a
timeout occurs because the connection was not used for some time, or when
the database service was restarted.
"""
# in seconds
TIMEOUT = 2.0
logger = logging.getLogger("ReentrantConnection")
def __init__(
self, dsn=None, connection_factory=None, cursor_factory=None, **kwargs
):
# we use a nested function to hide credentials
def _connect(_self) -> None:
_self._conn = psycopg2.connect(
dsn, connection_factory, cursor_factory, **kwargs
)
self._conn: psycopg2.extensions.connection | None = None
self._connect = _connect
self._lock = threading.RLock()
def _is_alive(self) -> bool:
try:
self._ping()
except TimeoutError:
self.logger.debug("Connection timed out")
return False
except (psycopg2.InterfaceError, psycopg2.OperationalError):
self.logger.debug("Connection seems stale")
return False
else:
return True
def _ping(self):
if self._conn is None:
raise ValueError("must call connect first")
with self._conn as conn:
# unfortunately there is no client side timeout
# option, and we encountered spurious very long
# Connection timeouts (>15 min)
timer = threading.Timer(self.TIMEOUT, conn.cancel)
timer.start()
try:
# also unfortunately there is no other way to check
# if the db connection is still alive, other than to
# send a (simple) query.
with conn.cursor() as c:
c.execute("select 1")
c.fetchone()
if timer.is_alive():
return
finally:
try:
timer.cancel()
except Exception:
pass
raise TimeoutError("Connection timed out")
def reconnect(self) -> psycopg2.extensions.connection:
with self._lock:
if self._conn is None or not self._is_alive():
try:
self._conn.close() # noqa
except Exception:
pass
self.logger.debug("(re)connecting to database")
self._connect(self)
self._ping()
return self._conn
connect = reconnect
def close(self) -> None:
self._conn.close()
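A short, hypothetical usage sketch of ReentrantConnection (the DSN and query are placeholders); it mirrors how the handlers below use it: connect once, then call reconnect() before each unit of work so a stale connection is transparently replaced.

from databases import ReentrantConnection

# Hypothetical DSN; any libpq connection string works here.
wrapper = ReentrantConnection("postgresql://user:secret@localhost:5432/postgres")
db = wrapper.connect()        # alias of reconnect(); establishes the first connection
db = wrapper.reconnect()      # cheap no-op while the connection is still alive
with db, db.cursor() as cur:
    cur.execute("select 1")
    print(cur.fetchone())
wrapper.close()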
from __future__ import annotations
import json
import logging
import os
from psycopg2 import sql
from base_handler import AbstractHandler, MQTTMessage
from databases import ReentrantConnection
from thing import Thing
from utils import get_envvar, setup_logging
from utils.journaling import Journal
from utils.crypto import decrypt, get_crypt_key
import psycopg
logger = logging.getLogger("db-setup")
journal = Journal("System")
class CreateThingInPostgresHandler(AbstractHandler):
def __init__(self):
super().__init__(
topic=get_envvar("TOPIC"),
mqtt_broker=get_envvar("MQTT_BROKER"),
mqtt_user=get_envvar("MQTT_USER"),
mqtt_password=get_envvar("MQTT_PASSWORD"),
mqtt_client_id=get_envvar("MQTT_CLIENT_ID"),
mqtt_qos=get_envvar("MQTT_QOS", cast_to=int),
mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool),
)
self.db_conn = ReentrantConnection(get_envvar("DATABASE_URL"))
self.db = self.db_conn.connect()
def act(self, content: dict, message: MQTTMessage):
self.db = self.db_conn.reconnect()
thing = Thing.get_instance(content)
logger.info(f"start processing. {thing.name=}, {thing.uuid=}")
STA_PREFIX = "sta_"
GRF_PREFIX = "grf_"
# 1. Check, if there is already a database user for this project
if not self.user_exists(user := thing.database.username.lower()):
logger.debug(f"create user {user}")
self.create_user(thing)
logger.debug("create schema")
self.create_schema(thing)
if not self.user_exists(
sta_user := STA_PREFIX + thing.database.ro_username.lower()
):
logger.debug(f"create sta read-only user {sta_user}")
self.create_ro_user(thing, user_prefix=STA_PREFIX)
if not self.user_exists(
grf_user := GRF_PREFIX + thing.database.ro_username.lower()
):
logger.debug(f"create grafana read-only user {grf_user}")
self.create_ro_user(thing, user_prefix=GRF_PREFIX)
logger.debug("deploy dll")
self.deploy_ddl(thing)
logger.debug("deploy dml")
self.deploy_dml()
logger.info("update/create thing in db")
created = self.upsert_thing(thing)
journal.info(f"{'Created' if created else 'Updated'} Thing", thing.uuid)
logger.debug("create frost views")
self.create_frost_views(thing, user_prefix=STA_PREFIX)
logger.debug(f"grand frost view privileges to {sta_user}")
self.grant_sta_select(thing, user_prefix=STA_PREFIX)
logger.debug("create grafana helper views")
self.create_grafana_helper_view(thing)
logger.debug(f"grand grafana view privileges to {grf_user}")
self.grant_grafana_select(thing, user_prefix=GRF_PREFIX)
def create_user(self, thing):
with self.db:
with self.db.cursor() as c:
user = sql.Identifier(thing.database.username.lower())
passw = decrypt(thing.database.password, get_crypt_key())
c.execute(
sql.SQL("CREATE ROLE {user} WITH LOGIN PASSWORD {password}").format(
user=user, password=sql.Literal(passw)
)
)
c.execute(
sql.SQL("GRANT {user} TO {creator}").format(
user=user, creator=sql.Identifier(self.db.info.user)
)
)
def create_ro_user(self, thing, user_prefix: str = ""):
with self.db:
with self.db.cursor() as c:
ro_username = user_prefix.lower() + thing.database.ro_username.lower()
ro_user = sql.Identifier(ro_username)
schema = sql.Identifier(thing.database.username.lower())
ro_passw = decrypt(thing.database.ro_password, get_crypt_key())
c.execute(
sql.SQL(
"CREATE ROLE {ro_user} WITH LOGIN PASSWORD {ro_password}"
).format(ro_user=ro_user, ro_password=sql.Literal(ro_passw))
)
c.execute(
sql.SQL("GRANT {ro_user} TO {creator}").format(
ro_user=ro_user, creator=sql.Identifier(self.db.info.user)
)
)
# Allow tcp connections to database with new user
c.execute(
sql.SQL("GRANT CONNECT ON DATABASE {db_name} TO {ro_user}").format(
ro_user=ro_user, db_name=sql.Identifier(self.db.info.dbname)
)
)
c.execute(
sql.SQL("GRANT USAGE ON SCHEMA {schema} TO {ro_user}").format(
ro_user=ro_user, schema=schema
)
)
def password_has_changed(self, url, user, password):
# NOTE: currently unused function
try:
with psycopg.connect(url, user=user, password=password):
pass
except psycopg.OperationalError as e:
if "password authentication failed" in str(e):
return True
raise e
else:
return False
def maybe_update_password(self, user, password, db_url):
# NOTE: currently unused function
password = decrypt(password, get_crypt_key())
if not self.password_has_changed(db_url, user, password):
return
logger.debug(f"update password for user {user}")
with self.db:
with self.db.cursor() as c:
c.execute(
sql.SQL("ALTER USER {user} WITH PASSWORD {password}").format(
user=sql.Identifier(user), password=sql.Literal(password)
)
)
def create_schema(self, thing):
with self.db:
with self.db.cursor() as c:
c.execute(
sql.SQL(
"CREATE SCHEMA IF NOT EXISTS {user} AUTHORIZATION {user}"
).format(user=sql.Identifier(thing.database.username.lower()))
)
def deploy_ddl(self, thing):
file = os.path.join(
os.path.dirname(__file__),
"CreateThingInDatabaseAction",
"postgres-ddl.sql",
)
with open(file) as fh:
query = fh.read()
with self.db:
with self.db.cursor() as c:
user = sql.Identifier(thing.database.username.lower())
# Set search path for current session
c.execute(sql.SQL("SET search_path TO {0}").format(user))
# Allow tcp connections to database with new user
c.execute(
sql.SQL("GRANT CONNECT ON DATABASE {db_name} TO {user}").format(
user=user, db_name=sql.Identifier(self.db.info.dbname)
)
)
# Set default schema when connecting as user
c.execute(
sql.SQL(
"ALTER ROLE {user} SET search_path to {user}, public"
).format(user=user)
)
# Grant schema to new user
c.execute(
sql.SQL("GRANT USAGE ON SCHEMA {user}, public TO {user}").format(
user=user
)
)
# Equip new user with all grants
c.execute(
sql.SQL("GRANT ALL ON SCHEMA {user} TO {user}").format(user=user)
)
# deploy the tables and indices and so on
c.execute(query)
c.execute(
sql.SQL(
"GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA {user} TO {user}"
).format(user=user)
)
c.execute(
sql.SQL(
"GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA {user} TO {user}"
).format(user=user)
)
def deploy_dml(self):
file = os.path.join(
os.path.dirname(__file__),
"CreateThingInDatabaseAction",
"postgres-dml.sql",
)
with open(file) as fh:
query = fh.read()
with self.db:
with self.db.cursor() as c:
c.execute(query)
def grant_sta_select(self, thing, user_prefix: str):
schema = sql.Identifier(thing.database.username.lower())
sta_user = sql.Identifier(
user_prefix.lower() + thing.database.ro_username.lower()
)
with self.db:
with self.db.cursor() as c:
# Set default schema when connecting as user
c.execute(
sql.SQL(
"ALTER ROLE {sta_user} SET search_path to {schema}, public"
).format(sta_user=sta_user, schema=schema)
)
# grant read rights to newly created views in schema to user
c.execute(
sql.SQL(
"GRANT SELECT ON ALL TABLES in SCHEMA {schema} TO {sta_user}"
).format(sta_user=sta_user, schema=schema)
)
c.execute(
sql.SQL(
"GRANT SELECT ON ALL SEQUENCES in SCHEMA {schema} TO {sta_user}"
).format(sta_user=sta_user, schema=schema)
)
c.execute(
sql.SQL(
"GRANT EXECUTE ON ALL FUNCTIONS in SCHEMA {schema} TO {sta_user}"
).format(sta_user=sta_user, schema=schema)
)
def grant_grafana_select(self, thing, user_prefix: str):
with self.db:
with self.db.cursor() as c:
schema = sql.Identifier(thing.database.username.lower())
grf_user = sql.Identifier(
user_prefix.lower() + thing.database.ro_username.lower()
)
# Set default schema when connecting as user
c.execute(
sql.SQL("ALTER ROLE {grf_user} SET search_path to {schema}").format(
grf_user=grf_user, schema=schema
)
)
c.execute(sql.SQL("SET search_path TO {schema}").format(schema=schema))
c.execute(
sql.SQL(
"REVOKE ALL ON ALL TABLES IN SCHEMA {schema}, public FROM {grf_user}"
).format(grf_user=grf_user, schema=schema)
)
c.execute(
sql.SQL(
"GRANT SELECT ON TABLE thing, datastream, observation, "
"journal, datastream_properties TO {grf_user}"
).format(grf_user=grf_user, schema=schema)
)
def create_frost_views(self, thing, user_prefix: str = "sta_"):
base_path = os.path.join(
os.path.dirname(__file__), "CreateThingInDatabaseAction", "sta"
)
files = [
os.path.join(base_path, "schema_context.sql"),
os.path.join(base_path, "thing.sql"),
os.path.join(base_path, "location.sql"),
os.path.join(base_path, "sensor.sql"),
os.path.join(base_path, "observed_property.sql"),
os.path.join(base_path, "datastream.sql"),
os.path.join(base_path, "observation.sql"),
os.path.join(base_path, "feature.sql"),
]
with self.db:
with self.db.cursor() as c:
for file in files:
logger.debug(f"deploy file: {file}")
with open(file) as fh:
view = fh.read()
user = sql.Identifier(thing.database.username.lower())
sta_user = sql.Identifier(
user_prefix.lower() + thing.database.ro_username.lower()
)
c.execute(sql.SQL("SET search_path TO {user}").format(user=user))
c.execute(
view,
{
"cv_url": os.environ.get("CV_URL"),
"sms_url": os.environ.get("SMS_URL"),
"tsm_schema": thing.database.username.lower(),
},
)
def create_grafana_helper_view(self, thing):
with self.db:
with self.db.cursor() as c:
username_identifier = sql.Identifier(thing.database.username.lower())
# Set search path for current session
c.execute(sql.SQL("SET search_path TO {0}").format(username_identifier))
c.execute(
sql.SQL("""DROP VIEW IF EXISTS "datastream_properties" CASCADE""")
)
c.execute(
sql.SQL(
"""
CREATE OR REPLACE VIEW "datastream_properties" AS
SELECT DISTINCT case
when dp.property_name is null or dp.unit_name is null then tsm_ds.position
else concat(dp.property_name, ' (', dp.unit_name, ') - ', tsm_ds."position"::text)
end as "property",
tsm_ds."position",
tsm_ds.id as "ds_id",
tsm_t.uuid as "t_uuid"
FROM datastream tsm_ds
JOIN thing tsm_t ON tsm_ds.thing_id = tsm_t.id
LEFT JOIN public.sms_datastream_link sdl ON tsm_t.uuid = sdl.thing_id AND tsm_ds.id = sdl.datastream_id
LEFT JOIN public.sms_device_property dp ON sdl.device_property_id = dp.id
"""
)
)
def upsert_thing(self, thing) -> bool:
"""Returns True for insert and False for update"""
query = (
"INSERT INTO thing (name, uuid, description, properties) "
"VALUES (%s, %s, %s, %s) ON CONFLICT (uuid) DO UPDATE SET "
"name = EXCLUDED.name, "
"description = EXCLUDED.description, "
"properties = EXCLUDED.properties "
"RETURNING (xmax = 0)"
)
with self.db:
with self.db.cursor() as c:
c.execute(
sql.SQL("SET search_path TO {user}").format(
user=sql.Identifier(thing.database.username.lower())
)
)
c.execute(
query,
(
thing.name,
thing.uuid,
thing.description,
json.dumps(thing.properties),
),
)
return c.fetchone()[0]
def thing_exists(self, username: str):
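# NOTE: currently identical to user_exists below; it checks pg_roles
# for a role name rather than looking up the thing itself.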
with self.db:
with self.db.cursor() as c:
c.execute("SELECT 1 FROM pg_roles WHERE rolname=%s", [username])
return len(c.fetchall()) > 0
def user_exists(self, username: str):
with self.db:
with self.db.cursor() as c:
c.execute("SELECT 1 FROM pg_roles WHERE rolname=%s", [username])
return len(c.fetchall()) > 0
if __name__ == "__main__":
setup_logging(get_envvar("LOG_LEVEL", "INFO"))
CreateThingInPostgresHandler().run_loop()
from __future__ import annotations
import fnmatch
import json
import logging
from datetime import datetime
import warnings
from minio import Minio
from minio.commonconfig import Tags
import databases
from base_handler import AbstractHandler, MQTTMessage
from utils import get_envvar, setup_logging
from utils.errors import UserInputError, ParsingError
from utils.journaling import Journal
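# upper bound (256 MiB) for raw files fetched from MinIO, enforced in read_file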
_FILE_MAX_SIZE = 256 * 1024 * 1024
logger = logging.getLogger("file-ingest")
journal = Journal("Parser")
class ParserJobHandler(AbstractHandler):
def __init__(self):
super().__init__(
topic=get_envvar("TOPIC"),
mqtt_broker=get_envvar("MQTT_BROKER"),
mqtt_user=get_envvar("MQTT_USER"),
mqtt_password=get_envvar("MQTT_PASSWORD"),
mqtt_client_id=get_envvar("MQTT_CLIENT_ID"),
mqtt_qos=get_envvar("MQTT_QOS", cast_to=int),
mqtt_clean_session=get_envvar("MQTT_CLEAN_SESSION", cast_to=bool),
)
self.minio = Minio(
endpoint=get_envvar("MINIO_URL"),
access_key=get_envvar("MINIO_ACCESS_KEY"),
secret_key=get_envvar("MINIO_SECURE_KEY"),
secure=get_envvar("MINIO_SECURE", default=True, cast_to=bool),
)
self.pub_topic = get_envvar("TOPIC_DATA_PARSED")
self.dbapi = databases.DBapi(get_envvar("DB_API_BASE_URL"))
self.confdb = databases.ConfigDB(get_envvar("CONFIGDB_DSN"))
def act(self, content: dict, message: MQTTMessage):
if not self.is_valid_event(content):
logger.debug(f'irrelevant event {content["EventName"]!r}')
return
# Directories are part of the filename
# e.g. foo/bar/file.ext -> bucket: foo, file: bar/file.ext
bucket_name, filename = content["Key"].split("/", maxsplit=1)
thing_uuid = self.confdb.get_thing_uuid("bucket", bucket_name)
pattern = self.confdb.get_s3_store(thing_uuid)["filename_pattern"]
if not fnmatch.fnmatch(filename, pattern):
logger.debug(f"{filename} is excluded by filename_pattern {pattern!r}")
return
source_uri = f"{bucket_name}/{filename}"
logger.debug(f"loading parser for {thing_uuid}")
parser = self.confdb.get_parser(thing_uuid)
logger.debug(f"reading raw data file {source_uri}")
rawdata = self.read_file(bucket_name, filename)
logger.info(f"parsing rawdata ... ")
file = source_uri
with warnings.catch_warnings() as w:
try:
df = parser.do_parse(rawdata)
obs = parser.to_observations(df, source_uri)
except ParsingError as e:
journal.error(
f"Parsing failed. Detail: {e}. File: {file!r}", thing_uuid
)
raise e
except Exception as e:
journal.error(f"Parsing failed for file {file!r}", thing_uuid)
raise UserInputError("Parsing failed") from e
if w:
journal.warning(w[0].message, thing_uuid)
logger.debug(f"storing observations to database ...")
try:
self.dbapi.upsert_observations(thing_uuid, obs)
except Exception as e:
# Tell the user that parsing succeeded but storing the data failed
journal.error(
f"Parsing was successful, but storing data "
f"in database failed. File: {file!r}",
thing_uuid,
)
raise e
# Now everything is fine and we tell the user
journal.info(f"Parsed file {file}", thing_uuid)
object_tags = Tags.new_object_tags()
object_tags["parsed_at"] = datetime.now().isoformat()
self.minio.set_object_tags(bucket_name, filename, object_tags)
payload = json.dumps({"thing_uuid": str(thing_uuid)})
self.mqtt_client.publish(
topic=self.pub_topic, payload=payload, qos=self.mqtt_qos
)
def is_valid_event(self, content: dict):
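# only S3 object-creation events trigger parsing; all other events
# are ignored by act()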
logger.debug(f'{content["EventName"]=}')
return content["EventName"] in (
"s3:ObjectCreated:Put",
"s3:ObjectCreated:CompleteMultipartUpload",
)
def read_file(self, bucket_name, object_name) -> str:
stat = self.minio.stat_object(bucket_name, object_name)
if stat.size > _FILE_MAX_SIZE:
raise IOError("Maximum filesize of 256M exceeded")
rawdata = (
self.minio.get_object(bucket_name, object_name)
.read()
.decode()
# remove the ASCII control character ETX (end-of-text)
.rstrip("\x03")
)
return rawdata
if __name__ == "__main__":
setup_logging(get_envvar("LOG_LEVEL", "INFO"))
ParserJobHandler().run_loop()