Commit cec0994f authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

Rework test_workflow_group()

Drop assertion "cwtask.eid in results" which is already implicitly checked by "results[cwtask.eid]"
Only run a group(success(), success()), which is enough for the scope of this
test and make it compatible with celery 4 where nested groups are rewrited in a
single group.
Wait and test results of all subtasks, not only the first one.
parent 09962b2ccb79
......@@ -176,17 +176,14 @@ class CeleryTaskTC(BaseCeleryTaskTC):
cwtask = start_async_task(cnx, task)
self.assertEqual(cwtask.task_name, u'')
results = run_all_tasks(cnx)
self.assertIn(cwtask.eid, results)
self.assertEqual(results[cwtask.eid].get(), [5, 9])
children = cwtask.reverse_parent_task
self.assertEqual(len(children), 2)
self.assertEqual(children[0].task_name, u'add')
task_id = children[0].cw_adapt_to('ICeleryTask').task_id
result = self.wait_async_task(cnx, task_id)
self.assertEqual(result.get(), 5)
subtasks = cwtask.reverse_parent_task
self.assertEqual(len(subtasks), 2)
for subtask, expected in zip(subtasks, [5, 9]):
self.wait_async_task(cnx, subtask.task_id)
result = subtask.cw_adapt_to('ICeleryTask').result.get()
self.assertEqual(result, expected)
def test_workflow_chord(self):
with self.admin_access.repo_cnx() as cnx:
......@@ -252,14 +249,10 @@ class CeleryTaskTC(BaseCeleryTaskTC):
def success():
return celery.signature("success")
graph =, success()),, success()),
task = start_async_task(cnx, graph)
task = start_async_task(cnx,, success()))
task_ids = set([t.task_id for t in task.child_tasks()])
self.assertEqual(len(task_ids), 8)
self.assertEqual(len(task_ids), 3)
with mock.patch('') as revoke:
self.assertEqual(revoke.call_count, 1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment