2015-03-03 17 views
6

Bağlam: çıkışını ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd ile yazmaya çalışıyorum. Kafamı henüz scalaz-stream ve kelime dağarcığına sarmamıştım.scalaz-stream: "header" (ilk parçalar) ile diğerlerine farklı şekilde nasıl çekilir?

İlk n parçalarını farklı şekilde işleyebilen bir işlem nasıl yazılır?

ben bu (örnek olarak dizeleri) ile geldim:

bir deyimsel ve muhtemelen genel composable yolu headerChunkAndRest yazmaktır ne
val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _)) 

val headerChunkAndRest: Process1[String, String] = 
    headerChunk.take(1) ++ process1.id 

io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt"))) 
    .pipe(headerChunkAndRest) 
    .to(io.stdOutLines) 
    .run.run 

?

+0

Hattın geri kalanı işlenirken başlığın içeriğine erişmeniz gerekiyor mu? –

+1

Scodec akışları (ikili içeriği kodlamak ve kodlamak için Scalaz akışları) sorununuzu çözebilecek iyi bir örnektir: d1 ++ d2: d1'i çalıştırın ve tüm kodu çözülen değerleri yayar, daha sonra kalan girişte d2'yi çalıştırın ve değerlerini yayar. Örnek: decode.once (codecs.int32) ++ decode.advance (12), tek bir imzalı Int'ı çözer, ardından imleci 12 bit ile ilerletir. (https://github.com/scodec/scodec-stream#decoding) –

+0

@TravisBrown no, içeriği diğerlerinin işlenmesi için gerekli değildir. Benim durumumda, ilk yığın, geri kalanlar ile aynı türde verileri içermelidir (bir dosyanın parçaları), daha önce müşteriye gönderilecek daha küçük bir parçadır. –

cevap

4

Genel Hususlar

kuvvetle ihtiyaçlarınıza ayrıntılarına bağlı olarak bunu yapmanın birkaç yolu vardır.

  1. foldWithIndex Bu size bir sayı olarak yığın mevcut indeksini verir: Sen scalaz-akışlarının bir parçası olan aşağıdaki yardımcı yöntemleri kullanabilirsiniz. Bu dizine bağlı olarak ayırt edebilirsiniz.
  2. zipWithState Yöntemin bir çağrısından diğerine bir durum ekleyebilir ve hala başlıkları ayrıştırıyorsanız veya bir vücuda ulaşmış olup olmadığınızı takip etmek için bu durumu kullanabilirsiniz. Bir sonraki adımda, bu durumu üstbilgi ve gövdeyi işlemek için kullanabilirsiniz. repartition Tüm üstbilgileri ve tüm gövde öğelerini birlikte gruplandırmak için bunu kullanın. Daha sonra bunları bir sonraki adımda işleyebilirsiniz.
  3. zipWithNext Bu işlev, her zaman size geçerli öğeyle gruplandırılmış önceki öğeyi sunar. Bunu, başlıktan gövdeye geçtiğinizde ve buna göre tepki verdiğinizde algılamak için kullanabilirsiniz.

Muhtemelen gerçekten neye ihtiyacınız olduğunu düşünmelisiniz. Tam olarak sorunuz için, zipwithIndex ve map olacaktır. Ancak sorununuzu tekrar düşünürseniz muhtemelen repartition veya zipWithState ile sona ereceksiniz. Bir HTTP istemcisi, vücudun (HTTP, değil HTML) den HTTP başlık öğelerini ayıran:

Örnek kod

en basit bir örnek yapalım. Üstbilgide çerez gibi şeyler, vücutta bir görüntü veya HTTP kaynakları gibi gerçek "içerik" dir.

Basit HTTP istemcisi bu gibi görünebilir:

import scalaz.stream._ 
import scalaz.concurrent.Task 
import java.net.InetSocketAddress 
import java.nio.channels.AsynchronousChannelGroup 

implicit val AG = nio.DefaultAsynchronousChannelGroup 

def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] = 
    Process(
    s"GET $path HTTP/1.1", 
    s"Host: $hostname", 
    "Accept: */*", 
    "User-Agent: scalaz-stream" 
).intersperse("\n").append(Process("\n\n")) 

def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] = 
    nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines()) 

Şimdi geri kalanından ayrı başlık hatları için bu kodu kullanabilirsiniz. HTTP'de, başlık satırlarda yapılandırılmıştır. Vücuttan boş bir çizgi ile ayrılır. Yani ilk, başlığında satır sayısını sayalım:

val demoHostName="scala-lang.org" // Hope they won't mind... 
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run 
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8)) 

Bunu koştu, başlıkta 8 satır vardı.

object HttpResponsePart { 
    sealed trait EnumVal 
    case object HeaderLine extends EnumVal 
    case object HeaderBodySeparator extends EnumVal 
    case object Body extends EnumVal 
    val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body) 
} 

Sonra yanıtına parçalarını sınıflandırmak zipWithIndex artı map kullanalım: ilk tepki bölümlerini bir numaralandırma tanımlamak, böylece sınıflandırmak Let Benim için

simpleHttpClient(demoHostName).zipWithIndex.map{ 
    case (line, idx) if idx < 9 => (line, HeaderLine) 
    case (line, idx) if idx == 10 => (line, HeaderBodySeparator) 
    case (line, _) => (line, Body) 
}.take(15).runLog.run 

, bu iyi çalışır. Ama elbette, başlık satırlarının miktarı haber verilmeksizin herhangi bir zamanda değişebilir. Yanıtın yapısını dikkate alan çok basit bir ayrıştırıcı kullanmak çok daha sağlamdır. Bunun için ben zipWithState kullanın: görebilirsiniz

simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){ 
    case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator 
    case (_, HeaderLine) => HeaderLine 
    case (_, HeaderBodySeparator) => Body 
    case (line, Body) => Body 
}.take(15).runLog.run 

, her iki yaklaşım da benzer yapıyı kullanmak ve her iki yaklaşım da aynı sonucu gerektiğini söyledi. Güzel olan, her iki yaklaşımın da kolayca yeniden kullanılabilir. Kaynağı değiştirebilirsin, ör. bir dosya ile ve hiçbir şey değiştirmek zorunda değilsiniz. Sınıflandırma sonrası işlemle aynı. .take(15).runLog.run, her iki yaklaşımda da aynıdır.

İlgili konular