Newer
Older

Fabien Amarger
committed
import requests
import logging
from rdflib import Graph, ConjunctiveGraph
from rdf_data_manager import delete_graph, upload_graph
from cubicweb_rodolf.schema import ImportProcedure
# Passed as the trailing positional arguments to delete_graph() and
# upload_graph() further down in this module.
# NOTE(review): presumably the maximum number of attempts and the delay
# between attempts (unit not visible here) -- confirm against rdf_data_manager.
UPLOAD_MAX = 3
UPLOAD_DELAY = 10

Fabien Amarger
committed
def get_graph_from_url(
    download_url: str, content_type: str, log: logging.Logger
) -> Graph:
    """Download an RDF document and parse it into a graph.

    Args:
        download_url: URL fetched with an HTTP GET (redirects are followed,
            4 second timeout).
        content_type: Passed to rdflib as the ``format`` of the payload.
        log: Logger that receives an error record when the response status
            is not OK.

    Returns:
        A ``ConjunctiveGraph`` populated from the downloaded content.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        Other ``requests`` exceptions (connection errors, timeouts) and
        rdflib parsing errors are not caught here and propagate as-is.
    """
    response = requests.get(
        download_url,
        allow_redirects=True,
        timeout=4,
    )
    if not response.ok:
        # Log the status and body before raising so the failure can be
        # diagnosed from the logs alone.
        log.error(
            f"Cannot get file {download_url}:"
            f" {response.status_code} {response.text}"
        )
        response.raise_for_status()
    graph = ConjunctiveGraph()
    # Hand the raw bytes to the parser instead of ``response.text``:
    # requests falls back to ISO-8859-1 for ``text/*`` responses that do not
    # declare a charset, which would corrupt UTF-8 encoded RDF (e.g. Turtle)
    # before rdflib ever sees it.
    graph.parse(data=response.content, format=content_type)
    return graph
import_procedure: ImportProcedure,
graph: Graph,
named_graph: str,
filename: str,
) -> None:
delete_graph(
import_procedure.virtuoso_credentials,
named_graph,
UPLOAD_MAX,
UPLOAD_DELAY,
)
upload_graph(
import_procedure.virtuoso_credentials,
named_graph,
graph,
filename,
UPLOAD_MAX,
UPLOAD_DELAY,
)