2013-03-01 26 views
10

real-time monitoring of celery workers ile ilgili kereviz öğreticisine göre, çalışanlar tarafından üretilen olayları programatik olarak yakalayabilir ve buna göre harekete geçebilirsiniz.Celery-Django uygulamasında çalışanlardan olaylar nasıl izlenir?

Sorum şu: Bir monitörü, bir Celery-Django uygulamasında this örneğindeki gibi nasıl entegre edebilirim?

DÜZENLEME:

from celery import Celery 

def my_monitor(app): 
    state = app.events.State() 

    def announce_failed_tasks(event): 
     state.event(event) 
     task_id = event['uuid'] 

     print('TASK FAILED: %s[%s] %s' % (
      event['name'], task_id, state[task_id].info(),)) 
    with app.connection() as connection: 
     recv = app.events.Receiver(connection, handlers={ 
       'task-failed': announce_failed_tasks, 
       'worker-heartbeat': announce_dead_workers, 
     }) 
     recv.capture(limit=None, timeout=None, wakeup=True) 

if __name__ == '__main__': 
    celery = Celery(broker='amqp://[email protected]//') 
    my_monitor(celery) 

Yani için sonuç almak için, işçi tarafından gönderilen task_failed olayı yakalamak için, ve öğretici gösterileri gibi onun task_id almak istiyorum: eğitimde kod örneği benziyor Bu görev, uygulamam için yapılandırılmış ve daha fazla işleyen sonuç arka ucundan. Benim problemim, bir django-kereviz projesinde olduğu gibi, uygulamanın nasıl elde edilebileceğinin açık olmadığıdır.

Ayrıca, bir çalışan bir görevi yürütmeyi bitirdiğinde sonuçların nasıl işleneceği konusunda başka herhangi bir fikre de açığım.

+0

, hangi olayların yakalama gerek? Örnek kodunuz var mı? – danodonovan

cevap

14

Bunu yapmanın bir yolunu buldum, ancak bunun çözüm olduğundan emin değilim, ama benim için çalışıyor. Monitör işlevi temel olarak doğrudan aracıya bağlanır ve farklı olay türlerini dinler. Kodum şunun gibi:

from celery.events import EventReceiver 
from kombu import Connection as BrokerConnection 

def my_monitor: 
    connection = BrokerConnection('amqp://guest:[email protected]:5672//') 

    def on_event(event): 
     print "EVENT HAPPENED: ", event 

    def on_task_failed(event): 
     exception = event['exception'] 
     print "TASK FAILED!", event, " EXCEPTION: ", exception 

    while True: 
     try: 
      with connection as conn: 
       recv = EventReceiver(conn, 
           handlers={'task-failed' : on_task_failed, 
              'task-succeeded' : on_event, 
              'task-sent' : on_event, 
              'task-received' : on_event, 
              'task-revoked' : on_event, 
              'task-started' : on_event, 
              # OR: '*' : on_event 
              }) 
      recv.capture(limit=None, timeout=None) 
    except (KeyboardInterrupt, SystemExit): 
     print "EXCEPTION KEYBOARD INTERRUPT" 
     sys.exit() 

Hepsi bu kadar. Ve bunu normal uygulamadan farklı bir süreçte çalıştırıyorum, yani kereviz uygulamasının sadece bu işlevi yerine getiren bir çocuk süreci oluşturduğunu. HTH

+0

Merhaba, teşekkürler, sorunuz temelde şu an yapmaya çalıştığım şey. Bu kodu Django projenize nereye koyuyorsunuz? Kereviz uygulamasının çocuk sürecini oluşturmayı açıklayabilir misiniz? Şu anda kereviz uygulaması myproj/myproj/celery.py'de (http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery -with-django) – fpghost

+1

Merhaba! Uzun zamandır bunun üzerinde çalışmıyorum, bu yüzden son n bültenlerinde Celery'de işler değişmiş olabilir. Temel olarak bir Python daemon işlemine başlıyordum: daemon_process = Süreç (target = results_processing.my_monitor) daemon_process.daemon = True daemon_process.start() Uygulama başlatıldığında denilen modüllerden birinde – Clara

+0

Django kullanıyordum ve bu monitörü bu şekilde başlattı. Django <1.9'da, kereviz uygulamasını tanımladıktan sonra '' my_monitor (app) 'tarafından 'proj/proj/celery.py' dosyasında izlemeye başlayabildim. Şimdi bir "AppRegistryNotReady" exc ile sonuçlanan Django 1.9'da ('__init __. Py' uygulamalarında modellerin içe aktarılmasına artık izin verilmediğini düşünüyorum --- monitörümün bazı modellere dayandığını not etmeliyim). Ekranı monitörümün güvenilir olduğu django uygulamasının AppConfig.ready() yönteminde başlattım (bu, uygulamanın kayıt işlemini tamamladığından emin olur). HTH – fpghost

4

Sorunlar Eğer kereviz config CELERY_SEND_EVENTS bayrağı olarak gerçek ayarlamanız gerekir

  1. bir çift sakının.
  2. Ayrıca, etkinlik izleyicisini çalışanınızdan yeni bir iş parçacığına da ayarlayabilirsiniz. İşte

benim uygulamasıdır: Ben biraz daha spesifik olmak zorundasınız düşünüyorum

class MonitorThread(object): 
    def __init__(self, celery_app, interval=1): 
     self.celery_app = celery_app 
     self.interval = interval 

     self.state = self.celery_app.events.State() 

     self.thread = threading.Thread(target=self.run, args=()) 
     self.thread.daemon = True 
     self.thread.start() 

    def catchall(self, event): 
     if event['type'] != 'worker-heartbeat': 
      self.state.event(event) 

     # logic here 

    def run(self): 
     while True: 
      try: 
       with self.celery_app.connection() as connection: 
        recv = self.celery_app.events.Receiver(connection, handlers={ 
         '*': self.catchall 
        }) 
        recv.capture(limit=None, timeout=None, wakeup=True) 

      except (KeyboardInterrupt, SystemExit): 
       raise 

      except Exception: 
       # unable to capture 
       pass 

      time.sleep(self.interval) 

if __name__ == '__main__': 
    app = get_celery_app() # returns app 
    MonitorThread(app) 
    app.start() 
İlgili konular