Source code for cyanide.app
from __future__ import absolute_import, print_function, unicode_literals
import celery
from celery import signals
from celery.bin.base import Option
from .templates import use_template, template_names
IS_CELERY_4 = celery.VERSION[0] >= 4
[docs]class App(celery.Celery):
cyanide_suite = 'cyanide.suites.default:Default'
template_selected = False
def __init__(self, *args, **kwargs):
self.template = kwargs.pop('template', None)
super(App, self).__init__(*args, **kwargs)
self.user_options['preload'].add(
Option(
'-Z', '--template', default='default', type=str,
help='Configuration template to use: {0}'.format(
template_names(),
),
)
)
signals.user_preload_options.connect(self.on_preload_parsed)
if IS_CELERY_4:
self.on_configure.connect(self._maybe_use_default_template)
[docs] def on_preload_parsed(self, options=None, **kwargs):
self.use_template(options['template'])
[docs] def use_template(self, name='default'):
if self.template_selected:
raise RuntimeError('App already configured')
self.template_selected = True
use_template(self, name)
def _maybe_use_default_template(self, **kwargs):
if not self.template_selected:
self.use_template('default')
if not IS_CELERY_4:
after_configure = None
def _get_config(self):
ret = super(App, self)._get_config()
if self.after_configure:
self.after_configure(ret)
return ret
def on_configure(self):
self._maybe_use_default_template()
app = App('cyanide', set_as_current=False)