2013-09-05 36 views
6

içerecek şekilde genişlet Sekans dosyalarını okuyan özel bir InputFormat oluşturabilmek istiyorum, ancak dosya yolunu ve kaydın bulunduğu dosyanın içindeki ofseti de ekleyebilirim.Dosya adı + ofseti

Geri adım atmak için kullanım durumu şöyledir: Değişken büyüklükteki verileri içeren bir sıra dosyası var. Anahtarlar çoğunlukla ilgisizdir ve değerler çeşitli farklı alanları içeren birkaç megabayttır. Bu alanların bir kısmını elasticsearch'te dosya adı ve ofsetiyle birlikte dizinlemek istiyorum. Bu şekilde, bu alanları elasticsearch'ten sorgulayabilir ve daha sonra tüm dosyayı ES'de saklamak yerine sıra dosyasına geri dönmek ve orijinal kaydı elde etmek için dosya adını ve ofsetini kullanabilirim.

Tüm bu işlemi tek bir java programı olarak çalışıyorum. SequenceFile.Reader sınıfı, bunun gerçekleşmesi için getPosition ve seek yöntemlerini uygun bir şekilde verir. Bununla birlikte, sonuçta birçok terabayt veriye yer verilecektir, dolayısıyla bunu bir MapReduce işine dönüştürmem gerekecek (muhtemelen Map-only). Dizi dosyasındaki gerçek anahtarlar ilgisiz olduğundan, benim beklediğim yaklaşım, SquenceFileInputFormat'ı genişleten veya bir şekilde kullanan özel bir InputFormat oluşturmak, ancak gerçek anahtarları döndürmek yerine, dosyadan oluşan birleşik anahtar döndürür ve ofset. Bununla birlikte, uygulamada daha zor olduğu kanıtlanmaktadır. Mümkün gibi görünüyor, ama gerçek API'ler ve maruz kalanlar göz önüne alındığında, bu zor. Herhangi bir fikir? Belki de almam gereken alternatif bir yaklaşım?

cevap

5

Herkes benzer bir sorunla karşılaşırsa, işte bulduğum çözüm. Kodun bir kısmını sadece SequenceFileInputFormat/RecordReader'da kopyaladım ve sadece değiştirdim. Bir alt sınıf veya bir dekoratör falan ya yazmaya umduğu ... bu şekilde hoş değil, ama çalışır:

SequenceFileOffsetInputFormat.java:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { 

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { 

     private SequenceFile.Reader in; 
     private long start; 
     private long end; 
     private boolean more = true; 
     private PathOffsetWritable key = null; 
     private Writable k = null; 
     private V value = null; 
     private Configuration conf; 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
      FileSplit fileSplit = (FileSplit) split; 
      conf = context.getConfiguration(); 
      Path path = fileSplit.getPath(); 
      FileSystem fs = path.getFileSystem(conf); 
      this.in = new SequenceFile.Reader(fs, path, conf); 
      try { 
       this.k = (Writable) in.getKeyClass().newInstance(); 
       this.value = (V) in.getValueClass().newInstance(); 
      } catch (InstantiationException e) { 
       throw new IOException(e); 
      } catch (IllegalAccessException e) { 
       throw new IOException(e); 
      } 
      this.end = fileSplit.getStart() + fileSplit.getLength(); 

      if (fileSplit.getStart() > in.getPosition()) { 
       in.sync(fileSplit.getStart()); 
      } 

      this.start = in.getPosition(); 
      more = start < end; 

      key = new PathOffsetWritable(path, start); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (!more) { 
       return false; 
      } 
      long pos = in.getPosition(); 

      more = in.next(k, value); 
      if (!more || (pos >= end && in.syncSeen())) { 
       key = null; 
       value = null; 
       more = false; 
      } else { 
       key.setOffset(pos); 
      } 
      return more; 
     } 

     @Override 
     public PathOffsetWritable getCurrentKey() { 
      return key; 
     } 

     @Override 
     public V getCurrentValue() { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (end == start) { 
       return 0.0f; 
      } else { 
       return Math.min(1.0f, (in.getPosition() - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void close() throws IOException { 
      in.close(); 
     } 

    } 

    @Override 
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new SequenceFileOffsetRecordReader<V>(); 
    } 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 
     return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); 
    } 

    @Override 
    public long getFormatMinSplitSize() { 
     return SequenceFile.SYNC_INTERVAL; 
    } 


} 

PathOffsetWritable.java:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { 

    private Text t = new Text(); 
    private Path path; 
    private long offset; 

    public PathOffsetWritable(Path path, long offset) { 
     this.path = path; 
     this.offset = offset; 
    } 

    public Path getPath() { 
     return path; 
    } 

    public long getOffset() { 
     return offset; 
    } 

    public void setPath(Path path) { 
     this.path = path; 
    } 

    public void setOffset(long offset) { 
     this.offset = offset; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     t.readFields(in); 
     path = new Path(t.toString()); 
     offset = in.readLong(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     t.set(path.toString()); 
     t.write(out); 
     out.writeLong(offset); 
    } 

    @Override 
    public int compareTo(PathOffsetWritable o) { 
     int x = path.compareTo(o.path); 
     if (x != 0) { 
      return x; 
     } else { 
      return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); 
     } 
    } 


}