2016-09-05 40 views
5

Çalıştığım bir java uygulaması için birçok Kafka belgesine göz attım. Java 8'de tanıtılan lambda sözdizimine girmeyi denedim, ama o zemin üzerinde biraz kabatasarım ve henüz kullandığım şeyin olması gerektiği konusunda kendimi çok emin hissetmiyorum.Yazdır Kafka Stream Konsolu kullanıma mı çıkıyorsunuz?

Kafka/Zookeeper Hizmeti'nin sorunsuz bir şekilde çalıştığından ve yapmak istediğim şey, girdiye göre yazılacak küçük bir örnek program yazmak, ancak çok fazla örnek olduğundan zaten.

Örnek verileri

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC. 

Soru

Ben 3 harfli kelime ayıklamak ve onları baskı yapabilmek istiyor:

örnek verilerin gelince aşağıdaki yapının bir dize elde edilecektir System.out.println ile. Girişi içeren bir string değişkenini nasıl alabilirim? Normal ifadeleri nasıl uygulayacağımı veya sadece anahtar kelimeleri almak için dizgeyi nasıl aradığını biliyorum.

Kod

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092"); 
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181"); 
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    final Serde<String> stringSerde = Serdes.String(); 

    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 

    KafkaStreams streams = new KafkaStreams(builder, props); 
    streams.start(); 

    //How do I assign the input from in-stream to the following variable? 
    String variable = ? 
} 

Ben bu yüzden temelde aynı String durumlarda (yapımcı, tüketici ve akışın tüm görünmesini görmek istiyorum Yaupon Kafka, üretici ve tüketici hepsi aynı konuya bağladım çalışır hale geldikten).

cevap

11

Kafka Streams'i kullanırsanız, veri akışlarınıza işlevler/işleçler uygulamanız gerekir. Sizin durumunuzda, bir KStream nesnesi yaratırsınız, böylece bir operatöre source'u uygulamak istersiniz.

sen ne yapmak istediğinize bağlı olarak, birlikte çoklu kaydına bir işlevi uygulamak bağımsız akışında her kayıt için bir işlev uygulamak operatörleri (örneğin. map()) veya diğer operatörler (örn. aggregateByKey()) vardır. Eğer yukarıdaki örnekte göstermek olarak http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl ve örnekler https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams

Böylece, Kafka akışları kullanarak yerel değişkenler oluşturmak asla değil, operatörler birbirine zincirlenmiş olsun/fonksiyonlar her şeyi gömmek: Sen belgelerinde içine bir göz atmalısınız. Eğer stdout'a tüm giriş kaydını yazdırmak istiyorsanız streams.start() ile başvuru yapılacaktır başlattıktan sonra

Örneğin, sen o olacak tüketici sizden kayıtlar giriş konu ve her biri için, Böylece

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 
source.foreach(new ForeachAction<String, String>() { 
    void apply(String key, String value) { 
     System.out.println(key + ": " + value); 
    } 
}); 

yapabilirdi konunuzun kaydı, stdout'taki kaydı basan apply(...) numaralı çağrı yapılır.

Tabii ki, konsola akışını baskı için daha doğal bir yol source.print() kullanmak olacaktır (içten temelde zaten verilmiş ForeachAction ile gösterilen foreach() operatörü ile aynıdır.) Ile Mesela

Dizgiyi yerel bir değişkene atayarak, kodunuzu apply(...)'a koymanız ve normal ifadelerinizi "3 harfli anahtar kelimeleri ayıklamak" için yapmanız gerekir. Bununla birlikte, bunu ifade etmenin en iyi yolu, flatMapValues() ve print() (yani, source.flatMapValues(...).print()) kombinasyonu ile olacaktır.Her giriş kaydı için flatMapValues() çağrılır (sizin durumunuzda, anahtarın null olacağını varsayarsınız). flatMapValue işlevinizde, normal ifadenizi uygularsınız ve her eşleşme için, eşleşmeyi sonunda döndüğünüz değerlerin listesine eklersiniz.

source.flatMapValues(new ValueMapper<String, Iterable<String>>() { 
    @Override 
    public Iterable<String> apply(String value) { 
     ArrayList<String> keywords = new ArrayList<String>(); 

     // apply regex to value and for each match add it to keywords 

     return keywords; 
    } 
} 

flatMapValues çıktısı her bulundu anahtar kelime (yani çıkış akımı tüm listeler ValueMapper#apply() da dönüş üzerinde "birlik" dır) için bir kayıt içeren, tekrar bir KStream olacaktır. Son olarak, sonucunuzu print() aracılığıyla konsola yazdırıyorsunuz. (Elbette, flatMapValue + print yerine bir foreach kullanabilirsiniz, ancak bu daha az modüler olabilir.)

+0

Vay. Harika cevap arkadaşı. Aradığım şey bu! – Zeliax

+0

YW. Foreach döngüsünün sonunda eksik :) –

+1

')' eksik. – asitm9

İlgili konular