Bir haritanın (kıvılcım) içinde aşağıda açıklanan şekilde (PySpark kullanarak) arama yapmaya çalışıyorum ve bir hatayla karşılaşıyorum.kıvılcım: aramayı kullanın haritada arayın
Bu, Spark'in yapmasına izin vermeyen bir şey mi?
>>> rdd1 = sc.parallelize([(1,'a'),(2,'b'),(3,'c'),(4,'d')]).sortByKey()
>>> rdd2 = sc.parallelize([2,4])
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x)))
>>> rdd.collect()
Bunu yapmanın nedeni gerçek problemde üzerinde çalışıyorum, rdd1 büyük olmasıdır. Bu nedenle, collectAsMap
gibi bir yöntemi kullanarak bir sözlüke dönüştürme gibi bir çözüm etkili değildir.
16/03/28 05:02:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/03/28 05:02:28 INFO DAGScheduler: Stage 1 (sortByKey at <stdin>:1) finished in 0.148 s
16/03/28 05:02:28 INFO DAGScheduler: Job 1 finished: sortByKey at <stdin>:1, took 0.189587 s
>>> rdd2 = sc.parallelize([2,4])
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x)))
>>> rdd.collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd
pickled_command = ser.dumps(command)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/serializers.py", line 402, in dumps
return cloudpickle.dumps(obj, 2)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps
cp.dump(obj)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump
return pickle.Pickler.dump(self, obj)
File "/usr/lib64/python2.6/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple
save(element)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function
self.save_function_tuple(obj, [themodule])
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function
self.save_function_tuple(obj, modList)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
save(f_globals)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce
save(state)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
>>>