# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import io
import json
import logging
import traceback

import pyshacl
from cubicweb import Binary
from cubicweb_rq.rq import rqjob
from rdflib import Graph, URIRef
from rdflib.namespace import NamespaceManager


def summarize_validation_report(shacl_report_graph):
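    """Aggregate a SHACL validation report graph into a JSON-serializable summary.

    Validation results are grouped by (property, severity, shape, constraint);
    each group keeps a violation count, one sample message, and up to ten
    sample focus nodes and values.

    Returns a (total number of violations, list of summary dicts) tuple.
    """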
    summary_query = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
select distinct ?property ?inverseProperty ?severity
?constraint ?shape
(count(?x) as ?shcount)
(sample(?fmessage) as ?message)
(group_concat(?fnode; separator="|") as ?nodes)
(group_concat(?shvalue; separator="|") as ?shvalues)
where{
  ?x a sh:ValidationResult.
  ?x sh:resultPath ?property.
  ?x sh:resultSeverity ?severity.
  ?x sh:sourceShape ?shape.
  ?x sh:resultMessage ?fmessage.
  ?x sh:focusNode ?fnode.
  OPTIONAL{?x sh:sourceConstraintComponent ?constraint.}
  OPTIONAL{?property sh:inversePath ?inverseProperty}
  OPTIONAL{?x sh:value ?shvalue}
}
GROUP BY ?property ?severity ?shape ?inverseProperty ?constraint
    """
    qres = shacl_report_graph.query(summary_query)
    summary_report = []
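    # namespace manager bound to rdflib's default prefixes, used to render
    # URIs as compact CURIEs (e.g. "sh:MinCountConstraintComponent")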
    g = Graph()
    nm = NamespaceManager(g, bind_namespaces="rdflib")
    nb_violations = 0
    for row in qres:
        nb_violations += int(row.shcount)
        shproperty = None
        if isinstance(row.property, URIRef):
            shproperty = row.property.n3(nm)
        elif isinstance(row.inverseProperty, URIRef):
            shproperty = f"^{row.inverseProperty.n3(nm)}"

        summary_report.append(
            {
                "severity": row.severity.n3(nm),
                "count": int(row.shcount),
                "message": row.message,
                # constraint is optional in the query, so guard against None
                "constraint": row.constraint.n3(nm)
                if isinstance(row.constraint, URIRef)
                else None,
                "property": shproperty,
                "shape": row.shape.n3(nm) if isinstance(row.shape, URIRef) else None,
                "cases": {
                    # keep at most ten sample nodes/values per group
                    "nodes": row.nodes.split("|")[:10],
                    "values": row.shvalues.split("|")[:10],
                },
            }
        )
    return nb_violations, summary_report


def check_rdf_graph(graph, import_procedure):
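    """Validate ``graph`` against every SHACL shapes file of ``import_procedure``.

    Returns an (everything_ok, errors) tuple, where ``errors`` maps the eid of
    each failing shapes file to a (violation count, JSON summary) tuple.
    """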
    errors = {}
    everything_ok = True
    # iterate over the procedure's SHACL shapes files
    # (the ``shacl_files`` relation name is assumed from the surrounding code)
    for file in import_procedure.shacl_files:
        shacl_shapes_graph = Graph().parse(
            data=file.data.getvalue().decode("utf8"),
            format=file.data_format,
        )
        conforms, graph_reports, text_reports = pyshacl.validate(
            graph,
            shacl_graph=shacl_shapes_graph,
        )
        if not conforms:
            nb_violations, json_summary = summarize_validation_report(graph_reports)
            everything_ok = False
            errors[file.eid] = (nb_violations, json_summary)
    return everything_ok, errors


def get_import_data_logger(stream_file):
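    """Return the "rq.task" logger with an extra handler writing to ``stream_file``."""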
    log = logging.getLogger("rq.task")
    handler = logging.StreamHandler(stream_file)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)s\n\n"
            "%(message)s\n\n"
            "--------\n"
        )
    )
    log.addHandler(handler)
    return log


@rqjob
def import_data(
    cnx,
    import_process_eid=None,
):
    """Task running an ImportProcess: fetch RDF data with the recipe's
    process, validate it against the procedure's SHACL files, and store
    the resulting log and report files.

    :param Connection cnx: database connection
    :param int import_process_eid: ImportProcess eid

    """

    import_process = cnx.entity_from_eid(import_process_eid)
    wf = import_process.cw_adapt_to("IWorkflowable")
    wf.fire_transition("starts")
    stream_log_file = io.StringIO()
    log = get_import_data_logger(stream_log_file)
    cnx.commit()
    task_failed = False
    formatted_exc = None
    import_recipe = import_process.import_recipe[0]
    dataservice = import_recipe.dataservice[0]
    import_procedure = import_process.import_procedure[0]
    try:
        log.info(
            f"Starting import process with recipe {import_recipe.dc_title()} "
            f"from {dataservice.dc_title()} to populate {import_procedure.dc_title()}"
        )
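        # look up the process type appobject registered for this recipe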
        process_to_apply = cnx.vreg["rodolf.appobject.processtype"].select(
            import_recipe.use_process[0].regid, req=cnx
        )
        rdf_graph = process_to_apply(import_process, log)
        log.info(f"Data was successfully downloaded from {dataservice.dc_title()}")
        import_process.cw_set(
            has_input_dataset=cnx.create_entity(
                "File",
                title=f"Input dataset for ImportProcess#{import_process_eid}",
                data=Binary(rdf_graph.serialize(format="ttl").encode("utf8")),
                data_name=f"input_data_ImportProcess_{import_process.eid}.ttl",
                data_format="text/turtle",
            )
        )
        log.info(f"Input data was successfully saved for {dataservice.dc_title()}")

        valid_rdf, shacl_errors = check_rdf_graph(rdf_graph, import_procedure)
        if not valid_rdf:
            stream_shacl_log = io.StringIO()
            shacl_log = get_import_data_logger(stream_shacl_log)
            shacl_log.propagate = False  # do not log into stdout
            log.error("Data was not validated")
            for key, value in shacl_errors.items():
                shacl_file = cnx.entity_from_eid(key)
                    f"Data from {dataservice.dc_title()} does not comply with SHACL file "
                    f"{shacl_file.dc_title()} and raises {value[0]} violations.\n"
                    "See details in SHACL Log"
                json_report += value[1]
            stream_shacl_log.seek(0)
            import_process.cw_set(
                shacl_report=cnx.create_entity(
                    "File",
                    title=f"SHACL Log file for ImportProcess#{import_process_eid}",
                    data=Binary(json.dumps(json_report).encode("utf-8")),
                    data_name=f"log_SHACL_ImportProcess_{import_process.eid}.json",
                    data_format="application/json",
                )
            )
            import_process.cw_set(shacl_valid=False)
        else:
            import_process.cw_set(shacl_valid=True)
            log.info("Data was successfully validated")
    except Exception as error:
        task_failed = True
        formatted_exc = traceback.format_exc()
        log.error(error, exc_info=True)
        log.error("Importing data aborted.")
    if task_failed:
        wf.fire_transition("fails", formatted_exc)
    else:
        wf.fire_transition("success")
    stream_log_file.seek(0)
    import_process.cw_set(
        import_report=cnx.create_entity(
            "File",
            title=f"Log file for ImportProcess#{import_process_eid}",
            data=Binary(stream_log_file.read().encode("utf8")),
            data_name=f"log_ImportProcess_{import_process.eid}.txt",
            data_format="plain/text",
        )
    )


def launch_import_procedure(cnx, procedure, logger, only_delta_import=True):
    """
    procedure: ImportProcedure Entity
    logger: Logger
    only_delta_import: boolean
        - if set to False, every ImportRecipe of the ImportProcedure will have
            ImportProcess created and an associated RqTask launched
        - if set to True (default), only the ImportRecipe i) whose latest ImportProcess failed
            ii) or whose dataservice's refresh period has been over, iii) or which have
            never been launched will be launched (i.e. ImportProcess + RqTask created and enqueued)
    """
    if not procedure.activated:
        return 0
    return procedure.create_needed_import_process(only_delta_import, logger)
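

# Minimal usage sketch (hypothetical: assumes a CubicWeb connection ``cnx``
# and at least one ImportProcedure entity; not part of this module's API):
#
#     import logging
#     logger = logging.getLogger("rodolf.import")
#     for procedure in cnx.find("ImportProcedure").entities():
#         launch_import_procedure(cnx, procedure, logger, only_delta_import=True)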