Flink'te RollingSink kullanılarak HDFS'ye seri hale getirilmiş vaka sınıfları yazmaya çalışıyorum. HDFS tarafından avro dosyaları deserialize edilebilir hale getirmek için FSDataOutputStream'ı saran DataFileWriter'i kullanıyorum. DataFileWriter ve FSDataOutputStream arasında, HDFS'de veri dosyasını kapatmak için senkronizasyon yapmaya çalıştığımda istisna atılıyor ve aslında diğer her dosyada veri alıyorum. Flink Writer uygulamasında Av akışı ile fs akışını senkronize etmenin bir yolu var mı?Flink [Scala] 'da RollingSink kullanılarak Avro ile seri hale getirilmiş nesnelerin yazılması [Scala]
DataFileWriter close() flush() sync() fsync() işlevini kullanmayı denedim ancak hepsi başarısız oldu. Senkronizasyon yöntemi en iyi performansı gösterir. Yazma yönteminde eşzamanlı çalışmayı denedim, ancak hala bir hata oluşturdu ve tüm verilerin dosyalara kaydedilip kaydedilmediğini doğrulayamadım. Yukarıdaki kod ile RollingSink çalıştırmak çalışıyor
class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] {
val serialVersionUID = 1L
var outputStream: FSDataOutputStream = null
var outputWriter: DataFileWriter[OutputContainer] = null
override def open(outStream: FSDataOutputStream): Unit = {
if (outputStream != null) {
throw new IllegalStateException("AvroWriter has already been opened.")
}
outputStream = outStream
if(outputWriter == null) {
val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$)
outputWriter = new DataFileWriter[OutputContainer](writer)
outputWriter.create(OutputContainer.SCHEMA$, outStream)
}
}
override def flush(): Unit = {}
override def close(): Unit = {
if(outputWriter != null) {
outputWriter.sync()
}
outputStream = null
}
override def write(element: OutputContainer) = {
if (outputStream == null) {
throw new IllegalStateException("AvroWriter has not been opened.")
}
outputWriter.append(element)
}
override def duplicate(): AvroWriter[OutputContainer] = {
new AvroWriter[OutputContainer]
}
}
şu istisna verir:
java.lang.Exception: Could not forward element to next operator
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401)
at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36)
at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476)
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more