Skip to content
Snippets Groups Projects
process_helpers.py 1.15 KiB
Newer Older
import requests
import logging

from rdflib import Graph, ConjunctiveGraph


from rdf_data_manager import delete_graph, upload_graph

from cubicweb_rodolf.schema import ImportProcedure


UPLOAD_MAX = 3
UPLOAD_DELAY = 10

def get_graph_from_url(
    download_url: str, content_type: str, log: logging.Logger
) -> Graph:
    response = requests.get(
        download_url,
        allow_redirects=True,
        timeout=4,
    )
    if not response.ok:
        log.error(
            f"Cannot get file {download_url}:"
            f" {response.status_code} {response.text}"
        )
        response.raise_for_status()

    graph = ConjunctiveGraph()
    graph.parse(data=response.text, format=content_type)
def upload_graph_to_virtuoso_endpoint(
    import_procedure: ImportProcedure,
    graph: Graph,
    named_graph: str,
    filename: str,
) -> None:
    delete_graph(
        import_procedure.virtuoso_credentials,
        named_graph,
        UPLOAD_MAX,
        UPLOAD_DELAY,
    )
    upload_graph(
        import_procedure.virtuoso_credentials,
        named_graph,
        graph,
        filename,
        UPLOAD_MAX,
        UPLOAD_DELAY,
    )