Skip to content
Snippets Groups Projects
entities.py 4.19 KiB
Newer Older
# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""cubicweb-rodolf entity's classes"""

from datetime import datetime, timedelta
import pytz
from cubicweb import NoResultError, Binary

from cubicweb.entities import AnyEntity
from rdf_data_manager import VirtuosoCredentials
from rdflib import Graph


TIMEDELTA_REFRESH = {
    "daily": timedelta(days=1),
    "weekly": timedelta(weeks=1),
    "monthly": timedelta(days=30),
}


class DataService(AnyEntity):
    __regid__ = "DataService"

    @property
    def refresh_timedelta(self):
        return TIMEDELTA_REFRESH[self.refresh_period]


class ImportProcedure(AnyEntity):
    __regid__ = "ImportProcedure"

    def dc_title(self):
        return self.name if self.name else self.virtuoso_url
        for recipe in self.import_recipes:
            if not recipe.needs_delta_import(self):
    def create_needed_import_process(self, only_delta_import, logger):
        recipes = self.import_recipes
        if only_delta_import:
            recipes = [x for x in self.delta_import_recipes]
        created_import_process = 0
            import_process = self._cw.create_entity(
                "ImportProcess",
                import_recipe=(recipe,),
                import_procedure=(self,),
            )
            print(
                f"ImportProcess for {self.virtuoso_url} (recipe : {recipe.name})"
                f" created ({import_process.eid})"
            )
            logger.info(
                f"[rodolf-import]: create rq task for import process "
                f"'{import_process.dc_title()}' ({import_process.eid})"
            )
            created_import_process += 1

        return created_import_process
    @property
    def virtuoso_credentials(self) -> VirtuosoCredentials:
        return VirtuosoCredentials(
            self.virtuoso_url,
            self.virtuoso_user,
            self.virtuoso_password,
        )


class ImportRecipe(AnyEntity):
    __regid__ = "ImportRecipe"

    def needs_delta_import(self, import_procedure):
        try:
            import_process = self._cw.execute(
                "Any I ORDERBY D DESC LIMIT 1 WHERE "
                "I is ImportProcess, "
                "I import_recipe %(recipe_eid)s, "
                "I import_procedure %(import_procedure)s, "
                "I in_state S, S name 'successful', "
                "I creation_date D",
                {
                    "recipe_eid": self.eid,
                    "import_procedure": import_procedure.eid,
                },
            ).one()
            dataservice = import_process.import_recipe[0].dataservice[0]
            if (
                datetime.now(tz=pytz.utc) - import_process.creation_date
                > dataservice.refresh_timedelta
            ):
                return True
        except NoResultError:
            return True
        return False


class ImportProcess(AnyEntity):
    __regid__ = "ImportProcess"

    def update_dataset(self, graph: Graph):
        dataset_file = self._cw.create_entity(
            "File",
            title=f"Graph file for ImportProcess#{self.eid}",
            data=Binary(graph.serialize(format="ttl").encode("utf8")),
            data_name=f"data_ImportProcess_{self.eid}.ttl",
            data_format="text/turtle",
        )
        self.cw_set(
            has_output_dataset=dataset_file,
        )
        return dataset_file