Skip to content
Snippets Groups Projects
Commit 2d91facad521 authored by Fabien Amarger's avatar Fabien Amarger
Browse files

feat(import): Add a thread to create import_process periodically

parent be43c452b024
No related branches found
No related tags found
1 merge request!34feat(import): Add a thread to create import_process periodically
Pipeline #230079 passed
...@@ -17,10 +17,11 @@ ...@@ -17,10 +17,11 @@
"""cubicweb-rodolf specific hooks and operations""" """cubicweb-rodolf specific hooks and operations"""
import os import os
from datetime import datetime from datetime import datetime, timedelta
import logging
from cubicweb.server.hook import Hook from cubicweb.server.hook import Hook
from cubicweb.predicates import is_instance from cubicweb.predicates import is_instance
from cubicweb_s3storage.storages import S3Storage from cubicweb_s3storage.storages import S3Storage
...@@ -21,10 +22,46 @@ ...@@ -21,10 +22,46 @@
from cubicweb.server.hook import Hook from cubicweb.server.hook import Hook
from cubicweb.predicates import is_instance from cubicweb.predicates import is_instance
from cubicweb_s3storage.storages import S3Storage from cubicweb_s3storage.storages import S3Storage
from cubicweb_rodolf.import_data import import_data from cubicweb_rodolf.import_data import import_data, launch_import_procedure
RODOLF_IMPORT_DELTA = timedelta(
seconds=float(os.getenv("RODOLF_IMPORT_DELTA", 60 * 60 * 24))
)
def looping_task_rodolf_import(repo):
logger = logging.getLogger("rodolf-import-thread")
with repo.internal_cnx() as cnx:
started_processes = 0
procedures = cnx.find("ImportProcedure").entities()
for procedure in procedures:
started_processes += launch_import_procedure(
cnx,
procedure,
logger,
)
logger.info(f"[rodolf-import]: {started_processes} rq-tasks created")
logger.info(
f"[rodolf-import] next import in {RODOLF_IMPORT_DELTA} seconds (at"
f" {datetime.now() + RODOLF_IMPORT_DELTA})"
)
class RodolfImportScheduler(Hook):
__regid__ = "rodolf.server-startup-rodolf-import-hook"
events = ("server_startup", "server_maintenance")
def __call__(self):
if self.repo.has_scheduler():
self.repo.looping_task(
RODOLF_IMPORT_DELTA.total_seconds(),
looping_task_rodolf_import,
self.repo,
)
class S3StorageStartupHook(Hook): class S3StorageStartupHook(Hook):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment