2016-03-19 17 views
0

kullanarak bir Tweet akışı elde edemiyorum Twitter Streaming API kullanarak Tweet'lerde bazı analizler yapmaya çalışıyorum.Twitter Akışı API'sını Spark

İlk olarak Durum mesajlarını akıştan yazdırmak ve oradan başlamak istedim.

Kodum aşağıda gösterilmiştir:

public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setAppName("TwitterStreamPrinter").setMaster("local"); 

    Configuration twitterConf = new ConfigurationBuilder() 
     .setOAuthConsumerKey(consumerKey) 
     .setOAuthConsumerSecret(consumerSecret) 
     .setOAuthAccessToken(accessToken) 
     .setOAuthAccessTokenSecret(accessTokenSecret).build(); 
    OAuth2Authorization auth = new OAuth2Authorization(twitterConf); 
    JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(ssc, auth); 

    JavaDStream<String> statuses = twitterStream.map(new Function<Status, String>() { 
    public String call(Status status) throws Exception { 
     return status.getText(); 
    } 
    }); 
    statuses.print(); 

O Spark günlükleri dışında başka bir şey yazdırmak etmez. Başlangıçta bunun yetki nedeniyle olduğunu düşündüm, bu yüzden yetkilendirmeyi geçmek için her türlü farklı yolu denedim, ama belki de bu yetki değil.

Web'den bulabildiğim her örneğe baktım (çok fazla olmamasına rağmen) ve bu kod Twitter durumlarını almak için standart bir kod gibi görünüyor, ancak neden bir şey yazdırmıyor? Ayrıca System.out.println'u denedim, ama işe yaramadı.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
16/03/19 12:02:23 INFO SparkContext: Running Spark version 1.6.1 
16/03/19 12:02:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/03/19 12:02:24 INFO SecurityManager: Changing view acls to: abcd 
16/03/19 12:02:24 INFO SecurityManager: Changing modify acls to: abcd 
16/03/19 12:02:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(abcd); users with modify permissions: Set(abcd) 
16/03/19 12:02:24 INFO Utils: Successfully started service 'sparkDriver' on port 50995. 
16/03/19 12:02:24 INFO Slf4jLogger: Slf4jLogger started 
16/03/19 12:02:25 INFO Remoting: Starting remoting 
16/03/19 12:02:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:51003] 
16/03/19 12:02:25 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 51003. 
16/03/19 12:02:25 INFO SparkEnv: Registering MapOutputTracker 
16/03/19 12:02:25 INFO SparkEnv: Registering BlockManagerMaster 
16/03/19 12:02:25 INFO DiskBlockManager: Created local directory at /private/var/folders/3b/wzflbsn146qgwdglbm_6ms3m0000hl/T/blockmgr-e3de07a6-0c62-47cf-9940-da18382c9241 
16/03/19 12:02:25 INFO MemoryStore: MemoryStore started with capacity 2.4 GB 
16/03/19 12:02:25 INFO SparkEnv: Registering OutputCommitCoordinator 
16/03/19 12:02:25 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/03/19 12:02:25 INFO SparkUI: Started SparkUI at http://10.0.0.12:4040 
16/03/19 12:02:25 INFO Executor: Starting executor ID driver on host localhost 
16/03/19 12:02:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51016. 
16/03/19 12:02:25 INFO NettyBlockTransferService: Server created on 51016 
16/03/19 12:02:25 INFO BlockManagerMaster: Trying to register BlockManager 
16/03/19 12:02:25 INFO BlockManagerMasterEndpoint: Registering block manager localhost:51016 with 2.4 GB RAM, BlockManagerId(driver, localhost, 51016) 
16/03/19 12:02:25 INFO BlockManagerMaster: Registered BlockManager 
16/03/19 12:02:25 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data. 
16/03/19 12:02:26 INFO SparkContext: Invoking stop() from shutdown hook 
16/03/19 12:02:26 INFO SparkUI: Stopped Spark web UI at http://10.0.0.12:4040 
16/03/19 12:02:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/03/19 12:02:26 INFO MemoryStore: MemoryStore cleared 
16/03/19 12:02:26 INFO BlockManager: BlockManager stopped 
16/03/19 12:02:26 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/03/19 12:02:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/03/19 12:02:26 INFO SparkContext: Successfully stopped SparkContext 
16/03/19 12:02:26 INFO ShutdownHookManager: Shutdown hook called 
16/03/19 12:02:26 INFO ShutdownHookManager: Deleting directory /private/var/folders/3b/..... 

cevap

1

Eğer günlüklerde her şey var:

6/03/19 12:02:25 StreamingContext WARN: spark.master yerel [n] olarak ayarlanması gerekir, n yerel modda ise> 1 Veri almak için alıcılarınız var, aksi halde Spark işlerinde alınan verileri işlemek için kaynak elde edilmeyecektir.

yüzden cevap ayarlanır usta

olmaya yerel [*] ek olarak

, başlamak için unuttum var?

jssc.start(); // Hesaplamaya başlama

jssc.awaitTermination();

+0

yerel olarak değiştiriliyor [*], WARN mesajını kaldırır, ancak yine de hiçbir şey yazdırmaz. – user2418202

+0

, böylece kod ve günlükleri güncelleyebilir misiniz? Ne kadar çekirdek var? –

+0

tam olarak bu WARN mesajı olmadan aynı günlükleri. Çıktı, bildiğim kadarıyla çekirdek sayısına bağlı olmamalıdır. – user2418202