2015-06-19 17 views
7

Özellikle taşıma/protokol API'sı kullanılarak başımı Python 3 asyncio modülünün çevresine getirmeye çalışıyorum. Bir yayın/abone kalıbı deseni oluşturmak ve istemcimi ve sunucumu oluşturmak için asyncio.Protocol sınıfını kullanmak istiyorum.Sırasıyla Asyncio persisent istemci protokolü sınıfı

Şu anda sunucuyu çalıştırıp çalıştırıyorum ve gelen istemci bağlantılarını dinliyorum. İstemci sunucuya bağlanabilir, bir mesaj gönderebilir ve cevabı alabilir.

TCP bağlantısını canlı tutmayı ve iletileri eklememi sağlayan bir kuyruğu sürdürmeyi istiyorum. Düşük seviyeli API (Transport/Protocols) kullanarak ancak bunu yapmak için bir yol bulmayı denedim, ancak sınırlı asyncio docs/example çevrimiçi hepsi yüksek seviyeli API'ye geçiyor gibi görünüyor - akışları kullanarak vs. Bunu nasıl uygulayacağına dair doğru yönde işaret et? İşte

sunucu kod:

#!/usr/bin/env python3 

import asyncio 
import json 


class SubscriberServerProtocol(asyncio.Protocol): 
    """ A Server Protocol listening for subscriber messages """ 

    def connection_made(self, transport): 
     """ Called when connection is initiated """ 

     self.peername = transport.get_extra_info('peername') 
     print('connection from {}'.format(self.peername)) 
     self.transport = transport 

    def data_received(self, data): 
     """ The protocol expects a json message containing 
     the following fields: 

      type:  subscribe/unsubscribe 
      channel: the name of the channel 

     Upon receiving a valid message the protocol registers 
     the client with the pubsub hub. When succesfully registered 
     we return the following json message: 

      type:   subscribe/unsubscribe/unknown 
      channel:  The channel the subscriber registered to 
      channel_count: the amount of channels registered 
     """ 

     # Receive a message and decode the json output 
     recv_message = json.loads(data.decode()) 

     # Check the message type and subscribe/unsubscribe 
     # to the channel. If the action was succesful inform 
     # the client. 
     if recv_message['type'] == 'subscribe': 
      print('Client {} subscribed to {}'.format(self.peername, 
                 recv_message['channel'])) 
      send_message = json.dumps({'type': 'subscribe', 
             'channel': recv_message['channel'], 
             'channel_count': 10}, 
             separators=(',', ':')) 
     elif recv_message['type'] == 'unsubscribe': 
      print('Client {} unsubscribed from {}' 
        .format(self.peername, recv_message['channel'])) 
      send_message = json.dumps({'type': 'unsubscribe', 
             'channel': recv_message['channel'], 
             'channel_count': 9}, 
             separators=(',', ':')) 
     else: 
      print('Invalid message type {}'.format(recv_message['type'])) 
      send_message = json.dumps({'type': 'unknown_type'}, 
             separators=(',', ':')) 

     print('Sending {!r}'.format(send_message)) 
     self.transport.write(send_message.encode()) 

    def eof_received(self): 
     """ an EOF has been received from the client. 

     This indicates the client has gracefully exited 
     the connection. Inform the pubsub hub that the 
     subscriber is gone 
     """ 
     print('Client {} closed connection'.format(self.peername)) 
     self.transport.close() 

    def connection_lost(self, exc): 
     """ A transport error or EOF is seen which 
     means the client is disconnected. 

     Inform the pubsub hub that the subscriber has 
     Disappeared 
     """ 
     if exc: 
      print('{} {}'.format(exc, self.peername)) 


loop = asyncio.get_event_loop() 

# Each client will create a new protocol instance 
coro = loop.create_server(SubscriberServerProtocol, '127.0.0.1', 10666) 
server = loop.run_until_complete(coro) 

# Serve requests until Ctrl+C 
print('Serving on {}'.format(server.sockets[0].getsockname())) 
try: 
    loop.run_forever() 
except KeyboardInterrupt: 
    pass 

# Close the server 
try: 
    server.close() 
    loop.until_complete(server.wait_closed()) 
    loop.close() 
except: 
    pass 

Ve burada istemci kod:

#!/usr/bin/env python3 

import asyncio 
import json 


class SubscriberClientProtocol(asyncio.Protocol): 
    def __init__(self, message, loop): 
     self.message = message 
     self.loop = loop 

    def connection_made(self, transport): 
     """ Upon connection send the message to the 
     server 

     A message has to have the following items: 
      type:  subscribe/unsubscribe 
      channel: the name of the channel 
     """ 
     transport.write(self.message.encode()) 
     print('Message sent: {!r}'.format(self.message)) 

    def data_received(self, data): 
     """ After sending a message we expect a reply 
     back from the server 

     The return message consist of three fields: 
      type:   subscribe/unsubscribe 
      channel:  the name of the channel 
      channel_count: the amount of channels subscribed to 
     """ 
     print('Message received: {!r}'.format(data.decode())) 

    def connection_lost(self, exc): 
     print('The server closed the connection') 
     print('Stop the event loop') 
     self.loop.stop() 

if __name__ == '__main__': 
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'}, 
         separators=(',', ':')) 

    loop = asyncio.get_event_loop() 
    coro = loop.create_connection(lambda: SubscriberClientProtocol(message, 
                    loop), 
            '127.0.0.1', 10666) 
    loop.run_until_complete(coro) 
    try: 
     loop.run_forever() 
    except KeyboardInterrupt: 
     print('Closing connection') 
    loop.close() 

cevap

9

Sunucunuz Yapmaya çalıştığınız şey için olduğu gibi gayet; Yazdığınız kodunuz TCP bağlantısını canlı tutuyor, sadece sürekli olarak yeni mesajları besleyecek sıhhi tesisatınız yok. Bunu yapmak için, istemci kodunu değiştirmeniz gerekir, böylece sadece connection_made geri arama yangınları yerine yapmak yerine, yeni mesajları istediğiniz zaman besleyebilirsiniz.

Bu, yeterince kolaydır; iletileri alabilen asyncio.Queue'a bir iç asyncio.Queue ekleriz ve daha sonra bu iletileri Queue'dan alan ve onları sunucuya gönderen sonsuz bir döngüde bir coroutine çalıştırırız. Son parça, create_connection çağrısından geri döndüğünüz ClientProtocol örneğini gerçekten depolamak ve daha sonra iletileri gönderen bir korotaya iletmektir.

import asyncio 
import json 

class SubscriberClientProtocol(asyncio.Protocol): 
    def __init__(self, loop): 
     self.transport = None 
     self.loop = loop 
     self.queue = asyncio.Queue() 
     self._ready = asyncio.Event() 
     asyncio.async(self._send_messages()) # Or asyncio.ensure_future if using 3.4.3+ 

    @asyncio.coroutine 
    def _send_messages(self): 
     """ Send messages to the server as they become available. """ 
     yield from self._ready.wait() 
     print("Ready!") 
     while True: 
      data = yield from self.queue.get() 
      self.transport.write(data.encode('utf-8')) 
      print('Message sent: {!r}'.format(message)) 

    def connection_made(self, transport): 
     """ Upon connection send the message to the 
     server 

     A message has to have the following items: 
      type:  subscribe/unsubscribe 
      channel: the name of the channel 
     """ 
     self.transport = transport 
     print("Connection made.") 
     self._ready.set() 

    @asyncio.coroutine 
    def send_message(self, data): 
     """ Feed a message to the sender coroutine. """ 
     yield from self.queue.put(data) 

    def data_received(self, data): 
     """ After sending a message we expect a reply 
     back from the server 

     The return message consist of three fields: 
      type:   subscribe/unsubscribe 
      channel:  the name of the channel 
      channel_count: the amount of channels subscribed to 
     """ 
     print('Message received: {!r}'.format(data.decode())) 

    def connection_lost(self, exc): 
     print('The server closed the connection') 
     print('Stop the event loop') 
     self.loop.stop() 

@asyncio.coroutine 
def feed_messages(protocol): 
    """ An example function that sends the same message repeatedly. """ 
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'}, 
         separators=(',', ':')) 
    while True: 
     yield from protocol.send_message(message) 
     yield from asyncio.sleep(1) 

if __name__ == '__main__': 
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'}, 
         separators=(',', ':')) 

    loop = asyncio.get_event_loop() 
    coro = loop.create_connection(lambda: SubscriberClientProtocol(loop), 
            '127.0.0.1', 10666) 
    _, proto = loop.run_until_complete(coro) 
    asyncio.async(feed_messages(proto)) # Or asyncio.ensure_future if using 3.4.3+ 
    try: 
     loop.run_forever() 
    except KeyboardInterrupt: 
     print('Closing connection') 
    loop.close() 
+0

Hızlı ve kapsamlı yanıt için teşekkürler @dano. Değişiklikleri koduma uyguladım ve bir çekicilik gibi çalışıyor! – thiezn

+0

@dano Çok teşekkür ederim. Bu tam olarak aradığım şeyi. – user24502