Streamingdata
How to set up a local development/testing environment
-
Follow the general setup procedure until step 2.
-
In the same directory (i.e. {whatever_path}/tsm-orchestration`)
-
Create a new schema and with one thing (and optionally a second one):
cat mqtt-thing-event-msg.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s" cat mqtt-thing-event-msg2.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s"
-
Add a new mqtt user (same is used for both things) with:
docker-compose run --rm mqtt-broker mosquitto_passwd -b /mosquitto-auth/mosquitto.passwd "mqtt_ingest" "neeTham4iexee5aiwoop8Eeroxeichahfo6zezai"
-
Restart the mqtt broker:
docker-compose restart mqtt-broker
-
Simulate a mqtt publisher. Open a separate terminal and run the following command to publish message ever 1 seconds:
while true ; do echo '{"type":"Feature","geometry":{"type":"Point","coordinates":[null,null,null]},"properties":{"loggerID":"CR6_18341","observationNames":["Batt_volt_Min","PTemp"],"observations":{"'$(date +"%Y-%m-%dT%H:%M:%SZ")'":[11.91,26.83]}}}' | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t mqtt_ingest/seefo_envimo_cr6_test_001/32036c37-ba4c-4271-8815-500023374b9e -u mqtt_ingest -P neeTham4iexee5aiwoop8Eeroxeichahfo6zezai -s" ; sleep 1 ; done
-
Optionally simulate another mqtt publisher. Open a separate terminal and run the following command to publish message ever 5 seconds:
while true ; do echo '{"type":"Feature","geometry":{"type":"Point","coordinates":[null,null,null]},"properties":{"loggerID":"CR6_18341","observationNames":["Batt_volt_Min","PTemp"],"observations":{"'$(date +"%Y-%m-%dT%H:%M:%SZ")'":[11.91,26.83]}}}' | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t mqtt_ingest/seefo_envimo_cr6_test_002/7ff34ed2-5e56-11ec-9b0a-54e1ad7c5c19 -u mqtt_ingest -P neeTham4iexee5aiwoop8Eeroxeichahfo6zezai -s" ; sleep 5 ; done
-
-
Open up a new terminal, cd to this repo (i.e.
{whatever_path}/tsm-dispatcher
), setup an python environment and install the necessary dependencies:python -m venv venv source venv/bin/activate pip install -r src/requirements.txt
-
Run the following to see the data action:
python src/main.py --topic mqtt_ingest/# --mqtt-broker 127.0.0.1:1883 --mqtt-user mqtt_ingest --mqtt-password neeTham4iexee5aiwoop8Eeroxeichahfo6zezai parse-data --target-uri postgresql://postgres:postgres@localhost:5432/postgres
Current status
Missing bits an pieces:
-
Integration with `tsm_datastore_lib. Currently we only print transformed data. -
A real concept to dynamically select a parser based on the topic and/or message. -
More general: a concept on how we structure messages into topics -
Batch mode: As only message with QoS > 0 will be retained on the broker, I haven't started with this yet.