2014-07-13 39 views
10

Spark, Map koleksiyon türlerini dağıtıyor mu?Scala Spark içinde Dağıtılmış Harita

Yani bir HashMap [String, String] anahtar, değer çiftleri varsa, bu dağıtılmış bir Map koleksiyonu türüne dönüştürülebilir mi? Öğeye erişmek için "filtre" kullanabilirdim, ancak bunun Harita'nın yanı sıra gerçekleştirdiğinden şüphe duyuyorum.

+0

temelde aynı soru meraklı İşte http://stackoverflow.com/questions/24513440/creating-a-large-dictionary-in-pyspark/24513951#24513951 – aaronman

cevap

8

Ben bir cevap haline yorumlarımı düşünmemiştim bazı yeni bilgi bulundu beri. @maasg zaten standart lookup işlevini kapsamaktayım Dikkatli olmanız gerektiğini belirtmek isterim çünkü RDD'nin bölümleyicisi Yok ise, arama sadece bir filtre kullanır. Kıvılcımın üstündeki (K, V) deposuna göre, bu işlem devam ediyor gibi görünüyor, ancak kullanılabilir bir çekme talebi here yapıldı. İşte bir örnek kullanım.

import org.apache.spark.rdd.IndexedRDD 

// Create an RDD of key-value pairs with Long keys. 
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0))) 
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing 
// the entries. 
val indexed = IndexedRDD(rdd).cache() 

// Perform a point update. 
val indexed2 = indexed.put(1234L, 10873).cache() 
// Perform a point lookup. Note that the original IndexedRDD remains 
// unmodified. 
indexed2.get(1234L) // => Some(10873) 
indexed.get(1234L) // => Some(0) 

// Efficiently join derived IndexedRDD with original. 
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0) 
indexed3.collect // => Array((1234L, 10873)) 

// Perform insertions and deletions. 
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache() 
indexed2.get(-100L) // => None 
indexed4.get(-100L) // => Some(111) 
indexed2.get(999L) // => Some(0) 
indexed4.get(999L) // => None 

çekme isteği iyi aldı ve muhtemelen kıvılcım gelecekteki sürümlerinde dahil edilecektir gibi görünüyor, yüzden kendi kodunu bu çekme isteğini kullanmak muhtemelen güvenlidir. durumunda JIRA ticket Eğer

+0

güzel :) teşekkürler bahsetmeye değer. sadece merak ettim ama bundan nasıl haberdar oldun? Kıvılcım posta listesi? –

+0

@ blue-sky Posta listemdeyim ama aslında bunu anladığım yol [spark jira biletleri] [https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian]. jira.jira-projeler-eklenti: sorunlar-panel) – aaronman

3

Hızlı yanıt: Kısmen.

Önce (k,v) çiftleri dizisi içine harita zorlayarak ancak bir haritanın tuşları kümesi olmalıdır kısıtlamayla gevşek böyle yaparak bir RDD[(A,B)] içine Map[A,B] dönüştürebilir. yani. Map yapısının semantiğini kaybedersiniz.

Pratik bir bakış açısıyla, kvRdd.lookup(element) kullanarak bir öğeyi karşılık gelen değere çözebilirsiniz ancak sonuç, daha önce açıklandığı gibi tek bir arama değeri olduğuna dair hiçbir garantiniz olmadığı sürece bir sıra olacaktır.

bir kıvılcım-kabuk örnek temizleyin şeyler yapmak:

val englishNumbers = Map(1 -> "one", 2 ->"two" , 3 -> "three") 
val englishNumbersRdd = sc.parallelize(englishNumbers.toSeq) 

englishNumbersRdd.lookup(1) 
res: Seq[String] = WrappedArray(one) 

val spanishNumbers = Map(1 -> "uno", 2 -> "dos", 3 -> "tres") 
val spanishNumbersRdd = sc.parallelize(spanishNumbers.toList) 

val bilingueNumbersRdd = englishNumbersRdd union spanishNumbersRdd 

bilingueNumbersRdd.lookup(1) 
res: Seq[String] = WrappedArray(one, uno) 
+1

nerede arama yöntemi belgelenmiştir nedir? Http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD adresinde mevcut görünmüyor. –

+0

Ah örtülü dönüşümün farkında değildim: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions –

+0

Import org.apache. spark.SparkContext._' ve gitmek için iyi. – maasg