Commit 5a1e0652 authored by David Douard's avatar David Douard
Browse files

extract helpers in a standalone package

parent 860d52ca4c55
......@@ -9,14 +9,14 @@ Installation and setup
Declare tasks using celery task or cubicweb-celery cwtasks.
On worker side, install cw-celerytask-helpers from ./helpers.
On worker side, install cw-celerytask-helpers_.
celeryconfig.py example::
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers', 'module.containing.tasks')
CELERY_IMPORTS = ('cw_celerytask_helpers.redislogger', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
......@@ -47,6 +47,7 @@ Ensure to have the celeryconfig.py loaded for both cubicweb instance and
celery worker, enforce by settings with CELERY_CONFIG_MODULE environment
variable (it must be an importable python module).
.. _cw-celerytask-helpers: https://www.cubicweb.org/project/cw-celerytask-helpers
Running tasks
-------------
......
......@@ -22,7 +22,7 @@ __depends__ = {
'cubicweb': '>= 3.19',
'six': '>= 1.4.0',
'celery': None,
'cw-celerytask-helpers': '>= 0.1.0',
'cw-celerytask-helpers': '>= 0.2.0',
}
__recommends__ = {}
......
......@@ -16,7 +16,7 @@ Architecture: all
Depends:
cubicweb-common (>= 3.19),
python-six (>= 1.4.0),
python-cw-celerytask-helpers (>= 0.1.0),
python-cw-celerytask-helpers (>= 0.2.0),
python-celery,
${python:Depends},
${misc:Depends},
......
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers', 'demotasks')
CELERY_IMPORTS = ('cw_celerytask_helpers.redislogger', 'demotasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
......@@ -7,7 +7,7 @@ import random
from celery import current_app as app
from celery.utils.log import get_task_logger
from cw_celerytask_helpers import redirect_stdouts
from cw_celerytask_helpers.redislogger import redirect_stdouts
# 3 kinds of logger
cw_logger = logging.getLogger('cubes.tasks')
......
......@@ -26,7 +26,7 @@ from cubicweb.view import EntityAdapter
from cubicweb.predicates import is_instance
from cubicweb.server.hook import DataOperationMixIn, Operation
import cw_celerytask_helpers
from cw_celerytask_helpers import redislogger as loghelper
_TEST_TASKS = {}
......@@ -119,7 +119,7 @@ class ICeleryTask(EntityAdapter):
@property
def logs(self):
return cw_celerytask_helpers.get_task_logs(self.task_id) or b''
return loghelper.get_task_logs(self.task_id) or b''
@property
def result(self):
......
"""Helpers for celery workers
Add this module 'cw_celerytask_helpers' to CELERY_IMPORTS
"""
import contextlib
import logging
import json
import datetime
import sys
import traceback
import redis
import celery
from celery.app.log import TaskFormatter
from celery.utils.log import get_task_logger
from celery import signals
LOG_KEY_PREFIX = "cw:celerytask:log"
def get_redis_client():
return redis.Redis.from_url(
celery.current_app.conf.CUBICWEB_CELERYTASK_REDIS_URL)
def get_log_key(task_id):
return "{0}:{1}".format(LOG_KEY_PREFIX, task_id)
@signals.celeryd_after_setup.connect
def setup_redis_logging(conf=None, **kwargs):
logger = logging.getLogger('celery.task')
redis_client = get_redis_client()
logger.addHandler(RedisPubHandler(
channel=LOG_KEY_PREFIX,
redis_client=redis_client,
level=logging.DEBUG))
store_handler = RedisStoreHandler(
prefix=LOG_KEY_PREFIX,
redis_client=redis_client,
level=logging.DEBUG)
store_handler.setFormatter(logging.Formatter(
fmt="%(levelname)s %(asctime)s %(module)s %(process)d %(message)s\n"))
logger.addHandler(store_handler)
@signals.celeryd_after_setup.connect
def setup_cubicweb_logging(conf=None, **kwargs):
"""
Set parent to "celery.task" for all instantiated logger names starting with
"cube" or "cubicweb"
"""
logall = conf.get('CUBICWEB_CELERYTASK_LOG_ALL', False)
for logname in logging.root.manager.loggerDict.keys():
if (logall or
logname.startswith('cubes') or logname.startswith('cubicweb')):
get_task_logger(logname)
def get_task_logs(task_id):
"""
Get task logs by id
"""
redis_client = get_redis_client()
return redis_client.get(get_log_key(task_id))
def flush_task_logs(task_id):
"""Delete task logs"""
redis_client = get_redis_client()
return redis_client.delete(get_log_key(task_id))
@contextlib.contextmanager
def redirect_stdouts(logger):
old_outs = sys.stdout, sys.stderr
try:
app = celery.current_app
rlevel = app.conf.CELERY_REDIRECT_STDOUTS_LEVEL
app.log.redirect_stdouts_to_logger(logger, rlevel)
yield
finally:
sys.stdout, sys.stderr = old_outs
@signals.celeryd_init.connect
def configure_worker(conf=None, **kwargs):
conf.setdefault('CELERY_SEND_EVENTS', True)
conf.setdefault('CELERY_SEND_TASK_SENT_EVENT', True)
conf.setdefault('CELERY_TASK_SERIALIZER', 'json')
conf.setdefault('CELERY_RESULT_SERIALIZER', 'json')
conf.setdefault('CELERY_ACCEPT_CONTENT', ['json', 'msgpack', 'yaml'])
conf.setdefault('CUBICWEB_CELERYTASK_REDIS_URL',
'redis://localhost:6379/0')
@signals.task_failure.connect
def log_exception(**kwargs):
logger = logging.getLogger("celery.task")
logger.critical("unhandled exception:\n%s",
"".join(traceback.format_tb(kwargs['traceback'])))
class RedisFormatter(TaskFormatter):
def format(self, record):
"""
JSON-encode a record for serializing through redis.
Convert date to iso format, and stringify any exceptions.
"""
message = super(RedisFormatter, self).format(record)
data = {
'name': record.name,
'level': record.levelno,
'levelname': record.levelname,
'filename': record.filename,
'line_no': record.lineno,
'message': message,
'time': datetime.datetime.utcnow().isoformat(),
'funcname': record.funcName,
'traceback': record.exc_info,
'task_id': record.task_id,
'task_name': record.task_name,
}
# stringify exception data
if record.exc_text:
data['traceback'] = self.formatException(record.exc_text)
return json.dumps(data)
class RedisPubHandler(logging.Handler):
"""
Publish messages to redis channel.
"""
def __init__(self, channel, redis_client, *args, **kwargs):
"""
Create a new logger for the given channel and redis_client.
"""
super(RedisPubHandler, self).__init__(*args, **kwargs)
self.channel = channel
self.redis_client = redis_client
self.formatter = RedisFormatter()
def emit(self, record):
"""
Publish record to redis logging channel
"""
self.redis_client.publish(self.channel, self.format(record))
class RedisStoreHandler(logging.Handler):
"""
Send logging messages to a redis store.
"""
def __init__(self, prefix, redis_client, *args, **kwargs):
"""
Create a new logger for the given channel and redis_client.
"""
super(RedisStoreHandler, self).__init__(*args, **kwargs)
self.prefix = prefix
self.redis_client = redis_client
def emit(self, record):
"""
Publish record to redis logging channel
"""
key = get_log_key(record.task_id)
self.redis_client.append(key, self.format(record))
from setuptools import setup, find_packages
setup(
name='cw-celerytask-helpers',
version='0.1.0',
description='Worker side helpers for cubicweb-celerytask',
author='LOGILAB S.A. (Paris, FRANCE)',
author_email='contact@logilab.fr',
license='LGPL',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
],
packages=find_packages('.'),
install_requires=[
'celery',
'redis',
],
)
......@@ -24,7 +24,7 @@ from cubicweb import Binary
from cubicweb.predicates import on_fire_transition
from cubicweb.server.hook import Hook, DataOperationMixIn, Operation
from cw_celerytask_helpers import flush_task_logs
from cw_celerytask_helpers.redislogger import flush_task_logs
if six.PY3:
unicode = six.text_type
......
......@@ -6,7 +6,7 @@ import logging
from celery import current_app as app
from celery.utils.log import get_task_logger
from cw_celerytask_helpers import redirect_stdouts
from cw_celerytask_helpers.redislogger import redirect_stdouts
cw_logger = logging.getLogger('cubes.tasks')
dummy_logger = logging.getLogger('dummy')
......
......@@ -33,7 +33,7 @@ from cubes.celerytask.entities import (start_async_task, StartCeleryTaskOp,
run_all_tasks)
from cubes.celerytask.ccplugin import CeleryMonitorCommand
import cw_celerytask_helpers
import cw_celerytask_helpers.redislogger as loghelper
class CeleryTaskTC(testlib.CubicWebTC):
......@@ -56,7 +56,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
conf.CELERY_TASK_SERIALIZER = 'json'
conf.CELERY_RESULT_SERIALIZER = 'json'
conf.CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
conf.CELERY_IMPORTS = ('cw_celerytask_helpers', 'tasks')
conf.CELERY_IMPORTS = ('cw_celerytask_helpers.redislogger', 'tasks')
import tasks # noqa
cls.worker = multiprocessing.Process(target=cls.start_worker)
cls.worker.start()
......@@ -156,7 +156,7 @@ class CeleryTaskTC(testlib.CubicWebTC):
self.wait_async_task(cnx, cwtask.task_id)
# logs should be flushed from redis to database
redis_logs = cw_celerytask_helpers.get_task_logs(cwtask.task_id)
redis_logs = loghelper.get_task_logs(cwtask.task_id)
self.assertIsNone(redis_logs)
cwtask.task_logs.seek(0)
task_logs = cwtask.task_logs.read()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment