2013-02-28 21 views
11

Redis ve Tornado'yu zaman uyumsuz olarak nasıl kullanabileceğimi bulmaya çalışıyorum. tornado-redis'u buldum, ancak kodda sadece yield eklemekten daha fazlasına ihtiyacım var. /wait bekleyen bir istek varken ben erişim / url alma ve ihtiyaçTornado ve Redis'i eşzamansız olarak nasıl kullanabilirim?

import redis 
import tornado.web 

class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = redis.StrictRedis(port=6279) 
     pubsub = client.pubsub() 
     pubsub.subscribe('test_channel') 

     for item in pubsub.listen(): 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 

     self.write(item['data']) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    tornado.ioloop.IOLoop.instance().start() 

"Hello World" olsun:

Aşağıdaki kod var. Nasıl yapabilirim?

+1

Redis pub/sub, bir' web.RequestHandler' içinde kullanılmamalıdır .listen() '. Çalışan bir web kamerası örneği için http://tornadogists.org/532067/ adresine bakın. –

+0

Websocket iyi bir seçenektir, ancak uygulamamın websockets desteğine sahip olmayan tarayıcılarda çalışması gerekir. Uzun yoklama kullanıyorum. Bu bir 'uyumsuzluk' ihtiyacımın nedeni. –

+0

@HelieelsonSantos bu durumda, en iyi bahis, abone olunan kanal geçmişinin yerel durumunu (ayrı bir iş parçacığı tarafından beslenir) korumak ve ardından bu durumu hemen yanıt yazıp "alma" işlemini tamamlamaktır. Müşteri, farklı müşteriler için sürekliliği sağlamanıza olanak tanıyan son get endeksi veya son alma süresi vb. Gibi bazı kayıtları tutmalıdır. Zamanı aldığımda birkaç saat içinde bir örnekle bir cevap yazacağım. –

cevap

5

IO döngüsünü engelleyeceğinden, ana Kasırga içinde Redis pub/sub kullanmamalısınız. Uzun sorgulamayı web istemcilerinden ana iş parçacığından gerçekleştirebilirsiniz, ancak Redis'i dinlemek için ayrı bir iş parçacığı oluşturmanız gerekir. İletileri aldığınızda ana iş parçacığıyla iletişim kurmak için ioloop.add_callback() ve/veya bir threading.Queue kullanabilirsiniz.

1

Tamam, şimdi de ben olsun istekleri ile yapacağını nasıl benim örnek.

ilk yerel bir liste nesnesine yeni mesajları ekler basit bir dişli PubSub dinleyicidir:

iki ana bileşeni ekledi. Ayrıca, listeye liste erişimcileri ekledim, böylece dinleyici dizisinden düzenli bir listeden okuyormuşsunuz gibi okuyabilirsiniz. WebRequest ile ilgili olarak, sadece bir yerel liste nesnesinden veri okuyorsunuz demektir. Bu hemen döner ve geçerli isteğin tamamlanmasını veya gelecekteki isteklerin kabul edilmesini ve işlenmesini engeller.

class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 

İkincisi ApplicationMixin sınıfıdır. Bu, web istek sınıfınıza sahip ikincil bir nesne, işlevsellik ve öznitelik eklemek için devralır. Bu durumda, istenen kanal için zaten bir kanal dinleyicinin var olup olmadığını kontrol eder, hiç bulunmazsa bir tane oluşturur ve dinleyici tutamacını WebRequest'e döndürür. uygulamanın oluşturulduktan sonra,

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 

Son olarak (bir dize self.write vermek gerekir akılda tutarak) statik bir liste sanki

# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

WebRequest sınıfı artık dinleyici davranır, ben eklendi bir özelliğin

# add a dictionary containing channels to your application 
application.channels = {} 

yanı sıra çalışan iş parçacığı bazı temizleme olarak boş bir sözlük, sana bir kez uygulamadan çıkın

# clean up the subscribed channels 
for channel in application.channels: 
    application.channels[channel].stop() 
    application.channels[channel].join() 

tam kodu: Python> = 3 için

import threading 
import redis 
import tornado.web 



class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 


# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/channel/(?P<channel>\S+)", ReadChannel), 
]) 


# add a dictionary containing channels to your application 
application.channels = {} 


if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    try: 
     tornado.ioloop.IOLoop.instance().start() 
    except KeyboardInterrupt: 
     pass 

    # clean up the subscribed channels 
    for channel in application.channels: 
     application.channels[channel].stop() 
     application.channels[channel].join() 
+0

Listeyi kolayca bir engelleme veya engellemeyi engelleyen erişimi destekleyen başka bir nesneyle ve yalnızca önceki isteğinden itibaren gelen iletileri döndürerek değiştirebilirsiniz. Bununla birlikte, her bir istemci için bir kuyruğu tutmanız gerekir ve engellemeyi engellemeyi kullandığınızdan ve "Boş" istisnalarını doğru bir şekilde kullandığınızdan emin olun. –

2

.3, aioredis'u kullanmanızı tavsiye ederim. Ben Aşağıdaki kod test etmedi ama böyle bir şey olmalı:

import redis 
import tornado.web 
from tornado.web import RequestHandler 

import aioredis 
import asyncio 
from aioredis.pubsub import Receiver 


class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) 

     ch = redis.channels['test_channel'] 
     result = None 
     while await ch.wait_message(): 
      item = await ch.get() 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 
       result = item['data'] 

     self.write(result) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    print 'running' 
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 
    server = tornado.httpserver.HTTPServer(application) 
    server.bind(8888) 
    # zero means creating as many processes as there are cores. 
    server.start(0) 
    tornado.ioloop.IOLoop.instance().start() 
`PubSub üzerinde beklerken o ioloop engeller çünkü
İlgili konular