Skip to content
Snippets Groups Projects
hooks.py 5.53 KiB
Newer Older
# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""cubicweb-rodolf specific hooks and operations"""
from datetime import datetime, timedelta
import logging
import os

from cubicweb.predicates import is_instance
from cubicweb.server.hook import Hook, match_rtype
from cubicweb_s3storage.storages import S3Storage

from cubicweb_rodolf.import_data import import_data, launch_import_procedure
from cubicweb_rodolf.process_helpers import upload_graph_to_virtuoso_endpoint


# Interval between two scheduled Rodolf import runs. It can be overridden with
# the RODOLF_IMPORT_DELTA environment variable (a number of seconds); the
# default is 24 * 60 * 60 seconds, i.e. one day.
RODOLF_IMPORT_DELTA = timedelta(
    seconds=float(os.environ.get("RODOLF_IMPORT_DELTA", 24 * 60 * 60))
)


def looping_task_rodolf_import(repo):
    """Launch every ImportProcedure once (periodic scheduler task).

    Opens an internal connection on *repo*, calls ``launch_import_procedure``
    for each ``ImportProcedure`` entity, then logs the total it returned and
    when the next run is expected.

    :param repo: the CubicWeb repository, as passed by ``repo.looping_task``.
    """
    logger = logging.getLogger("rodolf-import-thread")
    with repo.internal_cnx() as cnx:
        procedures = cnx.find("ImportProcedure").entities()
        # launch_import_procedure's return values are summed and reported as
        # the number of rq-tasks created.
        started_processes = sum(
            launch_import_procedure(cnx, procedure, logger)
            for procedure in procedures
        )
        logger.info(f"[rodolf-import]: {started_processes} rq-tasks created")
        # RODOLF_IMPORT_DELTA is a timedelta: log its length in seconds so the
        # "seconds" unit in the message is accurate.
        logger.info(
            "[rodolf-import] next import in "
            f"{RODOLF_IMPORT_DELTA.total_seconds()} seconds (at"
            f" {datetime.now() + RODOLF_IMPORT_DELTA})"
        )


class RodolfImportScheduler(Hook):
    """Schedule the periodic Rodolf import when the repository starts.

    On ``server_startup`` / ``server_maintenance``, if the repository has a
    scheduler, ``looping_task_rodolf_import`` is registered to run every
    ``RODOLF_IMPORT_DELTA``.
    """

    __regid__ = "rodolf.server-startup-rodolf-import-hook"
    events = ("server_startup", "server_maintenance")

    def __call__(self):
        # Repositories without a scheduler cannot run looping tasks.
        if not self.repo.has_scheduler():
            return
        interval_seconds = RODOLF_IMPORT_DELTA.total_seconds()
        self.repo.looping_task(
            interval_seconds, looping_task_rodolf_import, self.repo
        )

class S3StorageStartupHook(Hook):
    """Use an S3 storage for the ``data`` attribute of ``File`` entities.

    The bucket name comes from the ``RODOLF_S3_BUCKET`` environment variable
    and defaults to ``"rodolf"``.
    """

    __regid__ = "rodolf.server-startup-hook"
    events = ("server_startup", "server_maintenance")

    def __call__(self):
        bucket_name = os.environ.get("RODOLF_S3_BUCKET", "rodolf")
        s3_storage = S3Storage(bucket_name)
        self.repo.system_source.set_storage("File", "data", s3_storage)


class EnqueueTaskHook(Hook):
    """Create and enqueue an rq task for every newly added ImportProcess.

    A ``RqTask`` entity is created, linked to the process via ``rq_task``,
    and its ``IRqJob`` adapter enqueues ``import_data`` for the process eid.
    """

    __regid__ = "rodolf.enqueue-task-hook"
    __select__ = Hook.__select__ & is_instance("ImportProcess")
    events = ("after_add_entity",)

    def __call__(self):
        # Pass the eid value itself; wrapping it in braces would build a set
        # literal and render the title as "import-process {1234}  (...)".
        task_title = "import-process {eid}  ({date})".format(
            eid=self.entity.eid,
            date=datetime.utcnow().strftime("%Y-%m-%d"),
        )
        rqtask = self._cw.create_entity(
            "RqTask", name="import_process", title=task_title
        )
        self.entity.cw_set(rq_task=rqtask)
        # NOTE(review): committing from inside a hook is unusual; it presumably
        # makes the RqTask visible to the worker before the job is enqueued.
        # Confirm, and consider moving the enqueue to a post-commit Operation.
        self._cw.commit()
        rqtask.cw_adapt_to("IRqJob").enqueue(
            import_data, import_process_eid=self.entity.eid
        )
        self._cw.commit()


class UploadOntologyHook(Hook):
    """Upload an ontology file to the Virtuoso endpoint once it is attached.

    Runs when an ``ontology_file`` relation is added. The relation object (the
    file) is parsed into a graph, which is uploaded on behalf of the relation
    subject (the procedure) under ``urn:rodolf:<procedure eid>:ontology``.
    """

    __regid__ = "rodolf.upload-ontology-hook"
    __select__ = Hook.__select__ & match_rtype("ontology_file")
    events = ("after_add_relation",)

    def __call__(self):
        # NOTE(review): ``Graph`` is not imported in this module. It presumably
        # refers to ``rdflib.Graph`` (``from rdflib import Graph``); confirm
        # and add the import, otherwise this line raises NameError.
        graph = Graph()
        ontology_file = self._cw.entity_from_eid(self.eidto)
        procedure = self._cw.entity_from_eid(self.eidfrom)
        raw_data = ontology_file.data.read()
        graph.parse(data=raw_data, format=ontology_file.data_format)
        graph_uri = f"urn:rodolf:{procedure.eid}:ontology"
        upload_graph_to_virtuoso_endpoint(
            procedure, graph, graph_uri, ontology_file.download_file_name()
        )


class DeleteImportProcedureHook(Hook):
    """Cascade the deletion of an ImportProcedure to the entities it owns.

    Before the procedure is deleted, this removes its ``ImportRecipe``
    entities (via ``import_recipes``), the ``RqTask`` entities of its
    ``ImportProcess`` entities (via ``rq_task``) and those processes
    themselves (via ``import_procedure``).
    """

    __regid__ = "rodolf.delete-import-procedure-hook"
    __select__ = Hook.__select__ & is_instance("ImportProcedure")
    events = ("before_delete_entity",)

    def __call__(self):
        params = {"eid": self.entity.eid}
        self._cw.execute(
            "DELETE ImportRecipe X WHERE PROCEDURE eid %(eid)s, PROCEDURE import_recipes X",
            params,
        )
        # RqTask entities are reached through their ImportProcess, so they
        # must be deleted before the processes themselves.
        self._cw.execute(
            "DELETE RqTask X WHERE PROCESS import_procedure %(eid)s, PROCESS rq_task X",
            params,
        )
        self._cw.execute(
            "DELETE ImportProcess X WHERE X import_procedure %(eid)s",
            params,
        )
        # Deliberately no commit: these deletions belong to the transaction
        # that deletes the procedure and are committed or rolled back with it.


class DeleteDataServiceHook(Hook):
    """Cascade the deletion of a DataService to the entities that use it.

    Before the data service is deleted, this removes the ``RqTask`` entities
    of every ``ImportProcess`` whose recipe uses it, those ``ImportProcess``
    entities, and finally the ``ImportRecipe`` entities linked through
    ``dataservice``.
    """

    __regid__ = "rodolf.delete-data-service-hook"
    __select__ = Hook.__select__ & is_instance("DataService")
    events = ("before_delete_entity",)

    def __call__(self):
        params = {"eid": self.entity.eid}
        # Order matters: tasks are found through processes, and processes
        # through recipes, so delete from the leaves up.
        self._cw.execute(
            "DELETE RqTask X WHERE RECIPE dataservice %(eid)s, "
            "PROCESS import_recipe RECIPE, PROCESS rq_task X",
            params,
        )
        self._cw.execute(
            "DELETE ImportProcess X WHERE RECIPE dataservice %(eid)s, "
            "PROCESS import_recipe RECIPE",
            params,
        )
        self._cw.execute(
            "DELETE ImportRecipe X WHERE X dataservice %(eid)s",
            params,
        )
        # Deliberately no commit: these deletions belong to the transaction
        # that deletes the data service and are committed or rolled back
        # with it.