2016-03-28 36 views
0

I am trying to use lookup inside a map in Spark (using PySpark), as shown below, and I am running into an error.

Is this something that Spark just does not allow doing?

>>> rdd1 = sc.parallelize([(1,'a'),(2,'b'),(3,'c'),(4,'d')]).sortByKey() 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 

The reason I am doing it this way is that in the real problem I am working on, rdd1 is large. Therefore, a workaround such as converting it to a dictionary with collectAsMap is not practical.

16/03/28 05:02:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/03/28 05:02:28 INFO DAGScheduler: Stage 1 (sortByKey at <stdin>:1) finished in 0.148 s 
16/03/28 05:02:28 INFO DAGScheduler: Job 1 finished: sortByKey at <stdin>:1, took 0.189587 s 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 676, in collect 
    bytesInJava = self._jrdd.collect().iterator() 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd 
    pickled_command = ser.dumps(command) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/serializers.py", line 402, in dumps 
    return cloudpickle.dumps(obj, 2) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps 
    cp.dump(obj) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump 
    return pickle.Pickler.dump(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function 
    self.save_function_tuple(obj, [themodule]) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple 
    save((code, closure, base_globals)) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 600, in save_list 
    self._batch_appends(iter(obj)) 
    File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends 
    save(tmp[0]) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function 
    self.save_function_tuple(obj, modList) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple 
    save(f_globals) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce 
    save(state) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 


>>> 

Answers

1

Is this something that Spark just does not allow doing?

Yes, it is. Spark does not support nested actions and transformations, so neither rdd1 nor rdd2 can be referenced from inside a transformation of the other; that is what the error is telling you. Collecting rdd1 locally (for example with collectAsMap) would also be extremely slow, since it is very large. Since you have already ruled out join and local variables, the only option left is to use an external system (for example a database) for the lookups.

0

An RDD cannot be used inside an operation on another RDD. You have two RDDs, rdd1 and rdd2. You can do this:

rdd1.map(......)

But you cannot do this:

rdd1.map(.....rdd2....)

So when you want to do some complex operation that involves both of them, try join/union.