Skip to content
Snippets Groups Projects
Commit a6e73e34395a authored by Fabien Amarger's avatar Fabien Amarger
Browse files

feat(hooks): Add RqTask when an ImportProcess is created

parent 81dddfafcaa5
No related branches found
No related tags found
1 merge request!29feat(frontend): add button to manually run recipe
...@@ -54,7 +54,7 @@ ...@@ -54,7 +54,7 @@
continue continue
yield recipe yield recipe
def create_needed_import_process(self, only_delta_import): def create_needed_import_process(self, only_delta_import, logger):
recipes = self.import_recipes recipes = self.import_recipes
if only_delta_import: if only_delta_import:
recipes = [x for x in self.delta_import_recipes] recipes = [x for x in self.delta_import_recipes]
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
recipes = self.import_recipes recipes = self.import_recipes
if only_delta_import: if only_delta_import:
recipes = [x for x in self.delta_import_recipes] recipes = [x for x in self.delta_import_recipes]
created_import_process = 0
for recipe in recipes: for recipe in recipes:
import_process = self._cw.create_entity( import_process = self._cw.create_entity(
"ImportProcess", "ImportProcess",
...@@ -68,7 +69,13 @@ ...@@ -68,7 +69,13 @@
f"ImportProcess for {self.virtuoso_url} (recipe : {recipe.name})" f"ImportProcess for {self.virtuoso_url} (recipe : {recipe.name})"
f" created ({import_process.eid})" f" created ({import_process.eid})"
) )
yield import_process logger.info(
f"[rodolf-import]: create rq task for import process "
f"'{import_process.dc_title()}' ({import_process.eid})"
)
created_import_process += 1
return created_import_process
@property @property
def virtuoso_credentials(self) -> VirtuosoCredentials: def virtuoso_credentials(self) -> VirtuosoCredentials:
......
...@@ -17,5 +17,6 @@ ...@@ -17,5 +17,6 @@
"""cubicweb-rodolf specific hooks and operations""" """cubicweb-rodolf specific hooks and operations"""
import os import os
from datetime import datetime
from cubicweb.server.hook import Hook from cubicweb.server.hook import Hook
...@@ -20,4 +21,6 @@ ...@@ -20,4 +21,6 @@
from cubicweb.server.hook import Hook from cubicweb.server.hook import Hook
from cubicweb.predicates import is_instance
from cubicweb_s3storage.storages import S3Storage from cubicweb_s3storage.storages import S3Storage
...@@ -22,5 +25,7 @@ ...@@ -22,5 +25,7 @@
from cubicweb_s3storage.storages import S3Storage from cubicweb_s3storage.storages import S3Storage
from cubicweb_rodolf.import_data import import_data
class S3StorageStartupHook(Hook): class S3StorageStartupHook(Hook):
__regid__ = "rodolf.server-startup-hook" __regid__ = "rodolf.server-startup-hook"
...@@ -29,3 +34,24 @@ ...@@ -29,3 +34,24 @@
def __call__(self): def __call__(self):
storage = S3Storage(os.environ.get("RODOLF_S3_BUCKET", "rodolf")) storage = S3Storage(os.environ.get("RODOLF_S3_BUCKET", "rodolf"))
self.repo.system_source.set_storage("File", "data", storage) self.repo.system_source.set_storage("File", "data", storage)
class EnqueueTaskHook(Hook):
__regid__ = "rodolf.enqueue-task-hook"
__select__ = Hook.__select__ & is_instance("ImportProcess")
events = ("after_add_entity",)
def __call__(self):
task_title = "import-process {eid} ({date})".format(
eid={self.entity.eid},
date=datetime.utcnow().strftime("%Y-%m-%d"),
)
rqtask = self._cw.create_entity(
"RqTask", name="import_process", title=task_title
)
self.entity.cw_set(rq_task=rqtask)
self._cw.commit()
rqtask.cw_adapt_to("IRqJob").enqueue(
import_data, import_process_eid=self.entity.eid
)
self._cw.commit()
...@@ -19,8 +19,6 @@ ...@@ -19,8 +19,6 @@
import logging import logging
import pyshacl import pyshacl
from datetime import datetime
from cubicweb import Binary from cubicweb import Binary
from cubicweb_rq.rq import rqjob from cubicweb_rq.rq import rqjob
from rdflib import Graph from rdflib import Graph
...@@ -154,6 +152,5 @@ ...@@ -154,6 +152,5 @@
ii) or whose dataservice's refresh period has been over, iii) or which have ii) or whose dataservice's refresh period has been over, iii) or which have
never been launched will be launched (i.e. ImportProcess + RqTask created and enqueued) never been launched will be launched (i.e. ImportProcess + RqTask created and enqueued)
""" """
started_processes = 0
if not procedure.activated: if not procedure.activated:
return 0 return 0
...@@ -158,20 +155,3 @@ ...@@ -158,20 +155,3 @@
if not procedure.activated: if not procedure.activated:
return 0 return 0
for import_process in procedure.create_needed_import_process(only_delta_import): return procedure.create_needed_import_process(only_delta_import, logger)
logger.info(
f"[rodolf-import]: create rq task for import process "
f"'{import_process.dc_title()}' ({import_process.eid})"
)
task_title = "import-process {eid} ({date})".format(
eid={import_process.eid},
date=datetime.utcnow().strftime("%Y-%m-%d"),
)
rqtask = cnx.create_entity("RqTask", name="import_process", title=task_title)
import_process.cw_set(rq_task=rqtask)
cnx.commit()
rqtask.cw_adapt_to("IRqJob").enqueue(
import_data, import_process_eid=import_process.eid
)
cnx.commit()
started_processes += 1
return started_processes
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment