Commit cf4ad6bd authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

[entities] CeleryTaskAdapter: revoke child tasks recursively

When a task is revoked, revoke child tasks as well.
Also add a `child_tasks` property on CeleryTaskAdapter to recurse over child
parent bbd7c42bcb41
......@@ -130,6 +130,12 @@ class CeleryTask(AnyEntity):
for ptask in task.parent_tasks:
yield ptask
def child_tasks(self):
yield self
for task in self.reverse_parent_task:
for ctask in task.child_tasks():
yield ctask
class ICeleryTask(EntityAdapter):
__regid__ = 'ICeleryTask'
......@@ -251,6 +257,11 @@ class CeleryTaskAdapter(ICeleryTask):
def task_name(self):
return self.entity.task_name
def revoke(self, terminate=True, signal='SIGKILL'):
to_revoke = set([e.task_id for e in self.entity.child_tasks()])
return celery.task.control.revoke(
list(to_revoke), terminate=terminate, signal=signal)
def attach_result(self, result):
def tree(result, seen=None):
if seen is None:
......@@ -232,6 +232,37 @@ class CeleryTaskTC(BaseCeleryTaskTC):
revoke.assert_called_once_with([task.task_id], signal='SIGKILL',
def test_multi_revoke(self):
with self.admin_access.cnx() as cnx:
s = celery.signature
task_ids = []
def success():
task = s("success")
return task
def group(*tasks):
g =*tasks)
return g
graph = group(
group(success(), success()),
group(success(), success()),
task = start_async_task(cnx, graph)
self.assertEqual(set(task_ids), set([
t.task_id for t in task.child_tasks()]))
with mock.patch('') as revoke:
self.assertEqual(revoke.call_count, 1)
args, kwargs = revoke.call_args
self.assertEqual(len(args), 1)
self.assertEqual(set(task_ids), set(args[0]))
self.assertEqual({'terminate': True, 'signal': 'SIGKILL'}, kwargs)
if __name__ == '__main__':
from unittest import main
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment