Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# copyright 2024 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import fakeredis
import rq
from cubicweb.devtools import testlib
from cubicweb_rodolf.import_data import launch_import_procedure
LOGGER = logging.getLogger("testing.rodolf.importdata")
class ImportProcedure(testlib.CubicWebTC):
def setUp(self):
super(ImportProcedure, self).setUp()
self.fakeredis = fakeredis.FakeStrictRedis()
def test_only_delta_import(self):
"""
Trying: create an ImportProcedure from 2 recipes (ir1, ir2).
Create needed import processes and mark one of them as failed.
Expected:
- launching the procedure the first time must contain the 2 recipes
- launching the procedure in delta mode must now only contain the failed recipe
- launching the procedure in full mode must contain both recipes
"""
with self.admin_access.cnx() as cnx, rq.Connection(self.fakeredis):
ds1 = cnx.create_entity(
"DataService",
data_url="https://dbpedia.org/resource/Leonardo_da_Vinci",
name="DBpedia Leo",
)
ir1 = cnx.create_entity(
"ImportRecipe",
name="IR1",
dataservice=[ds1],
process_type="default-dryrun",
)
ds2 = cnx.create_entity(
"DataService",
data_url="https://dbpedia.org/resource/Virginia_Woolf",
name="DBpedia Virgi",
)
ir2 = cnx.create_entity(
"ImportRecipe",
name="IR2",
dataservice=[ds2],
process_type="default-dryrun",
)
importprocedure = cnx.create_entity(
"ImportProcedure",
virtuoso_url="https://sparql.poulet",
virtuoso_password="loutre",
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import_recipes=[ir1, ir2],
)
cnx.commit()
delta_recipes = [x for x in importprocedure.delta_import_recipes]
self.assertEqual(delta_recipes, [ir1, ir2])
# fake running the import processes
# import_process1 fails and import_process2 succeeds
nb_task_launched = launch_import_procedure(
cnx, importprocedure, LOGGER, only_delta_import=True
)
self.assertEqual(nb_task_launched, 2)
import_process1 = cnx.execute(
"Any X WHERE X is ImportProcess, X import_recipe R, R name 'IR1'"
).one()
import_process2 = cnx.execute(
"Any X WHERE X is ImportProcess, X import_recipe R, R name 'IR2'"
).one()
wf1 = import_process1.cw_adapt_to("IWorkflowable")
wf1.fire_transition("starts")
wf2 = import_process2.cw_adapt_to("IWorkflowable")
wf2.fire_transition("starts")
cnx.commit()
wf1.fire_transition("fails")
wf2.fire_transition("success")
cnx.commit()
# Check data only the failed IR is part of the delta recipes
delta_recipes = [x for x in importprocedure.delta_import_recipes]
self.assertEqual(delta_recipes, [ir1])
# Launching in delta mode creates only 1 task
nb_task_launched = launch_import_procedure(
cnx, importprocedure, LOGGER, only_delta_import=True
)
self.assertEqual(nb_task_launched, 1)
# Launching in full mode created 2 tasks
nb_task_launched = launch_import_procedure(
cnx, importprocedure, LOGGER, only_delta_import=False
)
self.assertEqual(nb_task_launched, 2)