Skip to content
Snippets Groups Projects
process_helpers.py 1.51 KiB
Newer Older
import requests
import logging

from rdflib import Graph, ConjunctiveGraph

from cubicweb.rdf import RDF_MIMETYPE_TO_FORMAT

from rdf_data_manager import delete_graph, upload_graph

from cubicweb_rodolf.schema import ImportProcedure


UPLOAD_MAX = 3
UPLOAD_DELAY = 10


def get_graph_from_url(download_url: str, log: logging.Logger) -> Graph:
    response = requests.get(
        download_url,
        headers={"Accept": ";".join(RDF_MIMETYPE_TO_FORMAT.keys())},
        allow_redirects=True,
        timeout=4,
    )
    if not response.ok:
        log.error(
            f"Cannot get file {download_url}:"
            f" {response.status_code} {response.text}"
        )
        response.raise_for_status()
    content_type = response.headers["Content-Type"]
    rdf_parse_format = content_type
    for mime_type, rdf_format in RDF_MIMETYPE_TO_FORMAT.items():
        if mime_type in content_type:
            rdf_parse_format = rdf_format
            break

    graph = ConjunctiveGraph()
    graph.parse(data=response.text, format=rdf_parse_format)
    return graph
def upload_graph_to_virtuoso_endpoint(
    import_procedure: ImportProcedure,
    graph: Graph,
    named_graph: str,
    filename: str,
    log: logging.Logger,
) -> None:
    delete_graph(
        import_procedure.virtuoso_credentials,
        named_graph,
        UPLOAD_MAX,
        UPLOAD_DELAY,
    )
    upload_graph(
        import_procedure.virtuoso_credentials,
        named_graph,
        graph,
        filename,
        UPLOAD_MAX,
        UPLOAD_DELAY,
    )