Skip to content

Streamingdata

Luca Johannes Nendel requested to merge streamingdata into main

How to set up a local development/testing environment

  • Follow the general setup procedure until step 2.

  • In the same directory (i.e. {whatever_path}/tsm-orchestration`)

    • Create a new schema and with one thing (and optionally a second one):

      cat mqtt-thing-event-msg.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s"
      
      cat mqtt-thing-event-msg2.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s"
    • Add a new mqtt user (same is used for both things) with:

      docker-compose run --rm mqtt-broker mosquitto_passwd -b /mosquitto-auth/mosquitto.passwd "mqtt_ingest" "neeTham4iexee5aiwoop8Eeroxeichahfo6zezai"
    • Restart the mqtt broker:

      docker-compose restart mqtt-broker
    • Simulate a mqtt publisher. Open a separate terminal and run the following command to publish message ever 1 seconds:

      while true ; do echo '{"type":"Feature","geometry":{"type":"Point","coordinates":[null,null,null]},"properties":{"loggerID":"CR6_18341","observationNames":["Batt_volt_Min","PTemp"],"observations":{"'$(date +"%Y-%m-%dT%H:%M:%SZ")'":[11.91,26.83]}}}' | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t mqtt_ingest/seefo_envimo_cr6_test_001/32036c37-ba4c-4271-8815-500023374b9e -u mqtt_ingest -P neeTham4iexee5aiwoop8Eeroxeichahfo6zezai -s" ; sleep 1 ; done
      
    • Optionally simulate another mqtt publisher. Open a separate terminal and run the following command to publish message ever 5 seconds:

      while true ; do echo '{"type":"Feature","geometry":{"type":"Point","coordinates":[null,null,null]},"properties":{"loggerID":"CR6_18341","observationNames":["Batt_volt_Min","PTemp"],"observations":{"'$(date +"%Y-%m-%dT%H:%M:%SZ")'":[11.91,26.83]}}}' | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t mqtt_ingest/seefo_envimo_cr6_test_002/7ff34ed2-5e56-11ec-9b0a-54e1ad7c5c19 -u mqtt_ingest -P neeTham4iexee5aiwoop8Eeroxeichahfo6zezai -s" ; sleep 5 ; done
      
  • Open up a new terminal, cd to this repo (i.e. {whatever_path}/tsm-dispatcher), setup an python environment and install the necessary dependencies:

    python -m venv venv
    source venv/bin/activate
    pip install -r src/requirements.txt
  • Run the following to see the data action:

    python src/main.py --topic mqtt_ingest/# --mqtt-broker 127.0.0.1:1883 --mqtt-user mqtt_ingest --mqtt-password neeTham4iexee5aiwoop8Eeroxeichahfo6zezai parse-data --target-uri postgresql://postgres:postgres@localhost:5432/postgres

Current status

Missing bits an pieces:

  • Integration with `tsm_datastore_lib. Currently we only print transformed data.
  • A real concept to dynamically select a parser based on the topic and/or message.
  • More general: a concept on how we structure messages into topics
  • Batch mode: As only message with QoS > 0 will be retained on the broker, I haven't started with this yet.
Edited by Luca Johannes Nendel

Merge request reports

Loading