Skip to content
Snippets Groups Projects
Commit 393de212 authored by Bert Palm's avatar Bert Palm :bug:
Browse files

Merge branch 'add_MQTT_to_cron' into 'main'

[cron] inform downstream service about new parsed data

See merge request !240
parents f27d969f afe36610
No related branches found
No related tags found
1 merge request!240[cron] inform downstream service about new parsed data
Pipeline #486210 passed
......@@ -4,3 +4,4 @@ psycopg[binary]
minio
paramiko
requests
paho-mqtt
......@@ -6,6 +6,7 @@ import json
import os
import logging
import requests
import mqtt
from datetime import datetime, timedelta, timezone
from urllib.request import Request, urlopen
......@@ -94,18 +95,21 @@ def main(thing_uuid, parameters, target_uri):
url = f"""{params["endpoint"]}/{params["sensor_id"]}/{timestamp_from}/{timestamp_to}"""
response = make_request(url, params["username"], params["password"])
parsed_observations = parse_api_response(response, origin="bosch_data")
req = requests.post(
resp = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from Bosch API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
if resp.status_code != 201:
logging.error(f"{resp.text}")
resp.raise_for_status()
# exit
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from Bosch API into TimeIO DB"
)
mqtt.send_mqtt_info("data_parsed", json.dumps({"thing_uuid": thing_uuid}))
if __name__ == "__main__":
......
......@@ -5,6 +5,7 @@ import os
import logging
import json
import click
import mqtt
from datetime import datetime, timedelta
......@@ -90,18 +91,21 @@ def main(thing_uuid, parameters, target_uri):
params = json.loads(parameters.replace("'", '"'))
response = fetch_brightsky_data(params["station_id"])
parsed_observations = parse_brightsky_response(response)
req = requests.post(
resp = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from DWD API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
if resp.status_code != 201:
logging.error(f"{resp.text}")
resp.raise_for_status()
# exit
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from DWD API into TimeIO DB"
)
mqtt.send_mqtt_info("data_parsed", json.dumps({"thing_uuid": thing_uuid}))
if __name__ == "__main__":
......
#!/usr/bin/env python3
import os
import paho.mqtt.publish
try:
_broker = os.environ["MQTT_BROKER"]
mqtt_setting = {
"qos": int(os.environ["MQTT_QOS"]),
"hostname": _broker.split(":")[0],
"port": int(_broker.split(":")[1]),
"client_id": os.environ["MQTT_QOS"],
"auth": {
"username": os.environ["MQTT_USER"],
"password": os.environ["MQTT_PASSWORD"],
},
}
except KeyError as e:
raise EnvironmentError(f"Missing environment variable {e}")
def send_mqtt_info(topic, payload: str):
"""
Publish a single mqtt message to a broker, then disconnect cleanly.
"""
paho.mqtt.publish.single(**mqtt_setting, topic=topic, payload=payload)
......@@ -10,6 +10,7 @@ from datetime import datetime
import click
import requests
import psycopg
import mqtt
URL = "http://www.nmdb.eu/nest/draw_graph.php"
......@@ -111,18 +112,21 @@ def main(thing_uuid: str, parameters: str, target_uri: str):
end_date=datetime.now(),
)
req = requests.post(
resp = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from NM API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
if resp.status_code != 201:
logging.error(f"{resp.text}")
resp.raise_for_status()
# exit
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from NM API into TimeIO DB"
)
mqtt.send_mqtt_info("data_parsed", json.dumps({"thing_uuid": thing_uuid}))
if __name__ == "__main__":
......
......@@ -7,6 +7,7 @@ import logging
import requests
import click
import mqtt
api_base_url = os.environ.get("DB_API_BASE_URL")
......@@ -59,18 +60,21 @@ def main(thing_uuid: str, parameters: str, target_uri: str):
bodies.append(body)
post_data = {"observations": bodies}
req = requests.post(
resp = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=post_data,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(post_data['observations'])} "
f"observations for thing {thing_uuid} from TTN API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
if resp.status_code != 201:
logging.error(f"{resp.text}")
resp.raise_for_status()
# exit
logging.info(
f"Successfully inserted {len(post_data['observations'])} "
f"observations for thing {thing_uuid} from TTN API into TimeIO DB"
)
mqtt.send_mqtt_info("data_parsed", json.dumps({"thing_uuid": thing_uuid}))
if __name__ == "__main__":
......
......@@ -880,6 +880,12 @@ services:
${CONFIGDB_PORT}/\
${CONFIGDB_DB}"
DB_API_BASE_URL: "${DB_API_BASE_URL}"
MQTT_BROKER: mqtt-broker:1883
MQTT_USER: "${MQTT_USER}"
MQTT_PASSWORD: "${MQTT_PASSWORD}"
MQTT_CLIENT_ID: cron-scheduler
MQTT_CLEAN_SESSION: "${MQTT_CLEAN_SESSION}"
MQTT_QOS: "${MQTT_QOS}"
depends_on:
init:
condition: service_completed_successfully
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment