2010-04-26 35 views
6

Tek bir eşlemede N miktarını göndermek için Hadoop kullanmaya çalışıyorum. Hatların bölünmüş olmasına gerek yok zaten.Tek bir haritadaki birden fazla metin satırı

NLineInputFormat'ı kullanmayı denedim, ancak her satırda bir satırdaki N satırlık metinleri bir satırda [Nth satırından sonra vazgeçerek] gönderir.

Ben seçeneği ayarlamak için denedim ve sadece her haritaya bir kerede 1 çizgisinde göndermeden girdi N satır alır: Ben LineRecordReader geçersiz kılmak için beni öneren bir posta listesini buldum

job.setInt("mapred.line.input.format.linespermap", 10); 

:: sonraki, ancak bu kadar basit değil, çünkü içsel veri üyeleri tamamen gizli.

NLineInputFormat ve bunun sabit kodları LineReader için kaynağı kontrol ettim, bu yüzden geçersiz kılma yardımcı olmaz.

Ayrıca, btw Amazon EC2 MapReduce ile uyumluluk için Hadoop 0.18 kullanıyorum.

+0

Neden bunu yapmaya çalışıyorsun? Birden fazla hat bir anlamda tek bir kayıt mı oluşturuyor? –

+0

Gerçekten bir dizi rastgele satır sayısına ihtiyacım var, ancak sonuç olarak yaşayabiliyorum. Doğru redüktöre göndermek için ona ihtiyacım var. – monksy

+0

Sorunuza cevap vermek için evet yaparlar. – monksy

cevap

7

, kendi giriş formatınızı uygulamanız gerekir. Ayrıca kendi kayıt okuyucunuzu tanımlamanız da mümkün.

Ne yazık ki bir getSplits() - yöntem tanımlamanız gerekmektedir. Benim düşünceme göre bu, kayıt okuyucusunu uygulamaktan daha zor olacaktır: Bu yöntem, girdi verilerini parçalamak için bir mantığı uygulamak zorundadır. "Hadoop - kesin bir rehber"

aşağıdaki alıntıyı bakın (Hep öneriyoruz harika bir kitap!):

public interface InputFormat<K, V> { 
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 
    RecordReader<K, V> getRecordReader(InputSplit split, 
            JobConf job, 
            Reporter reporter) throws IOException; 
} 

JobClient getSplits çağırır() yöntemi: İşte

arayüzü İstenen sayıda harita görevini numSplits argümanı olarak geçirerek. InputFormat mentations numSplits belirtilen sayıda bölmelerini farklı sayıda geri serbesttir uygula- maya Bu sayı, bir ipucu olarak işlenir. Bölmeleri hesapladıktan sonra, istemci onları iş takibine gönderir; bu, 'un görev konumlarını, görev görevlilerinde işlemek üzere harita görevlerini planlamak için kullandığını gösterir.

Görev görevlisi üzerinde, harita görevi, bu bölme için bir RecordReader elde etmek üzere bölmeyi InputFormat öğesinden getRecordReader() yöntemine geçirir. Bir RecordReader biraz daha kayıtları üzerinde bir yineleyici daha ve harita görevi harita fonksiyonuna geçer kayıt anahtarı değer çiftleri, oluşturmak için bir kullanır. (MapRunner kod dayanarak) bir kod parçacığı bir şekilde gösterilmektedir:

K key = reader.createKey(); 
V value = reader.createValue(); 
while (reader.next(key, value)) { 
    mapper.map(key, value, output, reporter); 
} 
+0

Bu tür çalışır. Ama bu soruya gerçekten cevap vermiyor. 18.3'ün altında yeni InputFormats ekleme ile ilgili bir sorun var. – monksy

+2

Tamam özür dilerim. Gerçekten de gerçek bir soru yok, çünkü hiçbir soru işareti görmediğimden: -P Başka ne bilmek istiyorsun daha spesifik? –

1

sizin durumunuzda Yetkinin deseni takip edebilir ve (gerekirse yöntemler sonraki yani geçersiz kılan LineRecordReader etrafında sarıcı uygulamak olduğunu düşünüyorum) (veya yeni satırdaki nextKeyValue() değerini bir satır yerine N satırlarının birleşimine ayarlamak için.

I EOF veya boş bir satır ya karşılaşmadan kadar hattı ile girdi veri hattı oku (ve bitiştirmek) için LineRecordReader kullanan ParagraphRecordReader örnek uygulama googled var. Daha sonra, değer bir paragraf (bir satır yerine) olduğu çift döndürür. Ayrıca, bu ParagraphRecordReader için ParagraphInputFormat standart TextInputFormat kadar basittir.

Bu uygulama için gerekli bağlantıları ve aşağıdaki yazıyla ilgili birkaç kelimeyi bulabilirsiniz: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

2

En Sadece NLineInputFormat geçersiz kılar ve özel bir MultiLineRecordReader yerine varsayılan LineReader uygulayan benim kendi InputFormat oluşturarak son zamanlarda bu sorunu çözdü.

NLineInputFormat'ı genişletmeyi seçtim çünkü bölünme başına N çizgisine tam olarak sahip olmakla aynı garantiye sahip olmak istedim.

Bu kayıt okuyucu Değiştirilmiş http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

tek şeylerden olduğu gibi neredeyse artık yeni API kullanır maxLineLength maddi ve kodlanmış olma NLineInputFormat en setNumLinesPerSplit() INSEAD'de okunan alır NLINESTOPROCESS değeridir alınır (daha fazla esneklik için). İşte

sonucudur:

public class MultiLineInputFormat extends NLineInputFormat{ 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { 
     context.setStatus(genericSplit.toString()); 
     return new MultiLineRecordReader(); 
    } 

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ 
     private int NLINESTOPROCESS; 
     private LineReader in; 
     private LongWritable key; 
     private Text value = new Text(); 
     private long start =0; 
     private long end =0; 
     private long pos =0; 
     private int maxLineLength; 

     @Override 
     public void close() throws IOException { 
      if (in != null) { 
       in.close(); 
      } 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException,InterruptedException { 
      return key; 
     } 

     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (start == end) { 
       return 0.0f; 
      } 
      else { 
       return Math.min(1.0f, (pos - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { 
      NLINESTOPROCESS = getNumLinesPerSplit(context); 
      FileSplit split = (FileSplit) genericSplit; 
      final Path file = split.getPath(); 
      Configuration conf = context.getConfiguration(); 
      this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); 
      FileSystem fs = file.getFileSystem(conf); 
      start = split.getStart(); 
      end= start + split.getLength(); 
      boolean skipFirstLine = false; 
      FSDataInputStream filein = fs.open(split.getPath()); 

      if (start != 0){ 
       skipFirstLine = true; 
       --start; 
       filein.seek(start); 
      } 
      in = new LineReader(filein,conf); 
      if(skipFirstLine){ 
       start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
      } 
      this.pos = start; 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (key == null) { 
       key = new LongWritable(); 
      } 
      key.set(pos); 
      if (value == null) { 
       value = new Text(); 
      } 
      value.clear(); 
      final Text endline = new Text("\n"); 
      int newSize = 0; 
      for(int i=0;i<NLINESTOPROCESS;i++){ 
       Text v = new Text(); 
       while (pos < end) { 
        newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); 
        value.append(v.getBytes(),0, v.getLength()); 
        value.append(endline.getBytes(),0, endline.getLength()); 
        if (newSize == 0) { 
         break; 
        } 
        pos += newSize; 
        if (newSize < maxLineLength) { 
         break; 
        } 
       } 
      } 
      if (newSize == 0) { 
       key = null; 
       value = null; 
       return false; 
      } else { 
       return true; 
      } 
     } 
    } 

} 
İlgili konular