Skip to content
Snippets Groups Projects
hooks.py 3.19 KiB
Newer Older
# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""cubicweb-rodolf specific hooks and operations"""
from datetime import datetime, timedelta
import logging

from cubicweb.server.hook import Hook
from cubicweb.predicates import is_instance

from cubicweb_s3storage.storages import S3Storage

from cubicweb_rodolf.import_data import import_data, launch_import_procedure


RODOLF_IMPORT_DELTA = timedelta(
    seconds=float(os.getenv("RODOLF_IMPORT_DELTA", 60 * 60 * 24))
)


def looping_task_rodolf_import(repo):
    logger = logging.getLogger("rodolf-import-thread")
    with repo.internal_cnx() as cnx:
        started_processes = 0
        procedures = cnx.find("ImportProcedure").entities()
        for procedure in procedures:
            started_processes += launch_import_procedure(
                cnx,
                procedure,
                logger,
            )
        logger.info(f"[rodolf-import]: {started_processes} rq-tasks created")
        logger.info(
            f"[rodolf-import] next import in {RODOLF_IMPORT_DELTA} seconds (at"
            f" {datetime.now() + RODOLF_IMPORT_DELTA})"
        )


class RodolfImportScheduler(Hook):
    __regid__ = "rodolf.server-startup-rodolf-import-hook"
    events = ("server_startup", "server_maintenance")

    def __call__(self):
        if self.repo.has_scheduler():
            self.repo.looping_task(
                RODOLF_IMPORT_DELTA.total_seconds(),
                looping_task_rodolf_import,
                self.repo,
            )

class S3StorageStartupHook(Hook):
    __regid__ = "rodolf.server-startup-hook"
    events = ("server_startup", "server_maintenance")

    def __call__(self):
        storage = S3Storage(os.environ.get("RODOLF_S3_BUCKET", "rodolf"))
        self.repo.system_source.set_storage("File", "data", storage)


class EnqueueTaskHook(Hook):
    __regid__ = "rodolf.enqueue-task-hook"
    __select__ = Hook.__select__ & is_instance("ImportProcess")
    events = ("after_add_entity",)

    def __call__(self):
        task_title = "import-process {eid}  ({date})".format(
            eid={self.entity.eid},
            date=datetime.utcnow().strftime("%Y-%m-%d"),
        )
        rqtask = self._cw.create_entity(
            "RqTask", name="import_process", title=task_title
        )
        self.entity.cw_set(rq_task=rqtask)
        self._cw.commit()
        rqtask.cw_adapt_to("IRqJob").enqueue(
            import_data, import_process_eid=self.entity.eid
        )
        self._cw.commit()