# HG changeset patch # User Fabien Amarger <fabien.amarger@logilab.fr> # Date 1711121877 -3600 # Fri Mar 22 16:37:57 2024 +0100 # Node ID 2d91facad52134b18c27332ff5a003d7dd1e7fbd # Parent be43c452b0243c72b7c29439d94fa4087972d91e feat(import): Add a thread to create import_process periodically diff --git a/cubicweb_rodolf/hooks.py b/cubicweb_rodolf/hooks.py --- a/cubicweb_rodolf/hooks.py +++ b/cubicweb_rodolf/hooks.py @@ -17,14 +17,51 @@ """cubicweb-rodolf specific hooks and operations""" import os -from datetime import datetime +from datetime import datetime, timedelta +import logging from cubicweb.server.hook import Hook from cubicweb.predicates import is_instance from cubicweb_s3storage.storages import S3Storage -from cubicweb_rodolf.import_data import import_data +from cubicweb_rodolf.import_data import import_data, launch_import_procedure + + +RODOLF_IMPORT_DELTA = timedelta( + seconds=float(os.getenv("RODOLF_IMPORT_DELTA", 60 * 60 * 24)) +) + + +def looping_task_rodolf_import(repo): + logger = logging.getLogger("rodolf-import-thread") + with repo.internal_cnx() as cnx: + started_processes = 0 + procedures = cnx.find("ImportProcedure").entities() + for procedure in procedures: + started_processes += launch_import_procedure( + cnx, + procedure, + logger, + ) + logger.info(f"[rodolf-import]: {started_processes} rq-tasks created") + logger.info( + f"[rodolf-import] next import in {RODOLF_IMPORT_DELTA} seconds (at" + f" {datetime.now() + RODOLF_IMPORT_DELTA})" + ) + + +class RodolfImportScheduler(Hook): + __regid__ = "rodolf.server-startup-rodolf-import-hook" + events = ("server_startup", "server_maintenance") + + def __call__(self): + if self.repo.has_scheduler(): + self.repo.looping_task( + RODOLF_IMPORT_DELTA.total_seconds(), + looping_task_rodolf_import, + self.repo, + ) class S3StorageStartupHook(Hook):