Skip to content
Snippets Groups Projects
Commit 800fd2de authored by Florian Gransee's avatar Florian Gransee
Browse files

add consistent cron expression for updtated things

parent 12083977
No related branches found
No related tags found
1 merge request!292add consistent cron expression for updtated things
......@@ -10,6 +10,7 @@ PyYAML==6.0.2
grafana-client>=3.6.0
requests~=2.32.3
python-crontab~=3.0.0
croniter~=6.0.0
pandas~=2.1.4
saqc==2.6.0
cryptography>=43.0.0
......
......@@ -36,8 +36,8 @@ class CreateThingInCrontabHandler(AbstractHandler):
for job in crontab:
if self.job_belongs_to_thing(job, thing):
logger.info(f"Updating cronjob for thing {thing.name}")
self.update_job(job, thing)
journal.info(f"Updated cronjob", thing.uuid)
info = self.update_job(job, thing)
journal.info(f"Updated cronjob to sync {info}", thing.uuid)
return
# if no job was found, create a new one
job = crontab.new()
......@@ -66,7 +66,7 @@ class CreateThingInCrontabHandler(AbstractHandler):
job.set_comment(comment, pre_comment=True)
job.setall(schedule)
job.set_command(command)
info = f"sFTP {thing.ext_sftp.uri} @ {interval}s"
info = f"sFTP {thing.ext_sftp.uri} @ {interval}m and schedule {schedule}"
if thing.ext_api is not None:
interval = int(thing.ext_api.sync_interval)
schedule = cls.get_schedule(interval)
......@@ -77,11 +77,48 @@ class CreateThingInCrontabHandler(AbstractHandler):
job.set_comment(comment, pre_comment=True)
job.setall(schedule)
job.set_command(command)
info = f"{thing.ext_api.api_type_name}-API @ {interval}s"
info = f"{thing.ext_api.api_type_name}-API @ {interval}m and schedule {schedule}"
return info
# alias
update_job = make_job
@classmethod
def update_job(cls, job: CronItem, thing: Thing) -> str:
info = ""
comment = cls.mk_comment(thing)
uuid = thing.uuid
current_interval = cls.get_current_interval(job)
if thing.ext_sftp is not None:
new_interval = int(thing.ext_sftp.sync_interval)
script = "/scripts/sync_sftp.py"
keyfile = thing.ext_sftp.ssh_priv_key
command = f"{script} {uuid} {keyfile} > $STDOUT 2> $STDERR"
job.enable(enabled=thing.ext_sftp.enabled)
job.set_comment(comment, pre_comment=True)
# if the interval has changed we want to ensure consistent starting times with the previous one
if current_interval != new_interval:
schedule = cls.update_cron_expression(job, new_interval)
else:
schedule = str(job.slices)
job.setall(schedule)
job.set_command(command)
info = (
f"sFTP {thing.ext_sftp.uri} @ {new_interval}m and schedule {schedule}"
)
elif thing.ext_api is not None:
new_interval = int(thing.ext_api.sync_interval)
script = f"/scripts/sync_{thing.ext_api.api_type_name}_api.py"
target_uri = thing.database.url
command = f"""{script} {uuid} "{thing.ext_api.settings}" {target_uri} > $STDOUT 2> $STDERR"""
job.enable(enabled=thing.ext_api.enabled)
job.set_comment(comment, pre_comment=True)
# if the interval has changed we want to ensure consistent starting dates
if current_interval != new_interval:
schedule = cls.update_cron_expression(job, new_interval)
else:
schedule = str(job.slices)
job.setall(schedule)
job.set_command(command)
info = f"{thing.ext_api.api_type_name}-API @ {new_interval}m and schedule {schedule}"
return info
@staticmethod
def job_belongs_to_thing(job: CronItem, thing: Thing) -> bool:
......@@ -110,6 +147,43 @@ class CreateThingInCrontabHandler(AbstractHandler):
delay_wd = randint(0, min(interval // 1440 - 1, 6))
return f"{delay_m} {delay_h} * * {delay_wd}-6/{interval//1440}"
@staticmethod
def get_current_interval(job: CronItem) -> int:
"""Get interval in minutes from crontab.txt entry"""
schedule = job.schedule()
next_run = schedule.get_next()
prev_run = schedule.get_prev()
interval = next_run - prev_run
return int(interval.seconds / 60)
@staticmethod
def extract_base_minute(schedule: str) -> int:
"""Extract the minute value from the cron expression. In the case
of multiple values, the first one is returned.
"""
minute_part = schedule.split()[0]
if "," in minute_part:
return int(minute_part.split(",")[0])
elif minute_part.isdigit():
return int(minute_part)
return 0
@classmethod
def update_cron_expression(cls, job, new_interval: int) -> str:
"""Update cron while keeping the same base minute for consistency."""
base_minute = cls.extract_base_minute(str(job.slices))
if new_interval < 60:
minutes = sorted(
(base_minute + i * new_interval) % 60 for i in range(60 // new_interval)
)
return f"{','.join(map(str, minutes))} * * * *"
elif new_interval < 1440:
return f"{base_minute} */{new_interval // 60} * * *"
else:
return f"{base_minute} 0 */{new_interval // 1440} * *"
if __name__ == "__main__":
setup_logging(get_envvar("LOG_LEVEL", "INFO"))
......
......@@ -135,7 +135,7 @@ def fetch_extapi_type_id(conn: Connection, name: str) -> int:
[name],
).fetchone()
if r is None:
raise ValueError(f"No entry for mqtt_device_type {name!r}")
raise ValueError(f"No entry for ext_api_type {name!r}")
return r[0]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment