2016-10-24 25 views
6

Bir csv dosyası okuyorum. Bunu yapmak için Akka Streams kullanıyorum, böylece her satırda gerçekleştirilecek eylemlerin bir grafiğini oluşturabilirim. Aşağıdaki oyuncak örneğim var ve çalışıyor.Akka Streams kullanarak bir CSV dosyalarını okuma

def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

     val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines) 
     val sink = Sink.foreach(println) 
     source.runWith(sink) 
     } 

İki Source türü benimle kolay oturmuyor. Bu deyimsel mi yoksa bunu yazmanın daha iyi bir yolu var mı?

cevap

2

Evet, sorun değil, çünkü bunlar farklı Source s. Kendini dosyasını okuyabilir scala.io.Source Sevmediğiniz Ama eğer (bazen örneğin kaynağını yapmak zorunda csv dosya sıkıştırılmış olan) ve daha sonra birlikte Apache Commons CSV kullanmayı düşünün söyledikten bu

StreamConverters.fromInputStream(() => input) 
    .via(Framing.delimiter(ByteString("\n"), 4096)) 
    .map(_.utf8String) 
    .collect { line => 
    line 
    } 

gibi InputStream verilen kullanarak ayrıştırmak akka akışı. Daha az kod yazabilirsiniz :)

+0

Merhaba, bu örneği birkaç dakika çalıştırmaya çalışıyordum, başaramadım (örtük ve hepsi). Eksik ithalat ve kurulum sağlamak için lütfen size sorabilir miyim? –

10

Aslında, bir dosyadan doğrudan okumak için akka-streams bir işlev sağlar.

FileIO.fromPath(Paths.get("a.csv")) 
     .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)) 
     .runForeach(println) 

Burada, runForeach yöntemi, satırları yazdırmaktır. Bu satırları işlemek için uygun bir Sink varsa, bu işlev yerine onu kullanın. Eğer içinde kelimelerin toplam sayısı ' tarafından çizgileri bölmek ve yazdırmak istiyorsanız Örneğin,:

val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size)) 

FileIO.fromPath(Paths.get("a.csv")) 
     .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)) 
     .to(sink) 
     .run() 
+0

Evet, geçen gün buna rastladım. Muhtemelen kullanacağım. PureCSV kütüphanesini kullanmayı denedim, ancak akışa dayalı bir yaklaşım kullanmanın amacını yitiren, işlemeden önce tüm dosyayı belleğe yükler. –

4

CSV Akka Akışları dosyasını okumak için deyimsel yolu Alpakka CSV connector kullanmaktır. Aşağıdaki örnek ve ByteString değerleri (ilk dosyasında satır olduğu varsayılır) sütun adlarının bir haritaya dönüştürür, CSV dosyasını okur String değerlerine ByteString değerlerini dönüştürür ve her satırı yazdırır:

FileIO.fromPath(Paths.get("a.csv")) 
    .via(CsvParsing.lineScanner()) 
    .via(CsvToMap.toMap()) 
    .map(_.mapValues(_.utf8String)) 
    .runForeach(println) 
İlgili konular