
I have integrated ELK with PySpark. How can I write data to Elasticsearch from PySpark?

I saved the RDD as ELK data on the local file system:

rdd.saveAsTextFile("/tmp/ELKdata") 
logData = sc.textFile('/tmp/ELKdata/*') 
errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors.count() 

The output value I got was 35. Then from

errors.first() 

I got a value like this:

(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u"(eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})

When I try to write the data to Elasticsearch from PySpark:

errors.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={"es.resource": "logstash-2016.01.12/errors"}) 

I get huge Java errors:

 

org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled 
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665 
Traceback (most recent call last): 
    File "", line 6, in 
    File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile 
    keyConverter, valueConverter, jconf) 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62) 
org.apache.spark.TaskKilledException 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException: 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 


I can write the data if I construct it by hand:

errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox", 
    "ident": "NetworkManager", 
    "pid": "69", 
    "message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", 
    "@timestamp": "2016-01-12T10:59:48+05:30" 
    })) 

But I want to write the filtered & processed data to Elasticsearch.
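From the error, the problem seems to be that the lines read back with sc.textFile are plain strings, not (key, value) pairs. A minimal sketch of the kind of conversion that appears to be needed, assuming each line is the repr of a (id, dict) tuple as in the errors.first() output above (ast.literal_eval, json.dumps and the es.* settings other than es.resource are assumptions, not tested configuration):

import ast 
import json 

# Assumption: each line in /tmp/ELKdata is the repr of a (doc_id, dict) tuple, 
# as in the errors.first() output above, so it can be parsed back into a pair. 
pairs = errors.map(lambda line: ast.literal_eval(line)) 

# EsOutputFormat cannot use plain string elements; give it (key, JSON string) 
# pairs and tell the connector that the value is already serialized JSON. 
docs = pairs.map(lambda kv: (kv[0], json.dumps(kv[1]))) 

docs.saveAsNewAPIHadoopFile( 
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ 
        "es.nodes": "localhost",   # assumed node address 
        "es.port": "9200", 
        "es.resource": "logstash-2016.01.12/errors", 
        "es.input.json": "yes"     # the value is a pre-serialized JSON document 
    }) 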

Additional options that you can add can be found here: http://help.mortardata.com/technologies/spark/load_and_transform_data – pyspark

Answers


This is how I managed to solve it: I first built a DataFrame from the RDD. Similar to the currently accepted answer, once you have a DataFrame:

from pyspark.sql import SQLContext 
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save() 
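The RDD-to-DataFrame step is not shown above; a minimal sketch of one way it could look, assuming an RDD of (id, dict) pairs like the errors.first() output in the question (the name pairs and the use of createDataFrame on dicts are assumptions):

from pyspark.sql import SQLContext 

sqlContext = SQLContext(sc) 

# Assumption: 'pairs' holds (doc_id, dict) tuples like the sample document in the 
# question; dropping the key and letting Spark infer the schema from the dicts 
# keeps field names such as '@timestamp' intact. 
df = sqlContext.createDataFrame(pairs.map(lambda kv: kv[1])) 

df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save() 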

Where do you give the IP addresses or the DNS names of the ES nodes? – Somar


I was in the same situation, trying to write the data as an RDD. The answer above is really close, but there are many configuration options that will also be helpful. Unless you are using the default localhost for your node, that answer will not work.

A DataFrame is the cleaner, simpler way to go. If you are using the pyspark shell, add the path to the elasticsearch-hadoop jar when you start the shell from the CLI:

$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar 

You do not necessarily need the following line:

from pyspark.sql import SQLContext 

Once you have a DataFrame, you just need the following, plus possible additional options:

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "<index/type>") \
    .option("es.nodes", "<enter node address or name>") \
    .save()

The index/type you specify will be created in Elasticsearch if it does not already exist. Additional options can be found here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
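For illustration, a hedged example of what a write with a few of those extra options might look like; the node address and the doc_id column are placeholders, and which settings you actually need depends on your cluster:

writer = (df.write.format("org.elasticsearch.spark.sql") 
          # assumed node address and port; replace with your cluster's values 
          .option("es.nodes", "10.0.0.5") 
          .option("es.port", "9200") 
          # helpful when Spark cannot reach the data nodes directly (e.g. cloud or Docker) 
          .option("es.nodes.wan.only", "true") 
          # optional: use an existing column as the Elasticsearch document _id 
          .option("es.mapping.id", "doc_id") 
          .option("es.resource", "<index/type>")) 
writer.save() 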