2016-03-24 19 views
0

Flink'te RollingSink kullanılarak HDFS'ye seri hale getirilmiş vaka sınıfları yazmaya çalışıyorum. HDFS tarafından avro dosyaları deserialize edilebilir hale getirmek için FSDataOutputStream'ı saran DataFileWriter'i kullanıyorum. DataFileWriter ve FSDataOutputStream arasında, HDFS'de veri dosyasını kapatmak için senkronizasyon yapmaya çalıştığımda istisna atılıyor ve aslında diğer her dosyada veri alıyorum. Flink Writer uygulamasında Av akışı ile fs akışını senkronize etmenin bir yolu var mı?Flink [Scala] 'da RollingSink kullanılarak Avro ile seri hale getirilmiş nesnelerin yazılması [Scala]

DataFileWriter close() flush() sync() fsync() işlevini kullanmayı denedim ancak hepsi başarısız oldu. Senkronizasyon yöntemi en iyi performansı gösterir. Yazma yönteminde eşzamanlı çalışmayı denedim, ancak hala bir hata oluşturdu ve tüm verilerin dosyalara kaydedilip kaydedilmediğini doğrulayamadım. Yukarıdaki kod ile RollingSink çalıştırmak çalışıyor

class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] { 

    val serialVersionUID = 1L 

    var outputStream: FSDataOutputStream = null 
    var outputWriter: DataFileWriter[OutputContainer] = null 

    override def open(outStream: FSDataOutputStream): Unit = { 
    if (outputStream != null) { 
     throw new IllegalStateException("AvroWriter has already been opened.") 
    } 
    outputStream = outStream 

    if(outputWriter == null) { 
     val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$) 
     outputWriter = new DataFileWriter[OutputContainer](writer) 
     outputWriter.create(OutputContainer.SCHEMA$, outStream) 
    } 
    } 

    override def flush(): Unit = {} 

    override def close(): Unit = { 
    if(outputWriter != null) { 
     outputWriter.sync() 
    } 
    outputStream = null 
    } 

    override def write(element: OutputContainer) = { 
    if (outputStream == null) { 
     throw new IllegalStateException("AvroWriter has not been opened.") 
    } 
    outputWriter.append(element) 
    } 

    override def duplicate(): AvroWriter[OutputContainer] = { 
    new AvroWriter[OutputContainer] 
    } 
} 

şu istisna verir:

java.lang.Exception: Could not forward element to next operator 
     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222) 
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316) 
     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) 
     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) 
     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) 
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
     at java.lang.Thread.run(Thread.java:744) 
Caused by: java.lang.RuntimeException: Could not forward element to next operator 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) 
     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158) 
     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664) 
Caused by: java.nio.channels.ClosedChannelException 
     at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353) 
     at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98) 
     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121) 
     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216) 
     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150) 
     at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366) 
     at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383) 
     at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401) 
     at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373) 
     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) 
     ... 3 more 

cevap

0

Sonunda bir çözüm bulduk. Akış, RollingSink tarafından yönetildiğinden, yazarın uyguladığı bir sınıfta kapatılamaz. Öte yandan, DataFileWriter bir akışı sararsa ve bir dosya HDFS'ye dökülmelidir. Hile DataFileWriter kapatmak değil ama senkronize etmek ve sonra sadece null atayarak atmak (Scala ve fonksiyonel programlama dikkate alındığında çok deyimsel bir şekilde değil, ama, Flink Java geliştirildi). Yani bu basit hile benim sorunumu çözdü:

override def close(): Unit = { 
    if(outputWriter != null) { 
     outputWriter.sync() 
    } 
    outputWriter = null 
    outputStream = null 
    }