1

Merhaba teneke olayları tutmak için teneke ve kanal olarak Kafka kullanarak cloudera flume agent kullanıyorum. Kafka kanal etkinliğinde yazılan etkinliğin yapısı (başlıklar Json Tweets) şu şekildedir: ´ :::: ´, M gibi ayırıcıların takip ettiği başlıklar ve tweetin jsonu aşağıdaki gibidir: , bir flume olay nesnesidir. yapısı:Spark scala'daki kafka kanalından okuma olaylarını okumak?

event(headers, jsonTweet) 

nesne aşağıdaki gibidir:

aşağıdaki
::::�M{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715916168905633792,"in_reply_to_user_id_str":null,"timestamp_ms":"1459522690403","in_reply_to_status_id":null,"created_at":"Fri Apr 01 14:58:10 +0000 2016","favorite_count":0,"place":null,"coordinates":null,"text":"RT @frmikeschmitz: What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American","contributors":null,"retweeted_status":{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715549110728847360,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"created_at":"Thu Mar 31 14:39:36 +0000 2016","favorite_count":22,"place":null,"coordinates":null,"text":"What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American icon. via @bmoviesd","contributors":null,"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"/2016/03/30/superman-and-the-damage-done","indices":[98,121],"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[41,45]}],"user_mentions":[{"id":202668848,"name":"Birth.Movies.Death.","indices":[126,135],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":5,"id_str":"715549110728847360","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":3001,"lang":"en","profile_link_color":"0084B4","profile_banner_url":"","id":565216911,"following":null,"protected":false,"favourites_count":1858,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"fathermikeschmitz","profile_background_color":"C0DEED","created_at":"Sat Apr 28 03:49:18 +0000 2012","default_profile_image":false,"followers_count":17931,"profile_image_url_https":"","geo_enabled":false,"profile_background_image_url":"":"":null,"url":"","utc_offset":-14400,"time_zone":"Eastern Time (US & Canada)","notifications":null,"profile_use_background_image":true,"friends_count":81,"profile_sidebar_fill_color":"DDEEF6","screen_name":"frmikeschmitz","id_str":"565216911","profile_image_url":"","listed_count":138,"is_translator":false}},"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[60,64]}],"user_mentions":[{"id":565216911,"name":"fathermikeschmitz","indices":[3,17],"screen_name":"frmikeschmitz","id_str":"565216911"},{"id":202668848,"name":"Birth.Movies.Death.","indices":[139,140],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":0,"id_str":"715916168905633792","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":25,"lang":"en","profile_link_color":"0084B4","id":2987773015,"following":null,"protected":false,"favourites_count":90,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Pam Anderson","profile_background_color":"C0DEED","created_at":"Sun Jan 18 02:14:41 +0000 2015","default_profile_image":false,"followers_count":22,"profile_image_ur":"","geo_enabled":false,"profile_background_image_url":"","profile_background_image_url_httpd":"":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":59,"profile_sidebar_fill_color":"DDEEF6","screen_name":"pamawah25","id_str":"2987773015","profile_image_url":"","listed_count":0,"is_translator":false}} 

i çalışıyorum kanal madde olarak:

agent1.sources.twitter-data.type = com.cloudera.flume.source.TwitterSource 
agent1.sources.twitter-data.consumerKey = "" 
agent1.sources.twitter-data.consumerSecret = "" 
agent1.sources.twitter-data.accessToken = "" 
agent1.sources.twitter-data.accessTokenSecret = "" 
agent1.sources.twitter-data.keywords = superman, batman, iron man, tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql 

agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel 
agent1.channels.kafka-channel.capacity = 10000 
agent1.channels.kafka-channel.transactionCapacity = 1000 
agent1.channels.kafka-channel.brokerList = kafka:9092 
agent1.channels.kafka-channel.topic = twitter 
agent1.channels.kafka-channel.zookeeperConnect = kafka:2181 
#agent1.channels.kafka-channel.parseAsFlumeEvent = false 

agent1.sinks.hdfs-sink.type = hdfs 
agent1.sinks.hdfs-sink.hdfs.path = hdfs:///user/Hadoop/twitter_data 
agent1.sinks.hdfs-sink.fileType = DataStream 
agent1.sinks.hdfs-sink.writeFormat = Text 
agent1.sinks.hdfs-sink.batchSize = 1000 
agent1.sinks.hdfs-sink.rollSize = 0 
agent1.sinks.hdfs-sink.rollCount = 10000 

agent1.sources.twitter-data.channels = kafka-channel 

burada r istiyorum Bu verileri kıvılcım akışı/kıvılcım Sql içinde işlemek ve saklamak için kullanın. aşağıdaki gibi

Yani ben kod kullanılır, ama bunun nedeni kanalet olayın yardımına dosent: Bir Kafka akışından bunları tüketen ediyorsanız

val ssc = new StreamingContext(sc, Seconds(2)) 
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)//new org.apache.spark.sql.SQLContext(sc) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

// Get the data (tweets) from kafka 
val tweets = messages.map(_._2) 




tweets.foreachRDD { rdd => 
    val jsonRDD = sqlContext.read.json(rdd) 
    val tweetTable = jsonRDD.toDF() 
    tweetTable.printSchema() 
    tweetTable.show(5) 
    tweetTable.write.mode("append").saveAsTable("twitterStream") 

} 
+0

"map (e =>" Olay: başlık: "+ e.event.get (0) .toString +" body: "+ yeni String (e.event.getBody.array)) gibi bir şey var. kafka kanalıyla nasıl kullanılacağını anlamaya çalışan başka bir yazıdan "yazdır" –

cevap

0

, el aracılığıyla değerini ayrıştırmak gerekir ayırıcı:

val tweets = messages.map { case (_, tweet) => { 
    val splitTweet = tweet.split("?M") 
    (splitTweet(0), splitTweet(1)) 
    } 
} 

Bu başlığın birinci değeri olarak birleştirilmiş başlık verecektir ve ikinci değer tweet'inizi temsil JSON içerir.

+0

teşekkürler, ben aynı şeyi düşünürdüm ama sorun tweet.split ("? M") her zaman statik değil, bu karakter olmayan karakter nesneleri veya ikili örneğin * ((az) | (AZ) * bir reg expr deneyeceğim –

+0

@Mouzzam'ı bölmek için yapabilirsem, sorunuzda belirtmelisiniz .. BTW - Flume alıcılarına özgü Spark'i kullanmak sizin için ayrışmayı kesinlikle kolaylaştıracaktır. [this] (http: //spark.apache. org/docs/latest/streaming-flume-integration.html) –

+0

Bunu öğrenmeye çalışıyorum, şimdi meydan okuma kafka konusundaki veriyi twitter olarak adlandırdım, kafka'yı flume kanalı ve bahsettiğim kanal direği olarak kullanıyorum linkte bir kanalın akışı olur ... tweets = messages.map (_._ 2) rdd'i flume akışına iletmek için kullanabilir miyim? –

İlgili konular