# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
import logging
import traceback

import pyshacl
from rdflib import Graph

from cubicweb import Binary
from cubicweb_rq.rq import rqjob


def check_rdf_graph(graph, import_procedure):
    """Validate an RDF graph against the SHACL files of an import procedure.

    :param Graph graph: rdflib Graph holding the data to validate
    :param ImportProcedure import_procedure: entity whose ``shacl_files``
        relation lists the SHACL shape files to validate against
    :return: a 2-tuple ``(everything_ok, errors)`` where ``everything_ok``
        is True when the graph conforms to every SHACL file, and ``errors``
        maps the eid of each non-conforming SHACL file to the textual
        report produced by pyshacl
    :rtype: tuple
    """
    errors = {}
    everything_ok = True
    # ``shacl_file`` (not ``file``) to avoid shadowing the builtin
    for shacl_file in import_procedure.shacl_files:
        shacl_shapes_graph = Graph().parse(
            data=shacl_file.data.getvalue().decode("utf8"),
            format=shacl_file.data_format,
        )
        conforms, _graph_reports, text_reports = pyshacl.validate(
            graph,
            shacl_graph=shacl_shapes_graph,
        )
        if not conforms:
            everything_ok = False
            errors[shacl_file.eid] = text_reports
    return everything_ok, errors


def get_import_data_logger(stream_file):
    """Return a DEBUG-level logger writing its records into ``stream_file``.

    A distinct child logger of ``rq.task`` is created for each stream.
    Previously the shared ``rq.task`` logger was returned on every call, so
    each call stacked one more StreamHandler on the same logger: records
    leaked across log files (the SHACL log and the main import log received
    each other's lines), ``propagate = False`` set by one caller silenced
    the others, and handlers accumulated over successive jobs of a
    long-lived worker.  Records still propagate up to ``rq.task`` unless
    the caller disables propagation.

    :param io.TextIOBase stream_file: writable text stream (e.g. StringIO)
    :return: the configured logger
    :rtype: logging.Logger
    """
    # unique per-stream name keeps handlers isolated between log files
    log = logging.getLogger(f"rq.task.{id(stream_file)}")
    # in case this id was reused after garbage collection, drop any stale
    # handler left over from a previous stream and restore defaults
    log.handlers.clear()
    log.propagate = True
    handler = logging.StreamHandler(stream_file)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)s\n\n"
            "%(message)s\n\n"
            "--------\n"
        )
    )
    log.addHandler(handler)
    return log


@rqjob
def import_data(
    cnx,
    import_process_eid=None,
):
    """Import data.

    Runs as an rq job: fires workflow transitions on the ImportProcess,
    downloads the dataset with the recipe's process type, stores the input
    dataset, the SHACL validation report (when validation fails) and the
    full import log as File entities.

    :param Connection cnx: database connection
    :param int import_process_eid: ImportProcess eid
    :return: True when the import succeeded, False otherwise
    :rtype: bool
    """
    import_process = cnx.entity_from_eid(import_process_eid)
    wf = import_process.cw_adapt_to("IWorkflowable")
    wf.fire_transition("starts")
    stream_log_file = io.StringIO()
    log = get_import_data_logger(stream_log_file)
    cnx.commit()
    task_failed = False
    formatted_exc = None
    import_recipe = import_process.import_recipe[0]
    dataservice = import_recipe.dataservice[0]
    import_procedure = import_process.import_procedure[0]
    try:
        log.info(
            f"Starting import process with recipe {import_recipe.dc_title()} "
            f"from {dataservice.dc_title()} to populate {import_procedure.dc_title()}"
        )
        process_to_apply = cnx.vreg["rodolf.appobject.processtype"].select(
            import_recipe.process_type, req=cnx
        )
        rdf_graph = process_to_apply(import_process, log)
        log.info(f"Data was successfully downloaded from {dataservice.dc_title()}")
        import_process.cw_set(
            has_input_dataset=cnx.create_entity(
                "File",
                title=f"Input dataset for ImportProcess#{import_process_eid}",
                data=Binary(rdf_graph.serialize(format="ttl").encode("utf8")),
                data_name=f"Input_dataset_{import_process.eid}.ttl",
                data_format="text/turtle",
            )
        )
        log.info(f"Input data was successfully saved for {dataservice.dc_title()}")
        valid_rdf, shacl_errors = check_rdf_graph(rdf_graph, import_procedure)
        if not valid_rdf:
            stream_shacl_log = io.StringIO()
            shacl_log = get_import_data_logger(stream_shacl_log)
            shacl_log.propagate = False  # do not log into stdout
            log.error("Data was not validated")
            for key, value in shacl_errors.items():
                shacl_file = cnx.entity_from_eid(key)
                shacl_log.error(
                    f"Data from {dataservice.dc_title()} does not comply with SHACL file "
                    f"{shacl_file.dc_title()} and gives message : \n\t\t{value}"
                )
            stream_shacl_log.seek(0)
            import_process.cw_set(
                shacl_report=cnx.create_entity(
                    "File",
                    title=f"SHACL Log file for ImportProcess#{import_process_eid}",
                    data=Binary(stream_shacl_log.read().encode("utf8")),
                    data_name=f"log_SHACL_ImportProcess_{import_process.eid}.txt",
                    # NOTE(review): "plain/text" is not a valid MIME type
                    # ("text/plain" is) -- kept as-is because other parts of
                    # the application may look files up by this exact value
                    data_format="plain/text",
                )
            )
            import_process.cw_set(shacl_valid=False)
        else:
            import_process.cw_set(shacl_valid=True)
            log.info("Data was successfully validated")
    except Exception as error:
        task_failed = True
        # capture the traceback for the "fails" transition below; it was
        # previously left to None, so the transition never carried any
        # failure details
        formatted_exc = traceback.format_exc()
        log.error(error, exc_info=True)
        log.error("Importing data aborted.")
    if task_failed:
        wf.fire_transition("fails", formatted_exc)
    else:
        wf.fire_transition("success")
    stream_log_file.seek(0)
    import_process.cw_set(
        import_report=cnx.create_entity(
            "File",
            title=f"Log file for ImportProcess#{import_process_eid}",
            data=Binary(stream_log_file.read().encode("utf8")),
            data_name=f"log_ImportProcess_{import_process.eid}.txt",
            data_format="plain/text",  # NOTE(review): likely meant "text/plain"
        )
    )
    cnx.commit()
    return not task_failed


def launch_import_procedure(cnx, procedure, logger, only_delta_import=True):
    """
    procedure: ImportProcedure Entity
    logger: Logger
    only_delta_import: boolean
    - if set to False, every ImportRecipe of the ImportProcedure will have
      ImportProcess created and an associated RqTask launched
    - if set to True (default), only the ImportRecipe
      i) whose latest ImportProcess failed
      ii) or whose dataservice's refresh period has been over,
      iii) or which have never been launched
      will be launched (i.e. ImportProcess + RqTask created and enqueued)
    """
    # deactivated procedures never enqueue anything
    if not procedure.activated:
        return 0
    return procedure.create_needed_import_process(only_delta_import, logger)