# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
import signal
import sys
from time import sleep
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
from .app import app
# Message template printed by marker() whenever _marker.delay() raises;
# ``{0}`` is replaced with the exception that was caught.
E_MARKER_DELAY_ERROR = 'Retrying marker.delay(). It failed to start: {0}'
# Module-level logger obtained from Celery's get_task_logger(); used by the
# print_unicode() and logs() tasks below.
logger = get_task_logger(__name__)
def marker(s, sep='-'):
    """Print a marker line locally and submit the ``_marker`` task.

    The text is first printed in the calling process, then
    ``_marker.delay(s, sep)`` is called.  If that call raises, the error
    is printed and the call is attempted again until it succeeds.

    :param s: Text to log.
    :keyword sep: Separator printed in front of ``s`` locally and passed
        on to ``_marker``.

    :returns: Whatever ``_marker.delay(s, sep)`` returns.
    """
    print('{0}{1}'.format(sep, s))
    while True:
        try:
            return _marker.delay(s, sep)
        except Exception as exc:
            # Best effort: report the failure and try again.  There is
            # deliberately no retry limit and no delay between attempts.
            print(E_MARKER_DELAY_ERROR.format(exc))
@app.task
def _marker(s, sep='---'):
    """Print ``s`` surrounded by ``sep`` on both sides.

    :param s: Text to print.
    :keyword sep: Separator placed before and after ``s``.
    """
    line = '{sep} {0} {sep}'.format(s, sep=sep)
    print(line)
@app.task
def add(x, y):
    """Return ``x + y``."""
    total = x + y
    return total
@app.task(bind=True)
def ids(self, i):
    """Return the tuple ``(root_id, parent_id, i)``.

    ``root_id`` and ``parent_id`` are read from this task's request;
    ``i`` is passed through unchanged.
    """
    request = self.request
    return request.root_id, request.parent_id, i
@app.task(bind=True)
def collect_ids(self, res, i):
    """Used as a callback in a chain or group where the previous tasks
    are :task:`ids`: returns a tuple of::

        (previous_result, (root_id, parent_id, i))

    :param res: Result of the previous task(s), returned unchanged as the
        first element.
    :param i: Value returned as the last element of the inner tuple.
    """
    # Read the ids from *this* task's request.  Calling the ``ids`` task
    # inline would consult the request context of ``ids`` rather than the
    # request that is executing this callback.
    request = self.request
    return res, (request.root_id, request.parent_id, i)
@app.task
def xsum(x):
    """Return the total of the numbers in ``x`` (computed with ``sum``)."""
    total = sum(x)
    return total
@app.task
def any_(*args, **kwargs):
    """Task accepting any arguments and returning nothing.

    Handy for tests involving large arguments: big values, a huge
    number of positional arguments, and so on.

    :keyword sleep: Optional number of seconds to sleep for before
        returning.  No sleep happens when it is missing or falsy.
    """
    delay = kwargs.get('sleep')
    if not delay:
        return
    sleep(delay)
@app.task
def any_returning(*args, **kwargs):
    """Same as :task:`any_`, but returns the arguments it was given
    as a tuple of ``(args, kwargs)``."""
    # Delegate to any_ first so the optional ``sleep`` keyword is honoured.
    any_(*args, **kwargs)
    received = (args, kwargs)
    return received
@app.task
def exiting(status=0):
    """Task terminating its own worker process by raising
    :exc:`SystemExit` (the same thing ``sys.exit(status)`` does).

    :keyword status: Exit status carried by the exception, default is 0.
    """
    raise SystemExit(status)
@app.task
def kill(sig=getattr(signal, 'SIGKILL', None) or signal.SIGTERM):
    """Task sending a signal to the process currently executing it.

    :keyword sig: Signal number to send.  Defaults to :sig:`KILL` when the
        ``signal`` module provides it, otherwise to :sig:`TERM`.
    """
    own_pid = os.getpid()
    os.kill(own_pid, sig)
@app.task
def sleeping(i, **_):
    """Sleep for ``i`` seconds and return nothing.

    :param i: Number of seconds to sleep.

    Any extra keyword arguments are accepted and ignored.
    """
    sleep(i)
@app.task
def sleeping_ignore_limits(i, **_):
    """Sleep for ``i`` seconds while ignoring the soft time limit.

    When :exc:`~celery.exceptions.SoftTimeLimitExceeded` is raised during
    the first sleep it is swallowed and the task sleeps for another ``i``
    seconds, which will trigger the hard time limit (if enabled).

    :param i: Number of seconds to sleep.

    Any extra keyword arguments are accepted and ignored.
    """
    try:
        sleep(i)
    except SoftTimeLimitExceeded:
        # Deliberately ignore the soft limit and start sleeping again.
        sleep(i)
@app.task(bind=True)
def retries(self, n=1, countdown=1, return_value=10):
    """Task that retries itself until its retry count reaches ``n``.

    The first execution (retry count of zero) always retries, whatever
    the value of ``n``.

    :param n: Number of times to retry.
    :param countdown: Seconds to wait (``int``/``float``) between each
        retry (default is one second).
    :param return_value: Value to return when the task finally succeeds
        (default is 10).
    """
    attempts = self.request.retries
    # Done only once at least one retry happened and the count is no
    # longer below ``n``.
    if attempts and not (attempts < n):
        return return_value
    raise self.retry(countdown=countdown)
@app.task
def print_unicode(log_message='hå它 valmuefrø', print_message='hiöäüß'):
    """Task that both logs and prints strings containing non-ASCII
    characters.

    :keyword log_message: Text sent to the module logger at warning level.
    :keyword print_message: Text written with ``print``.
    """
    logger.warning(log_message)
    print(print_message)
@app.task
def segfault():
    """Task causing a segfault, abruptly terminating the process
    executing the task.

    :raises AssertionError: If the process somehow survives the invalid
        memory write.
    """
    import ctypes
    # Writing one byte at address 0 is what is expected to crash the process.
    ctypes.memset(0, 0, 1)
    # Raise explicitly instead of ``assert False``: assert statements are
    # removed when Python runs with -O, which would let the task return.
    raise AssertionError('should not get here')
@app.task(bind=True)
def chord_adds(self, x):
    """Task adding a new ``add(x, x)`` task to the current chord in a
    workflow.

    :param x: Value used for both arguments of the added task.

    :returns: Always 42.
    """
    extra = add.s(x, x)
    self.add_to_chord(extra)
    return 42
@app.task(bind=True)
def chord_replace(self, x):
    """Task replacing itself in the current chord with ``add(x, x)``.

    :param x: Value used for both arguments of the replacement task.

    :returns: Whatever ``self.replace_in_chord`` returns.
    """
    replacement = add.s(x, x)
    return self.replace_in_chord(replacement)
@app.task
def raising(exc=None):
    """Task raising exception.

    :keyword exc: Exception to raise.  When omitted (or :const:`None`) a
        new :exc:`KeyError` instance is raised.
    """
    # Build the default per call: a default created in the signature would
    # be one shared exception instance re-raised by every default call.
    if exc is None:
        exc = KeyError()
    raise exc
@app.task
def errback(request, exc, traceback):
    """Print the id of a failed task with its exception and traceback.

    :param request: Object whose ``id`` attribute is printed
        (presumably the failed task's request -- confirm against callers).
    :param exc: Exception, printed using its ``repr``.
    :param traceback: Traceback, printed on the following line.
    """
    template = 'Task id {0!r} raised exception: {1!r}\n{2}'
    print(template.format(request.id, exc, traceback))
@app.task
def old_errback(task_id):
    """Print a message saying that the task with ``task_id`` raised.

    :param task_id: Task id, printed using its ``repr``.
    """
    message = 'Task id %r raised exception' % (task_id,)
    print(message)
@app.task
def logs(msg, p=False):
    """Log a message to the worker logs.

    :param msg: Message to emit.
    :keyword p: If set to :const:`True` the message will be printed
        instead of logged (at info level), thus being redirected to the
        stdout logger.
    """
    if p:
        print(msg)
    else:
        logger.info(msg)