Commit 225f8752 authored by Philippe Pepiot's avatar Philippe Pepiot
Browse files

[ccplugin] celery-monitor: retry failed items

When multiple instance of celery-monitor are running, we could have an
integrity errors raised if two instance are working on the same task_id or
temporary (network, host) failures. In this case we want to retry handling the
task_id later.

Put pending processed task_id in a "pending queue" and each minutes and if the
monitor queue is empty, requeue pending items.

This change require to handle the "timeout" parameter of loop (only used in
tests) in a different way to ensure not blocking forever in redis "brpoplpush".
parent 95f71105bbaf
......@@ -30,6 +30,9 @@ from cw_celerytask_helpers.utils import get_redis_client
logger = logging.getLogger(__name__)
PENDING_KEY = ':'.join([MONITOR_KEY, 'pending'])
REQUEUE_TIMEOUT = 60
class CeleryMonitorCommand(Command):
"""Synchronize celery task statuses"""
......@@ -75,18 +78,35 @@ class CeleryMonitorCommand(Command):
else:
break
@staticmethod
def requeue(client):
logger.info('requeue pending events')
while client.rpoplpush(PENDING_KEY, MONITOR_KEY):
pass
@staticmethod
def loop(cnx, timeout=None):
client = get_redis_client()
client.ping()
logger.info('Connected to redis')
test = (cnx.repo.config.mode == "test")
requeue_timer = timer = time.time()
while True:
data = client.brpop(MONITOR_KEY, timeout=timeout)
# pop item from MONITOR_KEY and push it to PENDING_KEY
data = client.brpoplpush(MONITOR_KEY, PENDING_KEY, timeout=1)
if data is None:
now = time.time()
if timeout is not None and abs(timer - now) > timeout:
break
data = json.loads(data[1].decode())
task_id, task_name = data['task_id'], data['task_name']
if abs(requeue_timer - time.time()) > REQUEUE_TIMEOUT:
# no items left in MONITOR_KEY and we reached the
# REQUEUE_TIMEOUT requeue failed items from PENDING_KEY to
# MONITOR_KEY
CeleryMonitorCommand.requeue(client)
requeue_timer = time.time()
continue
payload = json.loads(data.decode())
task_id, task_name = payload['task_id'], payload['task_name']
for adapter in cnx.vreg['adapters']['ICeleryTask']:
try:
adapter.sync_task_state(cnx, task_id,
......@@ -99,6 +119,9 @@ class CeleryMonitorCommand(Command):
if test:
# we should not hide exceptions in tests
raise
else:
# success, drop item from PENDING_KEY
client.lrem(PENDING_KEY, data, num=1)
CWCTL.register(CeleryMonitorCommand)
......@@ -64,7 +64,7 @@ def run_all_tasks(cnx=None):
wf.fire_transition(transition, result.traceback)
else:
from cubes.celerytask.ccplugin import CeleryMonitorCommand
CeleryMonitorCommand.loop(cnx, 1)
CeleryMonitorCommand.loop(cnx, timeout=0)
return results
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment