2015-03-30 17 views
7

İstek/yanıt döngüsünün dışında işlenmesi gereken öğeleri sıraya almak için bir Tornado web sunucusu kullanıyorum.Tornado'da bir işlem kuyruğu oluşturma

Aşağıdaki basitleştirilmiş örneğimde, her istek geldiğinde, queued_items adlı listeye yeni bir dize ekliyorum. Bu listeyi izleyecek ve öğeleri ortaya çıktıkça işleyecek bir şey oluşturmak istiyorum.

(Gerçek kodumda, öğeler web isteği geldiğinde bağlanabilen veya bağlanamayan bir TCP soketi üzerinden işlenir ve gönderilir. Web sunucusunun soket bağlantısından bağımsız olarak sıra beklemeye devam etmesini istiyorum)

Bu kodu basit tutmaya çalışıyorum ve Redis veya Beanstalk gibi harici kuyrukları/programları kullanmıyorum. Çok yüksek hacimli olmayacak.

Yeni öğeler için client.queued_items listesini izlemek ve geldiklerinde işlemden geçirmek için Tornado deyimlerini kullanmanın iyi bir yolu nedir?

import time 

import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = [] 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     # I have no idea what I'm doing 
     items = yield client.queued_items 
     # go_do_some_thing_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    def get(self): 
     client.queued_items.append("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    client.watch_queue() 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

cevap

11

tornado için senkronizasyon ilkeller sağlar toro adında bir kütüphane vardır. [Güncelleme:. Kasırga 4.2 itibariyle torotornado içine birleştirildi] sadece bu işlemek için bir toro.Queue (veya tornado.queues.Queue 4.2+ tornado olarak) varmış gibi

Sesler: Orada

import time 

import toro 
import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = toro.Queue() 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     while True: 
      items = yield self.queued_items.get() 
      # go_do_something_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    @tornado.gen.coroutine 
    def get(self): 
     yield client.queued_items.put("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    tornado.ioloop.IOLoop.instance().add_callback(client.watch_queue) 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

  1. Bizplanlamak gerekir: birkaç küçük düzenlemeyle bir toro.Queue bir listeden veri yapısını değiştirme kenara gereklidir IOLoop bağlamında doğrudan aramak yerine, add_callback kullanarak IOLoop içinde çalıştırmak için 10. toro.Queue.put, bir coroutine olduğu için bir koroutine dönüştürülmelidir
  2. . sadece bir öğe işlenmesini ve sonra çıkarken yerine, sonsuza kadar çalışabilir, böylece

Ben de watch_queue bir while True döngü ekledi.

+0

Bu tam olarak ihtiyacım olan şey. Bana nasıl uygulanacağını gösterdiğiniz için teşekkürler. – Scott

+0

Dano - Kuyruğu izlemeyi nasıl durdurabilirim? Bağlantım kötüleştiğinde, sıradaki öğeleri işlemeyi geçici olarak durdurmam gerekecek, ancak bunları kaybetmeyeceğim. – Scott

+1

toro, kasırgaya birleştirildi ve şimdi kullanımdan kaldırıldı. Tornado> = 4.2 için 'tornado.queues.Queue' –