Commit 05e64abb authored by Philippe Pepiot
[entities] ensure run_all_tasks() run tasks only once

When run_all_tasks() is called multiple times in the test, tasks where run
twice. Ensure _TEST_TASKS is empty after run_all_tasks().
parent 7bf87a21b009
......@@ -50,8 +50,8 @@ def run_all_tasks(cnx=None):
'workflow synchronisation', DeprecationWarning,
results = {}
for task_eid, task in _TEST_TASKS.items():
results[task_eid] = task.delay()
for task_eid in list(_TEST_TASKS):
results[task_eid] = _TEST_TASKS.pop(task_eid).delay()
if cnx is not None:
if celery.current_app.conf.CELERY_ALWAYS_EAGER:
