Source code for cyanide.fbi

from __future__ import absolute_import, print_function, unicode_literals

import socket
import sys

from contextlib import contextmanager

from celery import states


[docs]class FBI(object): def __init__(self, app): self.app = app self.receiver = None self.state = self.app.events.State() self.connection = None self.enabled = False
[docs] def enable(self, enabled): self.enabled = enabled
@contextmanager
[docs] def investigation(self): if self.enabled: with self.app.connection() as conn: receiver = self.app.events.Receiver( conn, handlers={'*': self.state.event}, ) with receiver.consumer_context() as (conn, _, _): self.connection = conn try: yield self finally: self.ffwd() else: yield
[docs] def ffwd(self): while 1: try: self.connection.drain_events(timeout=1) except socket.error: break
[docs] def state_of(self, tid): try: task = self.state.tasks[tid] except KeyError: return 'No events for {0}'.format(tid) if task.state in states.READY_STATES: return 'Task {0.uuid} completed with {0.state}'.format(task) elif task.state in states.UNREADY_STATES: return 'Task {0.uuid} waiting in {0.state} state'.format(task) else: return 'Task {0.uuid} in other state {0.state}'.format(task)
[docs] def query(self, ids): return self.app.control.inspect().query_task(id)
[docs] def diag(self, ids, file=sys.stderr): if self.enabled: self.ffwd() for tid in ids: print(self.state_of(tid), file=file)