2015-12-15 6 views
11

Akka HTTP 2.0-M2 kullanarak toplu veri yüklemesi için bir araç yazmaya çalışıyorum.Akka HTTP istemcisini birden çok (10k - 100k) istek için düzgün olarak nasıl çağırılır?

public class TestMaxRequests { 
    private static final class Router extends HttpApp { 
     @Override 
     public Route createRoute() { 
      return route(
        path("test").route(
          get(handleWith(ctx -> ctx.complete("OK"))) 
        ) 
      ); 
     } 
    } 


    public static void main(String[] args) { 
     ActorSystem actorSystem = ActorSystem.create(); 
     Materializer materializer = ActorMaterializer.create(actorSystem); 

     Router router = new Router(); 
     router.bindRoute("127.0.0.1", 8082, actorSystem); 

     LoggingAdapter log = Logging.getLogger(actorSystem, new Object()); 

     for (int i = 0; i < 100; i++) { 
      final int reqNum = i; 
      Http.get(actorSystem).singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer) 
        .onComplete(new OnComplete<HttpResponse>() { 
         @Override 
         public void onComplete(Throwable failure, HttpResponse response) throws Throwable { 
          if (failure != null) { 
           log.error(failure, "Failed: {}", reqNum); 
          } else { 
           log.info("Success: {}, consuming stream...", reqNum); 
           response.entity().getDataBytes().runWith(Sink.ignore(), materializer); 
           log.info("Success: {}, consumed stream", reqNum); 
          } 
         } 
        }, actorSystem.dispatcher()); 
     } 
    } 
} 

Bu başarısız:: Ama burada ben bir sorun izole etmek çalıştı akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.

bakan ve kulüpler de başarısız örnek kod

[2015-12-15 16:17:32,609] [ INFO] [] [] a.e.s.Slf4jLogger: Slf4jLogger started 
[2015-12-15 16:17:32,628] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: logger log1-Slf4jLogger started 
[2015-12-15 16:17:32,636] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: Default Loggers started 
[2015-12-15 16:17:33,531] [ DEBUG] [spatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] a.i.TcpListener: Successfully bound to /127.0.0.1:8082 
[2015-12-15 16:17:33,624] [ DEBUG] [spatcher-7] [akka://default/user/PoolInterfaceActor-0] a.h.i.e.c.PoolInterfaceActor: (Re-)starting host connection pool to 127.0.0.1:8082 
[2015-12-15 16:17:33,736] [ DEBUG] [spatcher-8] [akka://default/user/SlotProcessor-0] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,748] [ DEBUG] [patcher-11] [akka://default/user/SlotProcessor-3] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,758] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-2] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,762] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-1] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,779] [ ERROR] [patcher-11] [Object(akka://default)] j.l.Object: Failed: 36 
akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] 
    at akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse(PoolInterfaceActor.scala:120) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[akka-actor_2.11-2.4.0.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorSubscriber$class.aroundReceive(ActorSubscriber.scala:201) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:309) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.actor.ActorCell.invoke(ActorCell.scala:494) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na] 
[2015-12-15 16:17:33,780] [ ERROR] [patcher-20] [Object(akka://default)] j.l.Object: Failed: 48 

Bunun sanırım çünkü ben Çok fazla Futures yaratmaya ve hepsini aynı anda uygulamaya çalışıyorum. Ama Akka'nın geri dönüşü sağlaması gerekmiyor mu? Sanırım yanlış kullanıyorum. SuperPool yöntemlerini denedim ama hiçbir şey değişmedi, çünkü anladığım kadarıyla, Http.singleRequest'un içinde aynı havuz var. Ayrıca, döngüde Http.get()'u çağırmak yerine Http örneğini yeniden kullanmayı denedim, ancak yardımcı olmadı.

Bir grup istekte bulunmanın doğru yolu nedir? 10 000 - 100 000 arası talepleri yerine getirmeyi planlıyorum.

cevap

13

Akka kesinlikle geri tepmeyi sağlar, bundan faydalanmıyorsunuz. Birden çok istek göndermek yerine, tüm isteklerinizi göndermek için tek bir Flow kullanabilirsiniz. documentation Gönderen:

final Flow<HttpRequest, HttpResponse, Future<OutgoingConnection>> connectionFlow = 
    Http.get(actorSystem).outgoingConnection("127.0.0.1", 8082); 

Daha sonra HttpRequest nesneleri işlemek için bu Akışı kullanabilirsiniz:

HttpRequest req = HttpRequest.GET("/test") 

//imitates your for-loop example of 100 requests 
Source.from(() -> Collections.nCopies(100, req).iterator()) 
     .via(connectionFlow) 
     .runForeach(...) 
+0

Ben Source.from birden istekleri kullanarak bir kısmını cevapsız()! Teşekkürler!! – relgames

+0

@relgames Hoş Geldiniz. Mutlu hack! –

+0

@RamonJRomeroyVigil Önceden bir sürü istek oluşturamıyorumsa, geri tepme ile akışı nasıl kullanabilirim? Örneğin, bazı paginated API'larda döndürülen kimliklere dayanarak bir şey istediğimi varsayalım. Bu nedenle, önceki isteklerden gelen yanıtları işlerken akış kullanmak istiyorum. – expert

İlgili konular