Skip to content
Snippets Groups Projects
Commit acec94665576 authored by Simon Chabot's avatar Simon Chabot
Browse files

test: add import_procedure and dataservices deletion tests

related: #83
parent 5802bb0aa6ee
No related branches found
No related tags found
1 merge request!59test: add import_procedure and dataservices deletion tests
Pipeline #233852 passed
......@@ -121,3 +121,58 @@
cnx, importprocedure, LOGGER, only_delta_import=False
)
self.assertEqual(nb_task_launched, 2)
def test_delete_import_procedure(self):
with self.admin_access.cnx() as cnx, rq.Connection(self.fakeredis):
# create fake data
data = self._create_test_data(cnx)
# launch task
launch_import_procedure(
cnx, data["import_procedure"], LOGGER, only_delta_import=False
)
# assert everything has been created as expected
self.assertEqual(len(cnx.find("RqTask")), 2)
self.assertEqual(len(cnx.find("DataService")), 2)
self.assertEqual(len(cnx.find("ImportRecipe")), 2)
self.assertEqual(len(cnx.find("ImportProcess")), 2)
self.assertEqual(len(cnx.find("ImportProcedure")), 1)
# delete the import procedure
data["import_procedure"].cw_delete()
# assert composite object has been deleted as expected
self.assertEqual(len(cnx.find("RqTask")), 0)
self.assertEqual(len(cnx.find("ImportRecipe")), 0)
self.assertEqual(len(cnx.find("ImportProcess")), 0)
self.assertEqual(len(cnx.find("ImportProcedure")), 0)
# assert non included object are kept
self.assertEqual(len(cnx.find("DataService")), 2)
def test_delete_dataservices(self):
with self.admin_access.cnx() as cnx, rq.Connection(self.fakeredis):
# create fake data
data = self._create_test_data(cnx)
# launch task
launch_import_procedure(
cnx, data["import_procedure"], LOGGER, only_delta_import=False
)
# assert everything has been created as expected
self.assertEqual(len(cnx.find("RqTask")), 2)
self.assertEqual(len(cnx.find("DataService")), 2)
self.assertEqual(len(cnx.find("ImportRecipe")), 2)
self.assertEqual(len(cnx.find("ImportProcess")), 2)
self.assertEqual(len(cnx.find("ImportProcedure")), 1)
# delete all the dataservices
[ds.cw_delete() for ds in data["dataservices"]]
# assert composite object has been deleted as expected
self.assertEqual(len(cnx.find("RqTask")), 0)
self.assertEqual(len(cnx.find("DataService")), 0)
self.assertEqual(len(cnx.find("ImportRecipe")), 0)
self.assertEqual(len(cnx.find("ImportProcedure")), 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment