2014-12-17 22 views
11

Spark ve Mongo kullanıyorum. Ben aşağıdaki kodu kullanarak Mongo bağlanabiliyor duyuyorum: kodunun üzerindeKıvılcım kullanarak mongo nasıl sorgulanır?

val sc = new SparkContext("local", "Hello from scala") 

val config = new Configuration() 
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName") 
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) 

bana koleksiyonundan tüm belgeleri verir.

Şimdi bazı koşulları sorguda uygulamak istiyorum. Bunun için

Ben Bu bir seferde sadece tek bir koşulu aldı

config.set("mongo.input.query","{customerId: 'some mongo id'}") 

kullandı. 'Use'> 30

1) Bir koşulu eklemek istiyorum kıvılcım ve mongo kullanarak mongo sorgusuna (büyük ve küçük olanlar dahil olmak üzere) birden fazla koşulu nasıl ekleyebilirim?

Ayrıca scala kullanarak sorgu sonucu her bir belge üzerinde yinelemek istiyorum ??

2) Scala kullanarak sonuçta nasıl yineleyebilirim?

+0

burada bazı yan bayrak: Mongo için Hadoop biçimi açık bağlantıları tutan kaynak elleçleme sorunu var. Spark ile karıştırdığımızda patlayıcı bir kombinasyon oldu. * Önlemek * – maasg

+0

@ maasg Kıvılcım ile mongo bağlantısı için başka bir seçenek var mı? – Vishwas

cevap

10

Merhaba bunu deneyebilirsiniz:

1 Spark

https://github.com/Stratio/deep-spark/tree/develop

ile MongoDB içeren bir proje) bir git klon

2) derin kıvılcım içeri yapmak yok, daha sonra derin ana

3) mvn'yi yükleyin

Bu amaç ile 363.210

4) açık kıvılcım kabuk:

./spark-shell --jars YOUR_PATH/derin çekirdek-0.7.0-SNAPSHOT.jar, YOUR_PATH/derin commons-0.7.0-SNAPSHOT. kavanoz, YOUR_PATH/derin mongodb-0.7.0-SNAPSHOT.jar, YOUR_PATH/Mongo-java sürücüsü-2.12.4-sources.jar

gerçek yolu

5 ile "YOUR_PATH" üzerine yazmak hatırlamak) kıvılcım kabuğunda basit bir örnek yürütün:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig 
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor 
import com.stratio.deep.core.context.DeepSparkContext 
import com.mongodb.DBObject 
import org.apache.spark.rdd.RDD 
import com.mongodb.QueryBuilder 
import com.mongodb.BasicDBObject 

val host = "localhost:27017" 


val database = "test" 

val inputCollection = "input"; 

val deepContext: DeepSparkContext = new DeepSparkContext(sc) 

val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject]) 


val query: QueryBuilder = QueryBuilder.start(); 

query.and("number").greaterThan(27).lessThan(30); 


inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor]) 


val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity) 

Bunun en iyi şey, bir QueryBui kullanabilirsiniz olmasıdır lder Nesne sorgularınızın

Ayrıca böyle bir DBObject iletebilirsiniz yapmak:

{ "number" : { "$gt" : 27 , "$lt" : 30}} 

Eğer yöntemini yourRDD.collect() kullanabilir yineleme istiyorsanız. Ayrıca, RDD.foreach'inizi de kullanabilirsiniz, ancak bir işlev sağlamalısınız.

Kavanozun kıvılcım eklenmesi için başka bir yol var. Sen spark-env.sh değiştirebilir ve sonunda bu çizgiyi koyabilirsiniz:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 
for jar in $(ls $CONFDIR/../lib/*.jar); do 
    SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar} 
done 

Eğer kütüphaneleri koymak lib klasörü içinde ve hepsi bu kadar.

Yasal Uyarı:

config.set("mongo.input.query","{customerId: 'some mongo id', usage: {'$gt': 30}") 

için: Ben şu anda sorguya koşullar eklemek amacıyla Stratio

+0

Bu proje kullanımdan kaldırıldı ve artık kullanılmıyor. Bu cevap kaldırılmalıdır. – rjurney

2

1) üzerinde çalışıyorum sadece 'mongo.input.query' ile sağlanan sözlükte ekleyebilirsiniz

http://docs.mongodb.org/manual/tutorial/query-documents/

http://docs.mongodb.org/getting-started/python/query/

: sorgular bakın nasıl çalışır daha iyi anlamak Sadece toplama yöntemi için bak, bu bağlantıdaki RDD yöntemi 'toplamak', fazla bilgi kıvılcım bir göz atmak isteyebilirsiniz sonucun üzerine iterating için

2): Evet yapabilirsiniz

http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD