2016-04-01 20 views
3

Sorun Bildirimi: MySQL DB tablosunda belirli bir modül için kullanıcının tüm gelen istek parametrelerini satır olarak ekliyoruz (bu büyük bir veri). Şimdi, bu tablodan her kaydı okuyacak ve üçüncü taraf API'larını arayarak kullanıcının bu talebi hakkında daha fazla bilgi alacağımız ve daha sonra bu döndürülen meta bilgilerini başka bir tabloya koyacak bir süreç tasarlamak istiyoruz.Scala + Slick + MySQL + Akka + Aktarımı Kullanırken Karşılaşılan Sorun Sorun Giderme

Güncel Girişimleri:

Ben Scala + Kaygan kullanıyorum bunu yapmak için. Okunacak veriler çok büyük olduğundan, bu tabloyu bir defada bir satır okumak ve işlemek istiyorum. Bu fonksiyon: Ben ancak 'java.util.concurrent.RejectedExecutionException'

denedim kaba mantığı aşağıdadır,

implicit val system = ActorSystem("Example") 
import system.dispatcher 
implicit val materializer = ActorMaterializer() 

val future = db.stream(SomeQuery.result) 
Source.fromPublisher(future).map(row => { 
     id = dataEnrichmentAPI.process(row) 

}).runForeach(id => println("Processed row : "+ id)) 

dataEnrichmentAPI.process alıyorum, kaygan + akka akışları kullanarak çalıştı üçüncü taraf bir REST çağrısı yapar ve ayrıca gerekli verileri almak için bazı DB sorguları yapar. Bu DB sorgu 'db.run' yöntemi kullanılarak yapılır ve

örneğin

def process(row: RequestRecord): Int = { 
    // SomeQuery2 = Check if data is already there in DB 
    val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf) 
    if(retId.isEmpty){ 
     val metaData = RestCall() 
     // SomeQuery3 = Store this metaData in DB 
     Await.result(db.run(SomeQuery3.result), Duration.Inf) 
     return metaData.id;  
    }else{ 
     // SomeQuery4 = Get meta data id 
     return Await.result(db.run(SomeQuery4.result), Duration.Inf)  
    } 
} 

Ben DB için engelleme çağrısı kullanıyorum bu istisnayı alıyorum (bekliyor Kullanarak) bitene kadar o da bekler. Devam etmek için daha sonra akış için dönüş değeri gerektiğinden, bundan kurtulabileceğimi düşünmüyorum.

Bu 'İstisnayi engelliyor', bu Özel Durumun arkasındaki nedendir? Bu tür bir sorunu çözmek için en iyi uygulama nedir?

Teşekkürler.

+0

3.0 kaygan kutudan Akka Akışları için destek yok mu? – Martijn

+0

evet, işte bu yüzden, kaykay – aks

+0

tarafından verilen akış işlemcisini tüketmek için akka kütüphanesinin Source.fromPublisher yöntemini kullanmamın nedeni de bunu neden işlem yönteminde yapmıyorsunuz? Sadece yayınları üzerinde sadece flapmap – Martijn

cevap

4

Bu senin problemin (çok az ayrıntı) olup olmadığını bilmiyorum ama asla engellememelisin.

En iyi uygulamalardan söz ederken, async stages yerine. Açıkça ve deyimsel karşı basıncı ve paralelliği ele alınacaktır

def process(row: RequestRecord): Future[Int] = { 
    db.run(SomeQuery2.result) flatMap { 
     case retId if retId.isEmpty => 
     // what is this? is it a sync call? if it's a rest call it should return a future 
     val metaData = RestCall() 
     db.run(SomeQuery3.result).map(_ => metaData.id) 

     case _ => db.run(SomeQuery4.result) 
    } 
} 


Source.fromPublisher(db.stream(SomeQuery.result)) 
    // choose your own parallelism 
    .mapAsync(2)(dataEnrichmentAPI.process) 
    .runForeach(id => println("Processed row : "+ id)) 

Bu şekilde: Bu aşağı yukarı kodunuzu Await.result kullanmadan nasıl görüneceğini olduğunu.

deneyin aslaçağrısı üretim kodunda Await.result ve sadece compose futures using map, flatMap and for comprehensions

+0

Yorumunuz için teşekkürler. Bu işlem yöntemi aşağıdaki adımları içerir: def işlemi (row: RequestRecord): Int = { ... Bir şey ... Await.result (db.run (SomeQuery2.result), Duration.Inf) ... } – aks

+0

@aks harita/flatmap (veya kavrayışlar için) kullanıyor ve geleceği geri döndürüyor, üretim kodunda asla "Await.result" kullanmayın –

+0

Güncelleme yöntemini güncelledim. Görebildiğiniz gibi, işlem fonksiyonunda çok fazla bağımlılık olması gereken talimatlar gerektirir. Bu nedenle, bu engelleme çağrısını kullanıyorum. AsyncCall kullanarak bundan nasıl kurtulacağınız hakkında bir fikriniz var. – aks