Newer
Older

Fabien Amarger
committed
import requests
import logging
from rdflib import Graph, ConjunctiveGraph
from cubicweb.rdf import RDF_MIMETYPE_TO_FORMAT
from rdf_data_manager import delete_graph, upload_graph
from cubicweb_rodolf.schema import ImportProcedure
UPLOAD_MAX = 3
UPLOAD_DELAY = 10

Fabien Amarger
committed
def get_graph_from_url(
    download_url: str, log: logging.Logger, *, timeout: float = 4
) -> Graph:
    """Download an RDF document over HTTP and parse it into a graph.

    The request advertises every MIME type listed in
    ``RDF_MIMETYPE_TO_FORMAT`` through the ``Accept`` header and follows
    redirects. The parser format is chosen from the ``Content-Type`` of the
    response.

    Args:
        download_url: URL of the RDF document to fetch.
        log: Logger used to report a failed download.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        A ``ConjunctiveGraph`` populated from the response body.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``response.raise_for_status()``).
    """
    response = requests.get(
        download_url,
        # Media ranges in an Accept header are separated by commas; ";" only
        # introduces the parameters of a single media range.
        headers={"Accept": ", ".join(RDF_MIMETYPE_TO_FORMAT)},
        allow_redirects=True,
        timeout=timeout,
    )
    if not response.ok:
        log.error(
            "Cannot get file %s: %s %s",
            download_url,
            response.status_code,
            response.text,
        )
    response.raise_for_status()

    # A missing Content-Type header must not surface as a bare KeyError.
    content_type = response.headers.get("Content-Type", "")
    for mime_type, rdf_format in RDF_MIMETYPE_TO_FORMAT.items():
        # Substring match so that parameters such as "; charset=utf-8"
        # do not prevent the MIME type from being recognised.
        if mime_type in content_type:
            rdf_parse_format = rdf_format
            break
    else:
        # Unknown media type: hand rdflib the bare type, without parameters.
        # When the header is absent the format stays unset and rdflib
        # applies its own default.
        # NOTE(review): confirm that default suits the sources in use.
        rdf_parse_format = content_type.split(";", 1)[0].strip() or None

    graph = ConjunctiveGraph()
    graph.parse(data=response.text, format=rdf_parse_format)
    return graph
import_procedure: ImportProcedure,
graph: Graph,
named_graph: str,
filename: str,
log: logging.Logger,
) -> None:
delete_graph(
import_procedure.virtuoso_credentials,
named_graph,
UPLOAD_MAX,
UPLOAD_DELAY,
)
upload_graph(
import_procedure.virtuoso_credentials,
named_graph,
graph,
filename,
UPLOAD_MAX,
UPLOAD_DELAY,
)