Source code for cyanide.templates

from __future__ import absolute_import, unicode_literals

import celery
import os

from functools import partial

from celery.five import items
from kombu import Queue
from kombu.utils import symbol_by_name

CYANIDE_TRANS = os.environ.get('CYANIDE_TRANS', False)
default_queue = 'c.stress.trans' if CYANIDE_TRANS else 'c.stress'
CYANIDE_QUEUE = os.environ.get('CYANIDE_QUEUE', default_queue)

templates = {}

IS_CELERY_4 = celery.VERSION[0] >= 4


[docs]def template(name=None): def _register(cls): templates[name or cls.__name__] = '.'.join([__name__, cls.__name__]) return cls return _register
if IS_CELERY_4: def use_template(app, template='default'): if isinstance(template, list) and len(template) == 1: # weird argparse thing template = template[0] template = template.split(',') # mixin the rest of the templates when the config is needed @app.on_after_configure.connect(weak=False) def load_template(sender, source, **kwargs): mixin_templates(template[1:], source) app.config_from_object(templates[template[0]]) else:
[docs] def use_template(app, template='default'): # noqa template = template.split(',') app.after_configure = partial(mixin_templates, template[1:]) app.config_from_object(templates[template[0]])
[docs]def mixin_templates(templates, conf): return [mixin_template(template, conf) for template in templates]
[docs]def mixin_template(template, conf): cls = symbol_by_name(templates[template]) conf.update(dict( (k, v) for k, v in items(vars(cls)) if not k.startswith('_') ))
[docs]def template_names(): return ', '.join(templates)
[docs]@template() class default(object): CELERY_ACCEPT_CONTENT = ['json'] BROKER_URL = os.environ.get('CYANIDE_BROKER', 'pyamqp://') BROKER_HEARTBEAT = 30 CELERY_RESULT_BACKEND = os.environ.get('CYANIDE_BACKEND', 'rpc://') CELERY_RESULT_SERIALIZER = 'json' CELERY_RESULT_PERSISTENT = True CELERY_RESULT_EXPIRES = 300 CELERY_MAX_CACHED_RESULTS = 100 CELERY_DEFAULT_QUEUE = CYANIDE_QUEUE CELERY_IMPORTS = ['cyanide.tasks'] CELERY_TRACK_STARTED = True CELERY_QUEUES = [ Queue(CYANIDE_QUEUE, durable=not CYANIDE_TRANS, no_ack=CYANIDE_TRANS), ] CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_PUBLISH_RETRY_POLICY = { 'max_retries': 100, 'interval_max': 2, 'interval_step': 0.1, } CELERY_TASK_PROTOCOL = 2 if CYANIDE_TRANS: CELERY_DEFAULT_DELIVERY_MODE = 1 CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CYANIDE_PREFETCH', 10))
[docs]@template() class redis(default): BROKER_URL = os.environ.get('CYANIDE_BROKER', 'redis://') BROKER_TRANSPORT_OPTIONS = { 'fanout_prefix': True, 'fanout_patterns': True, } CELERY_RESULT_BACKEND = os.environ.get('CYANIDE_BACKEND', 'redis://')
[docs]@template() class redistore(default): CELERY_RESULT_BACKEND = 'redis://'
[docs]@template() class acks_late(default): CELERY_ACKS_LATE = True
[docs]@template() class pickle(default): CELERY_ACCEPT_CONTENT = ['pickle', 'json'] CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle'
[docs]@template() class confirms(default): BROKER_URL = 'pyamqp://' BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
[docs]@template() class events(default): CELERY_SEND_EVENTS = True CELERY_SEND_TASK_SENT_EVENT = True
[docs]@template() class execv(default): CELERYD_FORCE_EXECV = True
[docs]@template() class sqs(default): BROKER_URL = 'sqs://' BROKER_TRANSPORT_OPTIONS = { 'region': os.environ.get('AWS_REGION', 'us-east-1'), }
[docs]@template() class proto1(default): CELERY_TASK_PROTOCOL = 1
[docs]@template() class vagrant1(default): BROKER_URL = 'pyamqp://testing:t3s71ng@192.168.33.123//testing'
[docs]@template() class vagrant1_redis(redis): BROKER_URL = 'redis://192.168.33.123' CELERY_RESULT_BACKEND = 'redis://192.168.33.123'