Commit b7fb3cce authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

monitor: can work with multiple instances

Previously we handled monitoring of celery tasks by listening to the celery
event bus (celery.events.EventReceiver), which was not persistent. In that
setup we used a dedicated routine (on_monitor_start) to synchronize unfinished
tasks, but this did not work for untracked tasks (e.g. tasks not started with
start_async_task). It was also a single point of failure because it could not
run on multiple instances without concurrency issues (events are sent to all
instances).

Now we use a redis queue: workers push the task_id and task_name of tasks to
be synchronized, and celery-monitor uses brpop
(https://redis.io/commands/brpop) to process the queue.

CELERY_SEND_EVENTS no longer needs to be enabled (-E or --events in the
worker options).
You must now add 'cw_celerytask_helpers.helpers' to CELERY_IMPORTS.
parent c97f0e74bb5f
......@@ -16,7 +16,7 @@ celeryconfig.py example::
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers.redislogger', 'module.containing.tasks')
CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
......@@ -32,10 +32,10 @@ logging handling mechanism.
Start a worker::
# running cubicweb tasks (celeryconfig.py will be imported from your instance config directory)
celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info -E
celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info
# running pure celery tasks
celery worker -l info -E
celery worker -l info
Task state synchronization requires to run the `celery-monitor` command::
......
......@@ -15,16 +15,15 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
import celery
from celery.events import EventReceiver
from kombu import Connection as BrokerConnection
import json
from cubicweb.toolsutils import Command
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.cwctl import CWCTL
from cw_celerytask_helpers.monitor import MONITOR_KEY
from cw_celerytask_helpers.utils import get_redis_client
class CeleryMonitorCommand(Command):
"""Synchronize celery task statuses"""
......@@ -48,48 +47,31 @@ class CeleryMonitorCommand(Command):
init_cmdline_log_threshold(config, self['loglevel'])
repo = repoapi.get_repository(config=config)
repo.hm.call_hooks('server_maintenance', repo=repo)
self.repo = repo
with repo.internal_cnx() as cnx:
self.on_monitor_start(cnx)
self.celery_monitor()
self.loop(cnx)
@staticmethod
def on_monitor_start(cnx):
    """Give every registered ICeleryTask adapter a chance to
    resynchronize task states at monitor startup, then persist
    whatever they changed.
    """
    adapters = cnx.vreg['adapters']['ICeleryTask']
    for icelerytask_adapter in adapters:
        icelerytask_adapter.on_monitor_start(cnx)
    cnx.commit()
def on_event(self, event):
    """Dispatch one celery monitoring *event* to every ICeleryTask adapter.

    Opens a fresh internal connection, lets each adapter handle the event,
    then commits. On any error the transaction is rolled back and the error
    is logged instead of killing the monitor loop.
    """
    with self.repo.internal_cnx() as cnx:
        try:
            for adapter in cnx.vreg['adapters']['ICeleryTask']:
                adapter.on_event(cnx, event)
            cnx.commit()
        except Exception as exc:
            self.error('<CeleryMonitorCommand> '
                       'Unexpected error on event %s:\n%s',
                       event, exc)
            # BUG FIX: was ``cnx.roolback()`` — a typo that raised
            # AttributeError inside the handler instead of rolling back.
            cnx.rollback()
# NOTE(review): this span is a scraped diff hunk — the removed
# EventReceiver-based ``celery_monitor`` and the added redis-queue
# ``loop`` are interleaved below because the +/- diff markers were
# lost during extraction. Annotations below mark which lines belong
# to which version; no code was altered.
# old entry point (removed by this commit):
def celery_monitor(self):
result_backend = celery.current_app.conf['CELERY_RESULT_BACKEND']
# new entry point (added by this commit): consumes (task_id, task_name)
# pairs that workers push onto the MONITOR_KEY redis list.
def loop(cnx, timeout=None):
client = get_redis_client()
# in test mode exceptions are re-raised instead of being swallowed
test = (cnx.repo.config.mode == "test")
while True:
try:
# old body: subscribe to the (non-persistent) celery event bus —
# this could not run on multiple instances, per the commit message:
with BrokerConnection(result_backend) as conn:
recv = EventReceiver(
conn,
handlers={
'task-failed': self.on_event,
'task-succeeded': self.on_event,
'task-received': self.on_event,
'task-revoked': self.on_event,
'task-started': self.on_event,
# '*': self.on_event,
})
recv.capture(limit=None, timeout=None)
except (KeyboardInterrupt, SystemExit):
traceback.print_exc()
# new body: blocking pop from the redis queue; brpop returns None on
# timeout, which terminates the loop (used by tests via timeout=1).
data = client.brpop(MONITOR_KEY, timeout=timeout)
if data is None:
break
data = json.loads(data[1].decode())
task_id, task_name = data['task_id'], data['task_name']
for adapter in cnx.vreg['adapters']['ICeleryTask']:
try:
adapter.sync_task_state(cnx, task_id,
task_name=task_name)
except Exception:
adapter.exception('Unhandled exception while syncing '
'task <Task %s (%s)>', task_id,
task_name)
cnx.rollback()
if test:
# we should not hide exceptions in tests
raise
# register the command so ``cubicweb-ctl celery-monitor`` is available
CWCTL.register(CeleryMonitorCommand)
......@@ -64,7 +64,7 @@ def run_all_tasks(cnx=None):
wf.fire_transition(transition, result.traceback)
else:
from cubes.celerytask.ccplugin import CeleryMonitorCommand
CeleryMonitorCommand.on_monitor_start(cnx)
CeleryMonitorCommand.loop(cnx, 1)
return results
......@@ -149,12 +149,7 @@ class ICeleryTask(EntityAdapter):
return celery.signature(name, args=args, kwargs=kwargs)
@staticmethod
def on_event(cnx, event):
    """Hook called by the celery-monitor command for each celery event.

    The base adapter ignores events; concrete adapters override this
    to update their entity's state from *event*.
    """
    return None
@staticmethod
# NOTE(review): diff interleaving — ``on_monitor_start`` below is the
# hook removed by this commit and ``sync_task_state`` its replacement;
# both def lines appear under the single decorator in this scraped
# view because the +/- markers were lost.
def on_monitor_start(cnx):
def sync_task_state(cnx, task_id, task_name=None):
"""Triggered by celery-monitor"""
pass
......@@ -294,36 +289,6 @@ class CeleryTaskAdapter(ICeleryTask):
self.entity, self.entity.task_id)
cwtask.cw_set(parent_task=self.entity)
@staticmethod
def on_event(cnx, event):
    """Synchronize the matching CeleryTask's state for state-changing
    celery events (delegates to ``sync_task_state``, which also weaves
    parent_task relations).
    """
    log = CeleryTaskAdapter
    if event['type'] in ('task-failed', 'task-received', 'task-succeeded',
                         'task-revoked', 'task-started'):
        try:
            CeleryTaskAdapter.sync_task_state(cnx, event['uuid'],
                                              task_name=event.get('name'))
        except Exception:
            # BUG FIX: the message was split by a stray comma
            # ('...syncing task ', '%s state', uuid) — the '%s state'
            # part became a (unused) format argument, so the task id was
            # never interpolated and logging received a spurious arg.
            log.exception('Unhandled exception while syncing task '
                          '%s state', event['uuid'])
            cnx.rollback()
@staticmethod
def on_monitor_start(cnx):
    """Resynchronize every non-finished CeleryTask at monitor startup.

    Logs, rolls back and re-raises on the first failing task.
    """
    log = CeleryTaskAdapter
    rset = cnx.execute(
        'Any T, TID WHERE '
        'T is CeleryTask, T task_id TID, T in_state S, '
        'S name in ("waiting", "queued", "running")')
    for task_eid, task_id in rset:
        try:
            CeleryTaskAdapter.sync_task_state(cnx, task_id)
        except Exception:
            log.exception('Unhandled exception while syncing task '
                          '%s state', task_id)
            cnx.rollback()
            raise
@staticmethod
def sync_task_state(cnx, task_id, task_name=None, commit=True):
log = CeleryTaskAdapter
......
......@@ -2,6 +2,7 @@ import six
from celery.result import AsyncResult
from cubicweb import NoResultError
from cubicweb.predicates import is_instance
from cubes.celerytask import STATES
......@@ -18,26 +19,17 @@ class RunAdapter(ICeleryTask):
return signature
@staticmethod
def on_event(cnx, event):
    """Fire the Run's 'finish' transition when its celery task ends,
    storing the celery result on success.
    """
    if event['type'] in ('task-failed', 'task-succeeded'):
        run = cnx.find('Run', task_id=event['uuid']).one()
        run.cw_adapt_to('IWorkflowable').fire_transition('finish')
        if event['type'] == 'task-succeeded':
            # BUG FIX: ``self`` does not exist in a staticmethod (the
            # original ``self.result.get()`` raised NameError); fetch
            # the celery result for this task explicitly, mirroring the
            # AsyncResult usage in sync_task_state.
            run.cw_set(result=AsyncResult(event['uuid']).get())
@staticmethod
def on_monitor_start(cnx):
    """Synchronize pending Run entities with their celery results at
    monitor startup: finish runs whose task already succeeded or failed,
    storing the result on success.
    """
    # BUG FIX: the RQL restricted an unbound variable T
    # ('T in_state S') instead of R, so the state filter never applied
    # to the selected Run entities.
    for run_eid, task_id in cnx.execute((
        'Any R, TID WHERE '
        'R is Run, R task_id TID, R in_state S, '
        'S name "pending"'
    )):
        result = AsyncResult(task_id)
        if result.state in (STATES.SUCCESS, STATES.FAILURE):
            run = cnx.entity_from_eid(run_eid)
            run.cw_adapt_to('IWorkflowable').fire_transition('finish')
            if result.state == STATES.SUCCESS:
                run.cw_set(result=result.get())
def sync_task_state(cnx, task_id, task_name=None):
    """Finish the Run bound to *task_id* once its celery result is ready.

    Silently ignores task ids with no matching Run; stores the celery
    result on the entity when the task succeeded, then commits.
    """
    result = AsyncResult(task_id)
    if not result.ready():
        return
    try:
        run = cnx.find('Run', task_id=task_id).one()
    except NoResultError:
        # no Run tracks this task — nothing to synchronize
        return
    run.cw_adapt_to('IWorkflowable').fire_transition_if_possible('finish')
    if result.successful():
        run.cw_set(result=result.get())
    cnx.commit()
@property
def task_id(self):
......
......@@ -16,6 +16,7 @@
"""cubicweb-celerytask automatic tests"""
import collections
import logging
import unittest
......@@ -177,23 +178,26 @@ class CeleryTaskTC(BaseCeleryTaskTC):
cwtask = start_async_task(cnx, task)
cnx.commit()
self.assertEqual(cwtask.task_name, u'spawn')
run_all_tasks()
run_all_tasks(cnx)
asresult = self.wait_async_task(cnx, cwtask.task_id)
result = celery.result.from_serializable(asresult.result)
self.assertEqual(0, result.get())
cwtask = cnx.entity_from_eid(cwtask.eid)
self.assertEqual(cwtask.cw_adapt_to('IWorkflowable').state, "done")
children = cwtask.reverse_parent_task
sync_task_state = cwtask.cw_adapt_to('ICeleryTask').sync_task_state
for task in children:
sync_task_state(cnx, task.task_id)
self.assertEqual(len(children), 11)
# XXX: cannot check children task names since they are not
# availables in broker
self.assertEqual(set([c.task_name for c in children]),
set(['<unknown>']))
self.assertEqual(set([c.cw_adapt_to('IWorkflowable').state
for c in children]), set(['done']))
counter = collections.Counter()
states = []
for child in children:
counter[child.task_name] += 1
states.append(child.cw_adapt_to('IWorkflowable').state)
self.assertEqual(dict(counter), {'success': 10, 'add': 1})
tsum = cnx.find('CeleryTask', task_name='tsum').one()
self.assertEqual(tsum.cw_adapt_to('IWorkflowable').state, 'done')
# XXX: this task should have "spawn" as parent_task ?
self.assertEqual(tsum.parent_task, ())
self.assertEqual(tsum.reverse_parent_task, ())
if __name__ == '__main__':
......
......@@ -28,8 +28,6 @@ from celery.bin.worker import worker as celery_worker
from cubicweb.devtools import testlib
from cubes.celerytask.ccplugin import CeleryMonitorCommand
class BaseCeleryTaskTC(testlib.CubicWebTC):
......@@ -50,7 +48,7 @@ class BaseCeleryTaskTC(testlib.CubicWebTC):
conf.CELERY_TASK_SERIALIZER = 'json'
conf.CELERY_RESULT_SERIALIZER = 'json'
conf.CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
conf.CELERY_IMPORTS = ('cw_celerytask_helpers.redislogger', 'tasks')
conf.CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'tasks')
import tasks # noqa
cls.worker = multiprocessing.Process(target=cls.start_worker)
cls.worker.start()
......@@ -77,8 +75,6 @@ class BaseCeleryTaskTC(testlib.CubicWebTC):
start = time.time()
while abs(time.time() - start) < timeout:
if result.ready():
# synchronize task workflow
CeleryMonitorCommand.on_monitor_start(cnx)
return result
if not self.worker.is_alive():
# will be joined in tearDown
......
......@@ -7,6 +7,7 @@ deps =
pytest
pbr>=1.4
pifpaf
http://hg.logilab.org/review/cw-celerytask-helpers/archive/dfec94e.tar.bz2#egg=cw-celerytask-helpers
commands =
pifpaf run redis --port 6380 -- {envpython} -m pytest {posargs:test}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment