# -*- coding: utf-8 -*-
# copyright 2023 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
import json
import logging

import pyshacl
from rdflib import Graph, URIRef
from rdflib.namespace import NamespaceManager

from cubicweb import Binary
from cubicweb_rq.rq import rqjob
def summarize_validation_report(shacl_report_graph):
    """Aggregate a SHACL validation report graph into a compact JSON-able summary.

    Groups individual ``sh:ValidationResult`` nodes by (property, severity,
    shape, constraint) and counts them, keeping at most 10 sample focus nodes
    and values per group.

    :param rdflib.Graph shacl_report_graph: graph produced by pyshacl.validate
    :returns: tuple (total number of violations, list of summary dicts)
    """
    # NOTE: bare ``?message`` was removed from the SELECT list — it is also
    # bound by ``(sample(?fmessage) as ?message)``, and projecting the same
    # variable twice is invalid SPARQL.
    summary_query = """
    PREFIX sh: <http://www.w3.org/ns/shacl#>
    select distinct ?property ?inverseProperty ?severity
    ?constraint ?shape
    (count(?x) as ?shcount)
    (sample(?fmessage) as ?message)
    (group_concat(?fnode; separator="|") as ?nodes)
    (group_concat(?shvalue; separator="|") as ?shvalues)
    where{
        ?x a sh:ValidationResult.
        ?x sh:resultPath ?property.
        ?x sh:resultSeverity ?severity.
        ?x sh:sourceShape ?shape.
        ?x sh:resultMessage ?fmessage.
        ?x sh:focusNode ?fnode.
        OPTIONAL{?x sh:sourceConstraintComponent ?constraint.}
        OPTIONAL{?property sh:inversePath ?inverseProperty}
        OPTIONAL{?x sh:value ?shvalue}
    }
    GROUP BY ?property ?severity ?shape ?inverseProperty ?constraint
    """
    qres = shacl_report_graph.query(summary_query)
    # Empty graph used only to get a NamespaceManager with rdflib's default
    # prefix bindings, so URIs are rendered as compact CURIEs.
    g = Graph()
    nm = NamespaceManager(g, bind_namespaces="rdflib")
    nb_violations = 0
    summary_report = []
    for row in qres:
        nb_violations += int(row.shcount)
        # Property may be a direct path (URIRef) or an inverse path blank node.
        shproperty = None
        if isinstance(row.property, URIRef):
            shproperty = row.property.n3(nm)
        elif isinstance(row.inverseProperty, URIRef):
            shproperty = f"^{row.inverseProperty.n3(nm)}"
        summary_report.append(
            {
                "severity": row.severity.n3(nm),
                "count": int(row.shcount),
                "property": shproperty,
                "message": row.message,
                # constraint comes from an OPTIONAL clause and may be unbound
                "constraint": row.constraint.n3(nm)
                if isinstance(row.constraint, URIRef)
                else None,
                "shape": row.shape.n3(nm) if isinstance(row.shape, URIRef) else None,
                "cases": {
                    # cap samples at 10 to keep the report small
                    "nodes": row.nodes.split("|")[:10],
                    "values": row.shvalues.split("|")[:10],
                },
            }
        )
    return nb_violations, summary_report
def check_rdf_graph(graph, import_procedure):
    """Validate *graph* against every SHACL shapes file of *import_procedure*.

    :param rdflib.Graph graph: RDF data graph to validate
    :param import_procedure: ImportProcedure entity exposing ``shacl_files``
    :returns: tuple (bool: all files conform, dict: shacl-file eid ->
        (violation count, summary list) for each non-conforming file)
    """
    errors = {}
    everything_ok = True
    for shacl_file in import_procedure.shacl_files:
        shacl_shapes_graph = Graph().parse(
            data=shacl_file.data.getvalue().decode("utf8"),
            format=shacl_file.data_format,
        )
        conforms, graph_reports, _text_reports = pyshacl.validate(
            graph,
            shacl_graph=shacl_shapes_graph,
        )
        if not conforms:
            # BUGFIX: everything_ok was never flipped, so validation
            # failures were reported as success to the caller.
            everything_ok = False
            nb_violations, json_summary = summarize_validation_report(graph_reports)
            errors[shacl_file.eid] = (nb_violations, json_summary)
    return everything_ok, errors
def get_import_data_logger(stream_file):
    """Return the ``rq.task`` logger with a handler writing into *stream_file*.

    Each call attaches a new StreamHandler to the shared ``rq.task`` logger;
    callers are expected to use the returned logger for one import run.

    :param stream_file: writable text stream (e.g. io.StringIO) receiving
        the formatted log records
    :rtype: logging.Logger
    """
    log = logging.getLogger("rq.task")
    handler = logging.StreamHandler(stream_file)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)s\n\n"
            "%(message)s\n\n"
            "--------\n"
        )
    )
    log.addHandler(handler)
    return log
@rqjob
def import_data(
    cnx,
    import_process_eid=None,
):
    """Run an ImportProcess end to end: fetch, store, SHACL-validate, report.

    Fires the workflow transitions ``starts`` then ``success``/``fails``,
    and attaches the run log (and a SHACL report on validation failure)
    to the ImportProcess entity as File entities.

    :param Connection cnx: database connection
    :param int import_process_eid: ImportProcess eid
    :returns: True if the import succeeded, False otherwise
    """
    import_process = cnx.entity_from_eid(import_process_eid)
    wf = import_process.cw_adapt_to("IWorkflowable")
    wf.fire_transition("starts")
    stream_log_file = io.StringIO()
    log = get_import_data_logger(stream_log_file)
    cnx.commit()
    task_failed = False
    formatted_exc = None
    try:
        import_recipe = import_process.import_recipe[0]
        dataservice = import_recipe.dataservice[0]
        import_procedure = import_process.import_procedure[0]
        log.info(
            f"Starting import process with recipe {import_recipe.dc_title()} "
            f"from {dataservice.dc_title()} to populate {import_procedure.dc_title()}"
        )
        process_to_apply = cnx.vreg["rodolf.appobject.processtype"].select(
            import_recipe.use_process[0].regid, req=cnx
        )
        rdf_graph = process_to_apply(import_process, log)
        log.info(f"Data was successfully downloaded from {dataservice.dc_title()}")
        import_process.cw_set(
            has_input_dataset=cnx.create_entity(
                "File",
                title=f"Input dataset for ImportProcess#{import_process_eid}",
                data=Binary(rdf_graph.serialize(format="ttl").encode("utf8")),
                data_name=f"input_data_ImportProcess_{import_process.eid}.ttl",
                data_format="text/turtle",
            )
        )
        log.info(f"Input data was successfully saved for {dataservice.dc_title()}")
        valid_rdf, shacl_errors = check_rdf_graph(rdf_graph, import_procedure)
        if not valid_rdf:
            stream_shacl_log = io.StringIO()
            shacl_log = get_import_data_logger(stream_shacl_log)
            shacl_log.propagate = False  # do not log into stdout
            log.error("Data was not validated")
            # NOTE(review): json_report was used but never built in the
            # original; reconstructed here as {shacl file title: summary} —
            # confirm against consumers of the shacl_report File.
            json_report = {}
            for key, value in shacl_errors.items():
                shacl_file = cnx.entity_from_eid(key)
                shacl_log.error(
                    f"Data from {dataservice.dc_title()} does not comply with SHACL file "
                    f"{shacl_file.dc_title()} and raises {value[0]} violations.\n"
                    "See details in SHACL Log"
                )
                json_report[shacl_file.dc_title()] = value[1]
            stream_shacl_log.seek(0)
            import_process.cw_set(
                shacl_report=cnx.create_entity(
                    "File",
                    title=f"SHACL Log file for ImportProcess#{import_process_eid}",
                    data=Binary(json.dumps(json_report).encode("utf-8")),
                    data_name=f"log_SHACL_ImportProcess_{import_process.eid}.json",
                    data_format="application/json",
                )
            )
            import_process.cw_set(shacl_valid=False)
        else:
            # BUGFIX: shacl_valid=True was set unconditionally right after
            # shacl_valid=False; it must only apply when validation passed.
            import_process.cw_set(shacl_valid=True)
            log.info("Data was successfully validated")
    except Exception as error:
        task_failed = True
        # BUGFIX: formatted_exc stayed None, so the "fails" transition
        # carried no error message.
        formatted_exc = str(error)
        log.error(error, exc_info=True)
        log.error("Importing data aborted.")
    if task_failed:
        wf.fire_transition("fails", formatted_exc)
    else:
        wf.fire_transition("success")
    # Persist the full run log whatever the outcome.
    stream_log_file.seek(0)
    import_process.cw_set(
        import_report=cnx.create_entity(
            "File",
            title=f"Log file for ImportProcess#{import_process_eid}",
            data=Binary(stream_log_file.read().encode("utf8")),
            data_name=f"log_ImportProcess_{import_process.eid}.txt",
            data_format="plain/text",
        )
    )
    return not task_failed
def launch_import_procedure(cnx, procedure, logger, only_delta_import=True):
    """Trigger the import processes an ImportProcedure currently needs.

    procedure: ImportProcedure Entity
    logger: Logger
    only_delta_import: boolean
    - if set to False, every ImportRecipe of the ImportProcedure will have
    ImportProcess created and an associated RqTask launched
    - if set to True (default), only the ImportRecipe i) whose latest ImportProcess failed
    ii) or whose dataservice's refresh period has been over, iii) or which have
    never been launched will be launched (i.e. ImportProcess + RqTask created and enqueued)
    """
    if procedure.activated:
        return procedure.create_needed_import_process(only_delta_import, logger)
    # Deactivated procedures launch nothing.
    return 0