2015-07-21 15 views
8

Yeni akka.http kitaplığını nasıl kullanacağımı anlamaya çalışıyorum. Bir sunucuya bir http isteği göndermek ve Source[String,?] üretmek için tüm yanıt gövdesini tek bir String olarak okumak istiyorum. İşteTüm HttpResponse gövdesini Akka-Streams HTTP ile bir dize olarak alın

şimdiye kadar üretmek mümkün oldu en iyi çözümdür:

def get(
    modelID: String, 
    pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] 
): Source[String,Unit] = { 
    val uri = reactionsURL(modelID) 
    val req = HttpRequest(uri = uri) 
    Source.single((req,0)) 
    .via(pool) 
    .map { 
     case (Success(resp),_) => 
     resp.entity.dataBytes.map(_.decodeString("utf-8")) 
    }.flatten(FlattenStrategy.concat) 
    .grouped(1024) 
    .map(_.mkString) 

O (eksik hata yolu hariç) iyi iş gibi görünüyor, ancak böyle basit görevler için biraz aksak. Daha akıllı bir çözüm var mı? grouped/mkString'dan nasıl yararlanabilirim?

cevap

11

Zaman aşımıyla HttpResponse toStrict yöntemini kullanabilirsiniz. Bütün cevabı Gelecek olarak toplar.

def toStrict (zaman aşımı: FiniteDuration) (örtük ec: ExecutionContext, fm: Cisimleştirici): Gelecek [Sıkı] sıkı bir varlık ile bu mesajın bir paylaşılabilir ve seri hale getirilebilir

kopyasını döndürür.

Örnek:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest} 
import akka.stream.{Materializer, ActorMaterializer} 
import akka.stream.scaladsl.{Sink, Flow, Source} 
import scala.concurrent.{ExecutionContext, Future} 
import scala.concurrent.duration._ 

import scala.util.{Try, Success} 

object Main extends App { 

    implicit val system = ActorSystem() 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    val host = "127.0.0.1" 
    lazy val pool = Http().newHostConnectionPool[Int](host, 9000) 

    FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run() 

} 

object FlowBuilder { 
    def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool]) 
     (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = { 
    val uri = modelID 
    val req = HttpRequest(uri = modelID) 
    Source.single((req, 0)).via(pool) 
     .map { 
     case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) 
    } 
    } 
} 
+0

OK. Ama bir "Kaynak [String, Unit] 'e ihtiyacım var çünkü uygulamak için daha fazla dönüşümüm var. Çözümünüzle birlikte, Materializer ve ExecuctionContext boyunca geçip yeni bir Kaynakta tekrar bir şeyler yapmam gerekecek ... Bir şey mi özledim yoksa bir şey mi özledim? – paradigmatic

+0

@paradigmatic Örnek ekliyorum. ExecutionContext ve materializer örtülü argümanlar olarak ayarlanabilir. – Zernike

7

Ayrıca örneğin diğer türleri üzerinde çalışacak olan Unmarshall kullanabilirsiniz Sprey-json'dan json. Bu ayrıca strict olarak Future[_] döndürür.

Örnek:

authedReq.via(authServerReqResFlow).mapAsync(1) { case (tryRes, _) => 
     tryRes match { 
     case Failure(exception) => Future.failed[Principal](exception) 
     case Success(response @ HttpResponse(StatusCodes.OK,_,_,_)) => 
      val userContext = Unmarshal(response).to[UserContextData] 
      userContext.map { 
      case UserContextData(UserInfo(_, userName, fullName, email, title), _, _) => 
       Principal(userName, fullName, email, title) 
      } 
     case Success(response @ HttpResponse(responseCode,_,entity,_)) => 
      Unmarshal(entity).to[String].flatMap(msg => Future.failed(new AuthenticationFailure(s"$responseCode\n$msg"))) 
     } 
    } 
İlgili konular