2016-05-30 19 views
5

Ben bir Apache Kıvılcım küme ve RabbitMQ komisyoncu var ve mesajları tüketmek ve pyspark.streaming modülünü kullanarak bazı ölçümleri hesaplamak istiyorum.nasıl Pyspark Akış modülü kullanarak RabbitMQ tüketiciyi uygulamak?

sorun sadece this package buldum, ama Java ve Scala uygulanmaktadır. Bunun yanı sıra, herhangi bir örnek veya köprü uygulaması Python bulamadık.

Pika numaralı telefonu kullanan bir müşterim var ancak bu yükü StreamingContext numaralı telefonuma nasıl geçireceğimi bilmiyorum.

+0

Sadece bunu keşfettim [Pyspark] (http: //spark.apache.org/docs/latest/api/python/pyspark.streaming.html # module-pyspark.streaming.mqtt) ve [RabbitMQ] (https://www.rabbitmq.com/mqtt.html) her ikisi de ** MQTT protokolü konuşuyor * *. Bu bir çözüm olabilir, ancak bunlar bazı trade-off'lar ve sınırlamalar – jocerfranquiz

+2

RabbitMQ kümesindeki MQTT protokolünü kullanmak, kuyruk yapılandırmalarını değiştirmeyi ima eder. Benim için bu bir çözüm değil. Çözmenin bir yolunu buldum. Testlerimi tamamladıktan sonra bir çözüm göndereceğim – jocerfranquiz

+0

Hey, herhangi bir gelişme var mı? Şimdi aynı sorunla karşı karşıyayım. Benim durumumda MQTT kavram kanıtını bile kuramıyorum. – gwaramadze

cevap

2

Bu çözüm

  1. değiştirin Kendi RabbitMQ kimlik ve bağlantı parametreleri kullanmak için dosyayı örneğini indirin ve .py dosyası
  2. olarak kaydedin Akış Yeni Spark pika asynchronous consumer example ve socketTextStream yöntemini kullanır. Benim durumumda ben HOST ve PORT Akışı Spark için TCP bağlantısı tekabül yuva açmaya gerek if __name__ == '__main__': altında Consumer sınıf
  3. değiştirmek zorunda kaldı. Biz

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 
        s.bind((HOST, PORT)) 
        s.listen(1) 
        conn, addr = s.accept() 
        dispatcher = conn.sendall #assigning sendall to dispatcher variable 
    consumer = Consumer(dispatcher) 
    try: 
        consumer.run() 
    except Exception as e: 
        consumer.stop() 
        s.close() 
    
  4. Tüketicinin içindeki yöntemde on_message yılında dispatcher

    def __init__(self,dispatcher): 
        self._connection = None 
        self._channel = None 
        self._closing = False 
        self._consumer_tag = None 
        self._url = amqp_url 
        #new code 
        self._dispatcher = dispatcher 
    
  5. geçmek Consumer'da __init__ yöntemini değiştirin Consumer sınıfa geçmek bu bir değişkene soketinden yöntemi sendall kaydetmeniz gerekir biz AMQP mesajın body göndermek için self._dispatcher diyoruz

    def on_message(self, unused_channel, basic_deliver, properties, body): 
        self._channel.basic_ack(basic_deliver.delivery_tag) 
        try: 
        # we need an '\n' at the each row Spark socketTextStream 
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8")) 
        except Exception as e: 
        raise 
    
  6. Spark içinde, HOST ve PORT numaralı TCP soketimize karşılık gelen ssc.socketTextStream(HOST, int(PORT))'u yerleştirin. senin Spark yerine farklı bir makinede tüketici çalıştırmak için

    • Dene: Kıvılcım ilk tüketici ve sonra Spark uygulama

Son söz bağlantısını

  • Çalıştır yönetecek makine

  • 10000 üzerinde herhangi bir bağlantı noktası Tamam olmalıdır. Linux Debian 7 ve 8 ve Ubuntu 14.04 ve 16.04
  • Pika sürüm 0.10.0
  • Python sürümü 3.5.2
  • Kıvılcım sürüm 1.6: çekirdek izin vermeyin bazı rasgele port
  • Platformu'nu açın. 1, 1.6.2 ve 2.0.0