Commit 7ad3f397 authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

First implementation

Adapters based API
Inspired from https://hg.logilab.org/users/david/cw-celery-logging-demo/
parent 3a53353ee56f
Summary
-------
Run and monitor celery tasks
Run, monitor and log celery tasks.
Usage
-----
Declare tasks using celery task or cubicweb-celery cwtasks.
On worker side, install cw-celerytask-helpers from ./helpers.
celeryconfig.py example::
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
Start a worker::
# running cubicweb tasks (celeryconfig.py will be imported from your instance config directory)
celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info -E
# running pure celery tasks
celery worker -l info -E
Create a task:
.. code-block:: python
from celery import current_app as app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='hi_there')
def my_task(arg, kw=0):
logger.info('HI %s %s!', arg, kw)
return 42
Run a task:
.. code-block:: python
from cubes.celerytask.entities import start_async_task
cwtask = start_async_task(cnx, 'hi_there', 'THERE', kw=42)
......@@ -18,7 +18,12 @@ author_email = 'contact@logilab.fr'
description = 'Run and monitor celery tasks'
web = 'http://www.cubicweb.org/project/%s' % distname
__depends__ = {'cubicweb': '>= 3.19', 'six': '>= 1.4.0'}
__depends__ = {
'cubicweb': '>= 3.19',
'six': '>= 1.4.0',
'celery': None,
'cw-celerytask-helpers': '>= 0.1.0',
}
__recommends__ = {}
classifiers = [
......
# -*- coding: utf-8 -*-
# copyright 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
import celery
from celery.events import EventReceiver
from kombu import Connection as BrokerConnection
from cubicweb.toolsutils import Command
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.cwctl import CWCTL
class CeleryMonitorCommand(Command):
    """Synchronize celery task statuses.

    Connect to the celery event bus and dispatch every task-related event
    to the registered ICeleryTask adapters, so that task entities' workflow
    states follow the actual worker-side execution.
    """
    name = 'celery-monitor'
    arguments = '<instance>'
    min_args = max_args = 1
    options = (
        ('loglevel',
         {'short': 'l', 'type': 'choice', 'metavar': '<log level>',
          'default': 'info', 'choices': ('debug', 'info', 'warning', 'error')},
         ),
    )

    # NOTE(review): this mapping appears unused by this command
    # (CeleryTaskAdapter in the entities module defines its own tr_map) --
    # confirm before removing.
    tr_map = {
        'task-failed': 'fail',
        'task-succeeded': 'finish',
        'task-received': 'enqueue',
        'task-started': 'start',
        'task-revoked': 'fail',
    }

    def run(self, args):
        """Command entry point: open the instance repository, resynchronize
        pending tasks, then consume events forever."""
        from cubicweb import repoapi
        from cubicweb.cwctl import init_cmdline_log_threshold
        config = ServerConfiguration.config_for(args[0])
        # log to the console (command-line tool), not the instance log file
        config.global_set_option('log-file', None)
        config.log_format = '%(levelname)s %(name)s: %(message)s'
        init_cmdline_log_threshold(config, self['loglevel'])
        repo = repoapi.get_repository(config=config)
        repo.hm.call_hooks('server_maintenance', repo=repo)
        self.repo = repo
        with repo.internal_cnx() as cnx:
            self.on_monitor_start(cnx)
        self.celery_monitor()

    @staticmethod
    def on_monitor_start(cnx):
        """Give each ICeleryTask adapter a chance to resynchronize tasks
        whose state changed while the monitor was down."""
        for adapter in cnx.vreg['adapters']['ICeleryTask']:
            adapter.on_monitor_start(cnx)
        cnx.commit()

    def on_event(self, event):
        """Dispatch one celery event to every ICeleryTask adapter, in a
        fresh internal connection committed per event."""
        with self.repo.internal_cnx() as cnx:
            for adapter in cnx.vreg['adapters']['ICeleryTask']:
                adapter.on_event(cnx, event)
            cnx.commit()

    def celery_monitor(self):
        """Consume task events from the broker until interrupted.

        The broker connection is (re)established inside the loop so the
        monitor survives transient broker outages; only explicit
        interruption (Ctrl-C / SystemExit) stops it.
        """
        result_backend = celery.current_app.conf['CELERY_RESULT_BACKEND']
        while True:
            try:
                with BrokerConnection(result_backend) as conn:
                    recv = EventReceiver(
                        conn,
                        handlers={
                            'task-failed': self.on_event,
                            'task-succeeded': self.on_event,
                            'task-sent': self.on_event,
                            'task-received': self.on_event,
                            'task-revoked': self.on_event,
                            'task-started': self.on_event,
                            # '*': self.on_event,
                        })
                    recv.capture(limit=None, timeout=None)
            except (KeyboardInterrupt, SystemExit):
                traceback.print_exc()
                break


CWCTL.register(CeleryMonitorCommand)
......@@ -22,6 +22,8 @@ BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
BuildRequires: %{python} %{python}-setuptools
Requires: cubicweb >= 3.19
Requires: %{python}-six >= 1.4.0
Requires: %{python}-celery
Requires: cw-celerytask-helpers >= 0.1.0
%description
Run and monitor celery tasks
......
......@@ -13,6 +13,8 @@ Architecture: all
Depends:
cubicweb-common (>= 3.19),
python-six (>= 1.4.0),
cw-celerytask-helpers (>= 0.1.0),
python-celery,
${python:Depends},
${misc:Depends},
Description: Run and monitor celery tasks
......
......@@ -16,3 +16,122 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-celerytask entity's classes"""
import six
import celery
from celery.result import AsyncResult
from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
import cw_celerytask_helpers
def start_async_task(cnx, name, *args, **kwargs):
    """Create a CeleryTask entity and schedule celery task *name*.

    Extra positional and keyword arguments are forwarded to the task.
    Returns the newly created CeleryTask entity.
    """
    cwtask = cnx.create_entity('CeleryTask')
    adapter = cwtask.cw_adapt_to('ICeleryTask')
    adapter.start(name, *args, **kwargs)
    return cwtask
class ICeleryTask(EntityAdapter):
    """Abstract adapter exposing celery operations on task-like entities.

    Concrete adapters must at least implement ``task_id`` and may override
    ``get_task`` plus the celery-monitor callbacks ``on_event`` and
    ``on_monitor_start``.
    """
    __regid__ = 'ICeleryTask'
    __abstract__ = True

    def start(self, name, *args, **kwargs):
        """Record the task signature in the transaction data; the actual
        ``.delay()`` happens after commit (see StartCeleryTaskOp in the
        hooks module)."""
        eid = self.entity.eid
        task = self.get_task(name, *args, **kwargs)
        self._cw.transaction_data.setdefault('celerytask', {})[eid] = task

    def get_task(self, name, *args, **kwargs):
        """Should return a celery task / signature or None

        This method run in a precommit event
        """
        return celery.signature(name, args=args, kwargs=kwargs)

    @staticmethod
    def on_event(cnx, event):
        """Triggered by celery-monitor"""
        pass

    @staticmethod
    def on_monitor_start(cnx):
        """Triggered by celery-monitor"""
        pass

    @property
    def task_id(self):
        # Concrete adapters must return the celery task identifier string.
        raise NotImplementedError

    @property
    def logs(self):
        # Raw worker logs fetched from redis; empty bytes when none stored.
        return cw_celerytask_helpers.get_task_logs(self.task_id) or b''

    @property
    def result(self):
        # Celery AsyncResult handle for this task.
        return AsyncResult(self.task_id)
class CeleryTaskAdapter(ICeleryTask):
    """Base adapter that store task call args in the transaction"""

    __regid__ = 'ICeleryTask'
    __select__ = ICeleryTask.__select__ & is_instance('CeleryTask')

    # celery event type -> CeleryTask workflow transition name
    tr_map = {
        'task-failed': 'fail',
        'task-succeeded': 'finish',
        'task-received': 'enqueue',
        'task-started': 'start',
        'task-revoked': 'fail',
    }

    def _attach_result(self, result, cwtask):
        """Recursively create child CeleryTask entities for subtask results."""
        # BUG FIX: use self._cw (the adapter's connection, as used by
        # EntityAdapter elsewhere in this module) instead of the undefined
        # ``self.cnx`` attribute.
        child = self._cw.create_entity('CeleryTask',
                                       task_id=six.text_type(result.task_id),
                                       parent=cwtask)
        for child_result in result.children or []:
            # BUG FIX: the original recursed through a non-existent
            # ``_attach_task`` method.
            self._attach_result(child_result, child)

    def get_task(self, name, *args, **kwargs):
        """Freeze the signature to obtain a task_id, record it (and any
        child task ids) on the entity, and return the signature."""
        signature = super(CeleryTaskAdapter, self).get_task(
            name, *args, **kwargs)
        result = signature.freeze()
        self.entity.cw_set(task_id=six.text_type(result.task_id))
        for child_result in result.children or []:
            self._attach_result(child_result, self.entity)
        return signature

    @property
    def task_id(self):
        return self.entity.task_id

    @staticmethod
    def on_event(cnx, event):
        """Fire the workflow transition matching a celery task event."""
        tr_map = CeleryTaskAdapter.tr_map
        if event['type'] in tr_map:
            entity = cnx.find('CeleryTask', task_id=event['uuid']).one()
            transition = tr_map[event['type']]
            entity.cw_adapt_to('IWorkflowable').fire_transition(
                transition, event.get('exception'))
            CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
                                   entity.eid, entity.task_id, transition)

    @staticmethod
    def on_monitor_start(cnx):
        """Resynchronize unfinished CeleryTask entities with the actual
        celery result state (monitor may have missed events while down)."""
        for task_eid, task_id in cnx.execute((
            'Any T, TID WHERE '
            'T is CeleryTask, T task_id TID, T in_state S, '
            'S name in ("waiting", "queued", "running")'
        )):
            result = AsyncResult(task_id)
            # BUG FIX: 'running' is a *state* name; the workflow transition
            # leading to it is 'start' (see the migration script's
            # add_transition calls).
            transition = {
                'SUCCESS': 'finish',
                'FAILURE': 'fail',
                'STARTED': 'start',
            }.get(result.state)
            if transition is not None:
                wf = cnx.entity_from_eid(task_eid).cw_adapt_to('IWorkflowable')
                wf.fire_transition(transition, result.traceback)
                CeleryTaskAdapter.info('<CeleryTask %s (task_id %s)> %s',
                                       task_eid, task_id, transition)
"""Helpers for celery workers
Add this module 'cw_celerytask_helpers' to CELERY_IMPORTS
"""
import contextlib
import logging
import json
import datetime
import sys
import redis
import celery
from celery.app.log import TaskFormatter
from celery.utils.log import get_task_logger
from celery import signals
def get_redis_client():
    """Return a redis client built from the celery app configuration."""
    url = celery.current_app.conf.CUBICWEB_CELERYTASK_REDIS_URL
    return redis.Redis.from_url(url)
@signals.celeryd_after_setup.connect
def setup_redis_logging(conf=None, **kwargs):
    """Attach redis publish and store handlers to the "celery.task" logger."""
    client = get_redis_client()
    task_logger = logging.getLogger('celery.task')
    # live log streaming: publish each record on a pub/sub channel
    task_logger.addHandler(RedisPubHandler(
        channel='cw:celerytask:log',
        redis_client=client,
        level=logging.DEBUG))
    # persistent logs: append formatted records to a per-task key
    store = RedisStoreHandler(
        prefix='cw:celerytask:log',
        redis_client=client,
        level=logging.DEBUG)
    store.setFormatter(logging.Formatter(
        fmt="%(levelname)s %(asctime)s %(module)s %(process)d %(message)s\n"))
    task_logger.addHandler(store)
@signals.celeryd_after_setup.connect
def setup_cubicweb_logging(**kwargs):
    """
    Set parent to "celery.task" for all instantiated logger names starting
    with "cubes" or "cubicweb"
    """
    # BUG FIX: snapshot the names first -- get_task_logger() may register
    # new loggers (e.g. celery's task base logger), which would raise
    # "dictionary changed size during iteration" on the live dict.
    for logname in list(logging.root.manager.loggerDict):
        if logname.startswith('cubes') or logname.startswith('cubicweb'):
            get_task_logger(logname)
def get_task_logs(task_id):
    """Return the stored worker logs for *task_id* (None when absent)."""
    key = 'cw:celerytask:log:%s' % (task_id,)
    return get_redis_client().get(key)
@contextlib.contextmanager
def redirect_stdouts(logger):
    """Temporarily route stdout/stderr into *logger* via celery's log
    machinery; the original streams are restored on exit."""
    saved_streams = sys.stdout, sys.stderr
    try:
        app = celery.current_app
        level = app.conf.CELERY_REDIRECT_STDOUTS_LEVEL
        app.log.redirect_stdouts_to_logger(logger, level)
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
@signals.celeryd_init.connect
def configure_worker(conf=None, **kwargs):
    """Fill in missing worker configuration entries with project defaults."""
    defaults = {
        'CELERY_SEND_EVENTS': True,
        'CELERY_SEND_TASK_SENT_EVENT': True,
        'CELERY_TASK_SERIALIZER': 'json',
        'CELERY_RESULT_SERIALIZER': 'json',
        'CELERY_ACCEPT_CONTENT': ['json', 'msgpack', 'yaml'],
        'CUBICWEB_CELERYTASK_REDIS_URL': 'redis://localhost:6379/0',
    }
    for option, value in defaults.items():
        conf.setdefault(option, value)
class RedisFormatter(TaskFormatter):
    """Formatter serializing log records as JSON for transport over redis."""

    def format(self, record):
        """
        JSON-encode a record for serializing through redis.

        Convert date to iso format, and stringify any exceptions.
        """
        message = super(RedisFormatter, self).format(record)
        data = {
            'name': record.name,
            'level': record.levelno,
            'levelname': record.levelname,
            'filename': record.filename,
            'line_no': record.lineno,
            'message': message,
            'time': datetime.datetime.utcnow().isoformat(),
            'funcname': record.funcName,
            'traceback': record.exc_info,
            'task_id': record.task_id,
            'task_name': record.task_name,
        }
        # stringify exception data
        if record.exc_text:
            # BUG FIX: logging.Formatter.formatException() expects an
            # exc_info *tuple*; record.exc_text already holds the formatted
            # traceback string, so use it directly.
            data['traceback'] = record.exc_text
        return json.dumps(data)
class RedisPubHandler(logging.Handler):
    """
    Publish messages to redis channel.
    """

    def __init__(self, channel, redis_client, *args, **kwargs):
        """Remember the target *channel* and *redis_client*; records are
        serialized with RedisFormatter."""
        super(RedisPubHandler, self).__init__(*args, **kwargs)
        self.channel = channel
        self.redis_client = redis_client
        self.formatter = RedisFormatter()

    def emit(self, record):
        """Format *record* and publish it on the configured channel."""
        payload = self.format(record)
        self.redis_client.publish(self.channel, payload)
class RedisStoreHandler(logging.Handler):
    """
    Send logging messages to a redis store.
    """

    def __init__(self, prefix, redis_client, *args, **kwargs):
        """Remember the key *prefix* and *redis_client* used by emit()."""
        super(RedisStoreHandler, self).__init__(*args, **kwargs)
        self.prefix = prefix
        self.redis_client = redis_client

    def emit(self, record):
        """Append the formatted record under the key ``<prefix>:<task_id>``."""
        key = '%s:%s' % (self.prefix, record.task_id)
        self.redis_client.append(key, self.format(record))
from setuptools import setup, find_packages

setup(
    name='cw-celerytask-helpers',
    version='0.1.0',
    description='Worker side helpers for cubicweb-celerytask',
    author='LOGILAB S.A. (Paris, FRANCE)',
    author_email='contact@logilab.fr',
    license='LGPL',
    classifiers=[
        'Development Status :: 3 - Alpha',
        'Intended Audience :: Developers',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
    ],
    packages=find_packages('.'),
    install_requires=[
        'celery',
        'redis',
    ],
    # BUG FIX: the setuptools keyword is ``tests_require``; the misspelled
    # ``tests_requires`` was silently ignored.
    tests_require=[
    ],
)
......@@ -16,3 +16,30 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-celerytask specific hooks and operations"""
from __future__ import print_function
import six
from cubicweb.predicates import adaptable
from cubicweb.server import hook
if six.PY3:
unicode = six.text_type
class StartCeleryTaskOp(hook.DataOperationMixIn, hook.Operation):
    """Post-commit operation firing the celery tasks that were recorded in
    the transaction data by ICeleryTask.start()."""

    def postcommit_event(self):
        pending = self.cnx.transaction_data.get('celerytask', {})
        for eid in self.get_data():
            task = pending.get(eid)
            if task is not None:
                task.delay()
class StartTaskHook(hook.Hook):
    """Register a StartCeleryTaskOp for every created/updated entity that is
    adaptable to ICeleryTask, so its task is sent after commit."""

    __regid__ = 'celerytask.start_task'
    __select__ = hook.Hook.__select__ & adaptable('ICeleryTask')
    events = ('after_add_entity', 'after_update_entity', 'after_add_relation')

    def __call__(self):
        operation = StartCeleryTaskOp.get_instance(self._cw)
        operation.add_data(self.entity.eid)
......@@ -20,6 +20,17 @@ the cube is added to an existing instance.
You could setup site properties or a workflow here for example.
"""
import six

# Example of site property change
#set_property('ui.site-title', "<sitename>")

# workflow labels are stored as unicode strings
_ = six.text_type

# Workflow driving the lifecycle of CeleryTask entities:
#
#   waiting --enqueue--> queued --start--> running
#   waiting/queued/running --finish--> done
#   waiting/queued/running --fail--> failed
wf = add_workflow(u'CeleryTask Workflow', 'CeleryTask')
waiting = wf.add_state(_('waiting'), initial=True)
queued = wf.add_state(_('queued'))
running = wf.add_state(_('running'))
done = wf.add_state(_('done'))
failed = wf.add_state(_('failed'))
wf.add_transition(_('enqueue'), waiting, queued)
wf.add_transition(_('start'), queued, running)
wf.add_transition(_('finish'), (waiting, queued, running), done)
wf.add_transition(_('fail'), (waiting, queued, running), failed)
......@@ -16,3 +16,20 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-celerytask schema"""
from yams.buildobjs import RelationDefinition, String
from cubicweb.schema import WorkflowableEntityType
class CeleryTask(WorkflowableEntityType):
    """Workflowable entity tracking one celery task execution."""
    # celery task identifier; indexed so monitor events can look tasks up.
    # Optional because it is only set once the task signature is frozen.
    task_id = String(maxsize=40, required=False, indexed=True)
class parent(RelationDefinition):
    """Link a subtask to the CeleryTask that spawned it.

    A task has at most one parent ('?*' cardinality); children are part of
    the parent composite and are inlined for cheap traversal.
    """
    object = 'CeleryTask'
    subject = 'CeleryTask'
    cardinality = '?*'
    composite = 'object'
    inlined = True
import six
from celery.result import AsyncResult
from cubicweb.predicates import is_instance
from cubes.celerytask.entities import ICeleryTask
class RunAdapter(ICeleryTask):
    """ICeleryTask adapter for test-specific Run entities."""

    __select__ = ICeleryTask.__select__ & is_instance('Run')

    def get_task(self, name, *args, **kwargs):
        """Freeze the signature and record its task_id on the Run entity."""
        signature = super(RunAdapter, self).get_task(name, *args, **kwargs)
        result = signature.freeze()
        self.entity.cw_set(task_id=six.text_type(result.task_id))
        return signature

    @staticmethod
    def on_event(cnx, event):
        """Mark the Run finished on terminal events; store the result on
        success."""
        if event['type'] in ('task-failed', 'task-succeeded'):
            run = cnx.find('Run', task_id=event['uuid']).one()
            run.cw_adapt_to('IWorkflowable').fire_transition('finish')
            if event['type'] == 'task-succeeded':
                # BUG FIX: ``self`` does not exist in a staticmethod; fetch
                # the celery result through the event's task uuid instead.
                run.cw_set(result=AsyncResult(event['uuid']).get())

    @staticmethod
    def on_monitor_start(cnx):
        """Resynchronize pending Run entities with their celery state."""
        # BUG FIX: the original RQL restricted an unbound variable T
        # ('T in_state S'); the Run variable is R.
        for run_eid, task_id in cnx.execute((
            'Any R, TID WHERE '
            'R is Run, R task_id TID, R in_state S, '
            'S name "pending"'
        )):
            result = AsyncResult(task_id)
            if result.state in ('SUCCESS', 'FAILURE'):
                run = cnx.entity_from_eid(run_eid)
                run.cw_adapt_to('IWorkflowable').fire_transition('finish')
                if result.state == 'SUCCESS':
                    run.cw_set(result=result.get())

    @property
    def task_id(self):
        return self.entity.task_id
from cubicweb.server.hook import Hook
from cubicweb.predicates import is_instance, on_fire_transition, has_related_entities
from cubes.celerytask.entities import start_async_task
class StartRunHook(Hook):
__regid__ = 'test.start_run_hook'
__select__ = Hook.__select__ & on_fire_transition('Run', 'start')