Skip to content
Snippets Groups Projects
Commit 75dcf680 authored by Bert Palm's avatar Bert Palm :bug:
Browse files

blackified

parent 6d5f7349
No related branches found
No related tags found
1 merge request!235`configDB` instead of `frontendb`
......@@ -24,6 +24,7 @@ def on_connect(client, userdata, flags, rc):
global connected
connected = True
def on_message(client, userdata, msg):
global is_valid
print(msg.topic + " " + str(msg.payload))
......@@ -31,6 +32,7 @@ def on_message(client, userdata, msg):
if validate_message(msg_json):
is_valid = True
def validate_message(msg_json):
avsc_file = "./.gitlab/ci/avsc/thing_creation.avsc"
if not os.path.isfile(avsc_file):
......@@ -46,6 +48,7 @@ def validate_message(msg_json):
print(f"Message not valid. Returning exception:\n{e}")
return False
def django_loaddata():
print("Loading Django fixtures")
os.system(
......@@ -57,6 +60,7 @@ def django_loaddata():
)
time.sleep(2)
def build_client():
clt = mqtt.Client()
clt.on_connect = on_connect
......@@ -64,6 +68,7 @@ def build_client():
clt.username_pw_set(username, password)
return clt
def connect_and_listen():
client = build_client()
client.connect(host, port)
......
......@@ -22,32 +22,36 @@ PARAMETER_MAPPING = {
"PS_0_PM10_CORR": 0,
"PS_0_PM2P5_CORR": 0,
"SO2_2_CORR": 0,
"SO2_2_CORR_1hr": 0
"SO2_2_CORR_1hr": 0,
}
RESULT_TYPE_MAPPING = {0: "result_number",
1: "result_string",
2: "result_json",
3: "result_boolean"}
RESULT_TYPE_MAPPING = {
0: "result_number",
1: "result_string",
2: "result_json",
3: "result_boolean",
}
def basic_auth(username, password):
credential = f"{username}:{password}"
b_encoded_credential = base64.b64encode(credential.encode('ascii')).decode('ascii')
b_encoded_credential = base64.b64encode(credential.encode("ascii")).decode("ascii")
return f"Basic {b_encoded_credential}"
def make_request(server_url, user, password, post_data=None):
r = Request(server_url)
r.add_header('Authorization', basic_auth(user, password))
r.add_header('Content-Type', 'application/json')
r.add_header('Accept', 'application/json')
r.add_header("Authorization", basic_auth(user, password))
r.add_header("Content-Type", "application/json")
r.add_header("Accept", "application/json")
r_data = post_data
r.data = r_data
handle = urlopen(r)
content = handle.read().decode('utf8')
content = handle.read().decode("utf8")
response = json.loads(content)
return response
def get_utc_timestamps(period: int):
now_utc = datetime.now(timezone.utc)
now_str = now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
......@@ -55,12 +59,12 @@ def get_utc_timestamps(period: int):
timestamp_from_str = timestamp_from.strftime("%Y-%m-%dT%H:%M:%SZ")
return timestamp_from_str, now_str
def parse_api_response(response: list, origin: str):
bodies = []
for entry in response:
obs = entry["payload"]
source = {"sensor_id": obs.pop("deviceID"),
"observation_type": obs.pop("Type")}
source = {"sensor_id": obs.pop("deviceID"), "observation_type": obs.pop("Type")}
timestamp = obs.pop("UTC")
for parameter, value in obs.items():
if value:
......@@ -70,11 +74,14 @@ def parse_api_response(response: list, origin: str):
"result_type": result_type,
"datastream_pos": parameter,
RESULT_TYPE_MAPPING[result_type]: value,
"parameters": json.dumps({"origin": "bosch_data", "column_header": source})
"parameters": json.dumps(
{"origin": "bosch_data", "column_header": source}
),
}
bodies.append(body)
return {"observations": bodies}
@click.command()
@click.argument("thing_uuid")
@click.argument("parameters")
......@@ -87,16 +94,19 @@ def main(thing_uuid, parameters, target_uri):
url = f"""{params["endpoint"]}/{params["sensor_id"]}/{timestamp_from}/{timestamp_to}"""
response = make_request(url, params["username"], params["password"])
parsed_observations = parse_api_response(response, origin="bosch_data")
req = requests.post(f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers = {'Content-type': 'application/json'})
req = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from Bosch API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
logging.error(f"{req.text}")
if __name__ == "__main__":
main()
......@@ -27,30 +27,37 @@ PARAMETER_MAPPING = {
"wind_direction": 0,
"wind_speed": 0,
"wind_gust_direction": 0,
"wind_gust_speed": 0
"wind_gust_speed": 0,
}
RESULT_TYPE_MAPPING = {0: "result_number",
1: "result_string",
2: "result_json",
3: "result_boolean"}
RESULT_TYPE_MAPPING = {
0: "result_number",
1: "result_string",
2: "result_json",
3: "result_boolean",
}
def fetch_brightsky_data(station_id: str, brightsky_base_url = "https://api.brightsky.dev/weather") -> dict:
""" Returns DWD data with hourly resolution of yesterday"""
def fetch_brightsky_data(
station_id: str, brightsky_base_url="https://api.brightsky.dev/weather"
) -> dict:
"""Returns DWD data with hourly resolution of yesterday"""
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime.strftime(yesterday, '%Y-%m-%d:00:00:00')
yesterday_end = datetime.strftime(yesterday, '%Y-%m-%d:23:55:00')
params = {"dwd_station_id": station_id,
"date": yesterday_start,
"last_date": yesterday_end,
"units": "dwd"}
yesterday_start = datetime.strftime(yesterday, "%Y-%m-%d:00:00:00")
yesterday_end = datetime.strftime(yesterday, "%Y-%m-%d:23:55:00")
params = {
"dwd_station_id": station_id,
"date": yesterday_start,
"last_date": yesterday_end,
"units": "dwd",
}
brightsky_response = requests.get(url=brightsky_base_url, params=params)
response_data = brightsky_response.json()
return response_data
def parse_brightsky_response(resp) -> dict:
""" Uses Brightsky Response and returns body for POST request"""
"""Uses Brightsky Response and returns body for POST request"""
observation_data = resp["weather"]
source = resp["sources"][0]
bodies = []
......@@ -65,11 +72,14 @@ def parse_brightsky_response(resp) -> dict:
"result_type": result_type,
"datastream_pos": parameter,
RESULT_TYPE_MAPPING[result_type]: value,
"parameters": json.dumps({"origin": "dwd_data", "column_header": source})
"parameters": json.dumps(
{"origin": "dwd_data", "column_header": source}
),
}
bodies.append(body)
return {"observations": bodies}
@click.command()
@click.argument("thing_uuid")
@click.argument("parameters")
......@@ -80,9 +90,11 @@ def main(thing_uuid, parameters, target_uri):
params = json.loads(parameters.replace("'", '"'))
response = fetch_brightsky_data(params["station_id"])
parsed_observations = parse_brightsky_response(response)
req = requests.post(f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers = {'Content-type': 'application/json'})
req = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
......@@ -94,4 +106,3 @@ def main(thing_uuid, parameters, target_uri):
if __name__ == "__main__":
main()
......@@ -16,7 +16,9 @@ URL = "http://www.nmdb.eu/nest/draw_graph.php"
api_base_url = os.environ.get("DB_API_BASE_URL")
def get_nm_station_data(station: str, resolution: int, start_date: datetime, end_date: datetime) -> dict:
def get_nm_station_data(
station: str, resolution: int, start_date: datetime, end_date: datetime
) -> dict:
params = {
"wget": 1,
"stations[]": station,
......@@ -42,14 +44,18 @@ def get_nm_station_data(station: str, resolution: int, start_date: datetime, end
bodies = []
header = {"sensor_id": station, "resolution": resolution, "nm_api_url": URL}
for timestamp, value in rows:
if value:
bodies.append({
"result_time": timestamp,
"result_type": 0,
"datastream_pos": station,
"result_number": float(value),
"parameters": json.dumps({"origin": "nm_data", "column_header": header})
})
if value:
bodies.append(
{
"result_time": timestamp,
"result_type": 0,
"datastream_pos": station,
"result_number": float(value),
"parameters": json.dumps(
{"origin": "nm_data", "column_header": header}
),
}
)
return {"observations": bodies}
......@@ -105,16 +111,18 @@ def main(thing_uuid: str, parameters: str, target_uri: str):
end_date=datetime.now(),
)
req = requests.post(f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers = {'Content-type': 'application/json'})
req = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=parsed_observations,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(parsed_observations['observations'])} "
f"observations for thing {thing_uuid} from NM API into TimeIO DB"
)
else:
logging.error(f"{req.text}")
logging.error(f"{req.text}")
if __name__ == "__main__":
......
......@@ -10,6 +10,7 @@ import click
api_base_url = os.environ.get("DB_API_BASE_URL")
def cleanupJson(string: str) -> str:
"""
The json string from the TTN Endpoint is erroneous
......@@ -51,14 +52,18 @@ def main(thing_uuid: str, parameters: str, target_uri: str):
"result_type": 0,
"datastream_pos": k,
"result_number": float(v),
"parameters": json.dumps({"origin": params["endpoint_uri"], "column_header": k})
"parameters": json.dumps(
{"origin": params["endpoint_uri"], "column_header": k}
),
}
bodies.append(body)
post_data = {"observations": bodies}
req = requests.post(f"{api_base_url}/observations/upsert/{thing_uuid}",
json=post_data,
headers={'Content-type': 'application/json'})
req = requests.post(
f"{api_base_url}/observations/upsert/{thing_uuid}",
json=post_data,
headers={"Content-type": "application/json"},
)
if req.status_code == 201:
logging.info(
f"Successfully inserted {len(post_data['observations'])} "
......
......@@ -17,7 +17,9 @@ file_names = [
"sms_cv_unit.json",
]
file_path_list = [os.path.join(script_dir, "tables", file_name) for file_name in file_names]
file_path_list = [
os.path.join(script_dir, "tables", file_name) for file_name in file_names
]
if __name__ == "__main__":
url = os.environ.get("CV_API_URL")
......
......@@ -14,13 +14,11 @@ def get_schemas_with_things(cur):
return cur.execute(
"""SELECT schemaname FROM pg_tables
WHERE tablename = 'thing';"""
).fetchall()
).fetchall()
def get_things(cur, schema):
return cur.execute(
f"""SELECT uuid::varchar from {schema}.thing;"""
).fetchall()
return cur.execute(f"""SELECT uuid::varchar from {schema}.thing;""").fetchall()
def get_schema_thing_dict(cur):
......@@ -29,8 +27,9 @@ def get_schema_thing_dict(cur):
for schema in schemas:
things = get_things(cur, schema["schemaname"])
for thing in things:
schemas_things_dict.append({"schema_name": schema["schemaname"],
"thing_uuid": thing["uuid"]})
schemas_things_dict.append(
{"schema_name": schema["schemaname"], "thing_uuid": thing["uuid"]}
)
return schemas_things_dict
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment