Running a Windows Batch File within Apache Spark using Piping

I have a requirement where I need to run a Windows batch file using Apache Spark on multiple nodes of the Spark cluster.

Is it possible to do the same thing using Apache Spark's piping concept?

I have previously run a shell script using piping within Spark on an Ubuntu machine. My code below works fine:

data = ["hi","hello","how","are","you"] 
distScript = "/home/aawasthi/echo.sh" 
distScriptName = "echo.sh" 
sc.addFile(distScript) 
RDDdata = sc.parallelize(data) 
print RDDdata.pipe(SparkFiles.get(distScriptName)).collect() 
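(As an aside, pipe() simply feeds each element of a partition to the external command's stdin as one line and turns every line the command writes to stdout back into an element. A hypothetical way to sanity-check that without any script file is to pipe through the Python interpreter itself; the cmd string below is not part of the original setup:)

import sys

# Hypothetical check (not from the original code): echo stdin back to stdout,
# so the collected result should equal the input data.
cmd = sys.executable + ' -c "import sys; [sys.stdout.write(l) for l in sys.stdin]"'
print RDDdata.pipe(cmd).collect()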

I adapted the same code to run a Windows batch file on a Windows machine with Spark (1.6 pre-built for Hadoop 2.6) installed. But it gives me an error at the sc.addFile step. The code is below:

batchFile = "D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv" 
batchFileName = "runOpenCv" 
sc.addFile(batchFile) 

The error thrown by Spark is as follows:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-11-9e13c265cbae> in <module>() 
----> 1 sc.addFile(batchFile) 

Py4JJavaError: An error occurred while calling o160.addFile. 
: java.io.FileNotFoundException: Added file D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv does not exist. 
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1364) 
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 

This happens even though the batch file is present at the given location.

UPDATE: Added file:/// at the beginning of the file path in batchFile, and .bat as the extension in batchFile & batchFileName. The modified code is:

from pyspark import SparkFiles
from pyspark import SparkContext
sc                                       # SparkContext provided by the PySpark shell
batchFile = "file:///D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv.bat"
batchFileName = "runOpenCv.bat"
sc.addFile(batchFile)                    # ship the batch file to every executor
RDDdata = sc.parallelize(["hi","hello"])
print SparkFiles.get("runOpenCv.bat")    # local path of the shipped file
print RDDdata.pipe(SparkFiles.get(batchFileName)).collect()

Now it no longer throws an error at the addFile step, and print SparkFiles.get("runOpenCv.bat") prints the path
C:\Users\abhilash.awasthi\AppData\Local\Temp\spark-c0f383b1-8365-4840-bd0f-e7eb46cc6794\userFiles-69051066-f18c-45dc-9610-59cbde0d77fe\runOpenCv.bat
so the file is added. But the last step of the code throws the error below:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-6-bf2b8aea3ef0> in <module>() 
----> 1 print RDDdata.pipe(SparkFiles.get(batchFileName)).collect() 

D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.pyc in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\sql\utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func 
    return f(iterator) 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func 
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
    File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__ 
    errread, errwrite) 
    File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child 
    startupinfo) 
WindowsError: [Error 2] The system cannot find the file specified 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func 
    return f(iterator) 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func 
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
    File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__ 
    errread, errwrite) 
    File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child 
    startupinfo) 
WindowsError: [Error 2] The system cannot find the file specified 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 
Windows batch files have the '.cmd' or '.bat' extension. Have you tried adding it? –

@MCND Oh, silly me... the extension should have been there in the name. After adding '.bat' in 'batchFile' and 'batchFileName', the file-not-found error is gone. But I am getting different errors, as shown in the updated question. –

'No FileSystem for scheme: D', so D: is not being handled as it should be; maybe (if this is something silly, forgive me: I know a bit about batch files, but Java is not my field) you need a URI, something like 'file:///D:/...' –

Answer

Also, please escape the / characters, as in

batchFile = "D://spark-1.6.2-bin-hadoop2.6//data//OpenCV//runOpenCv"

As suggested in the comments above, it may also need the '.cmd' or '.bat' extension.

The escape character is \, i.e. '\\' –
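Putting that last comment together with the updated code from the question, a minimal sketch of what the final step might look like (untested; it assumes runOpenCv.bat is runnable on the workers and reads its input from stdin; localPath is just an illustrative name):

from pyspark import SparkFiles

# The local path returned by SparkFiles.get() contains single backslashes,
# which the shlex.split() call inside RDD.pipe() strips out on the worker
# (see the traceback above).  Escaping each backslash as \\ lets the path
# survive the split (sketch only).
localPath = SparkFiles.get(batchFileName).replace("\\", "\\\\")
RDDdata = sc.parallelize(["hi", "hello"])
print RDDdata.pipe(localPath).collect()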
