# HG changeset patch
# User Fabien Amarger <fabien.amarger@logilab.fr>
# Date 1711038351 -3600
#      Thu Mar 21 17:25:51 2024 +0100
# Node ID a6e73e34395ab574adb132f635b4926bf4556a20
# Parent  81dddfafcaa50ecedcc88c6fad31bd5e9d4373ec
feat(hooks): Add RqTask when an ImportProcess is created

diff --git a/cubicweb_rodolf/entities.py b/cubicweb_rodolf/entities.py
--- a/cubicweb_rodolf/entities.py
+++ b/cubicweb_rodolf/entities.py
@@ -54,10 +54,11 @@
                 continue
             yield recipe
 
-    def create_needed_import_process(self, only_delta_import):
+    def create_needed_import_process(self, only_delta_import, logger):
         recipes = self.import_recipes
         if only_delta_import:
             recipes = [x for x in self.delta_import_recipes]
+        created_import_process = 0
         for recipe in recipes:
             import_process = self._cw.create_entity(
                 "ImportProcess",
@@ -68,7 +69,13 @@
                 f"ImportProcess for {self.virtuoso_url} (recipe : {recipe.name})"
                 f" created ({import_process.eid})"
             )
-            yield import_process
+            logger.info(
+                f"[rodolf-import]: create rq task for import process "
+                f"'{import_process.dc_title()}' ({import_process.eid})"
+            )
+            created_import_process += 1
+
+        return created_import_process
 
     @property
     def virtuoso_credentials(self) -> VirtuosoCredentials:
diff --git a/cubicweb_rodolf/hooks.py b/cubicweb_rodolf/hooks.py
--- a/cubicweb_rodolf/hooks.py
+++ b/cubicweb_rodolf/hooks.py
@@ -17,10 +17,15 @@
 """cubicweb-rodolf specific hooks and operations"""
 
 import os
+from datetime import datetime
 
 from cubicweb.server.hook import Hook
+from cubicweb.predicates import is_instance
+
 from cubicweb_s3storage.storages import S3Storage
 
+from cubicweb_rodolf.import_data import import_data
+
 
 class S3StorageStartupHook(Hook):
     __regid__ = "rodolf.server-startup-hook"
@@ -29,3 +34,24 @@
     def __call__(self):
         storage = S3Storage(os.environ.get("RODOLF_S3_BUCKET", "rodolf"))
         self.repo.system_source.set_storage("File", "data", storage)
+
+
+class EnqueueTaskHook(Hook):
+    __regid__ = "rodolf.enqueue-task-hook"
+    __select__ = Hook.__select__ & is_instance("ImportProcess")
+    events = ("after_add_entity",)
+
+    def __call__(self):
+        task_title = "import-process {eid} ({date})".format(
+            eid=self.entity.eid,
+            date=datetime.utcnow().strftime("%Y-%m-%d"),
+        )
+        rqtask = self._cw.create_entity(
+            "RqTask", name="import_process", title=task_title
+        )
+        self.entity.cw_set(rq_task=rqtask)
+        self._cw.commit()
+        rqtask.cw_adapt_to("IRqJob").enqueue(
+            import_data, import_process_eid=self.entity.eid
+        )
+        self._cw.commit()
diff --git a/cubicweb_rodolf/import_data.py b/cubicweb_rodolf/import_data.py
--- a/cubicweb_rodolf/import_data.py
+++ b/cubicweb_rodolf/import_data.py
@@ -19,8 +19,6 @@
 import logging
 
 import pyshacl
-from datetime import datetime
-
 from cubicweb import Binary
 from cubicweb_rq.rq import rqjob
 from rdflib import Graph
@@ -154,24 +152,6 @@
     ii) or whose dataservice's refresh period has been over,
     iii) or which have never been launched will be launched (i.e. ImportProcess + RqTask created and enqueued)
     """
-    started_processes = 0
     if not procedure.activated:
         return 0
-    for import_process in procedure.create_needed_import_process(only_delta_import):
-        logger.info(
-            f"[rodolf-import]: create rq task for import process "
-            f"'{import_process.dc_title()}' ({import_process.eid})"
-        )
-        task_title = "import-process {eid} ({date})".format(
-            eid={import_process.eid},
-            date=datetime.utcnow().strftime("%Y-%m-%d"),
-        )
-        rqtask = cnx.create_entity("RqTask", name="import_process", title=task_title)
-        import_process.cw_set(rq_task=rqtask)
-        cnx.commit()
-        rqtask.cw_adapt_to("IRqJob").enqueue(
-            import_data, import_process_eid=import_process.eid
-        )
-        cnx.commit()
-        started_processes += 1
-    return started_processes
+    return procedure.create_needed_import_process(only_delta_import, logger)
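Usage sketch (not part of the changeset itself): with this patch, RqTask creation and
enqueueing happen in EnqueueTaskHook, so the caller side only creates ImportProcess
entities and receives a count. The snippet below is a hedged illustration of that call
path; it reuses only names visible in the diff (procedure, only_delta_import, logger,
create_needed_import_process), while the enclosing function and the origin of those
variables are assumed rather than shown.

    # `procedure` is assumed to be the entity whose method is patched in entities.py;
    # `logger` is assumed to be a standard logging.Logger.
    if not procedure.activated:
        started = 0
    else:
        # Each ImportProcess created inside this call fires EnqueueTaskHook
        # (after_add_entity), which attaches an RqTask via cw_set(rq_task=...) and
        # enqueues import_data with the ImportProcess eid.
        started = procedure.create_needed_import_process(only_delta_import, logger)
    logger.info("[rodolf-import]: %d import process(es) started", started)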