Commit 4d74af86 authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

[entities] add a revoke method on ICeleryTask

In tests set the name of the worker to te@st so it's easier to get the revoke
task list from the worker.
parent e7ef40d49b29
......@@ -161,6 +161,10 @@ class ICeleryTask(EntityAdapter):
def task_name(self):
raise NotImplementedError
def revoke(self, terminate=True, signal='SIGKILL'):
return celery.task.control.revoke(
[self.task_id], terminate=terminate, signal=signal)
@property
def logs(self):
return loghelper.get_task_logs(self.task_id) or b''
......
......@@ -23,6 +23,7 @@ import unittest
import six
import celery.result
import mock
from cubicweb.devtools import testlib # noqa
......@@ -222,6 +223,15 @@ class CeleryTaskTC(BaseCeleryTaskTC):
self.assertEqual(tsum.parent_task, ())
self.assertEqual(tsum.reverse_parent_task, ())
def test_revoke(self):
with self.admin_access.cnx() as cnx:
task = start_async_task(cnx, celery.signature("success"))
cnx.commit()
with mock.patch('celery.app.control.Control.revoke') as revoke:
task.cw_adapt_to('ICeleryTask').revoke()
revoke.assert_called_once_with([task.task_id], signal='SIGKILL',
terminate=True)
if __name__ == '__main__':
from unittest import main
......
......@@ -7,6 +7,7 @@ deps =
pytest
pbr>=1.4
pifpaf
mock
http://hg.logilab.org/review/cw-celerytask-helpers/archive/default.tar.bz2
commands =
pifpaf run redis --port 6380 -- {envpython} -m pytest {posargs:test}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment