2013-06-04 4 views
5

için kayıt sınırlayıcıyı belirtmek mümkündür. Mapreduce.textoutputformat.separator (api'nin 1.03'ü kullanarak) kullanarak anahtar ve değer arasındaki sınırlayıcıyı geçersiz kılacak bir mekanizma görüyorum. Ama ayırıcıyı kayıtlar arasında kontrol edebilmek istiyorum. FYI ArrayWritable değerini değer olarak ve NullWritable anahtar olarak kullanıyorum.Hadoop'ta, TextOutputFormat

cevap

6

Bildiğim kadarıyla TextOutputFormat değerlerin metin temsilini almak için toString() kullandığı için bu mümkün değil biliyorum ve muhtemelen eğer varsayılan Object.toString() ile sona ereceğini böylece ArrayWritable durumunda bunun, toString() uygulamıyor olarak Reducer'unuzun çıktısına bir ArrayWritable yazmak vardı. Veya belki de satırları arasındaki ayırıcıyı değiştirmek istediniz, bu durumda, TextOutputFormat ile aynı sorun, çöple işaret ettiği varsayılan olarak \ n karakterini kullanır.

Bu, kendi RecordWriter'nizi tanımlayacağınız ve getRecordWriter yönteminde özel bir yapılandırma özelliğine sahip olduğunuz bir özel çıktı biçimi uygulayarak bunu yapabilirsiniz. İşte bir & bu tür bir sınıfın hızlı bir şekilde uygulanması (test edilmemiştir) ve ihtiyacınız olanı yapmalı ve ArrayWritable için mapred.arraywritable.separator ile ayırıcıyı ve mapred.line satırları arasındaki ayırıcıyı kontrol etmenize izin verin .separator: Bu harika görünüyor

import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.ArrayWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.ReflectionUtils; 

public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> { 

protected static class ArrayLineRecordWriter<K, V> extends 
     LineRecordWriter<K, V> { 


    private static final String utf8 = "UTF-8"; 
    private final byte[] arraySeparator; 
    private final byte[] keyValueSeparator; 
    private final byte[] lineSeparator; 

    public ArrayLineRecordWriter(DataOutputStream out, 
      String keyValueSeparator, String arraySeparator, String lineSeparator) { 
     super(out); 
     try { 
      this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 
      this.arraySeparator = arraySeparator.getBytes(utf8); 
      this.lineSeparator = lineSeparator.getBytes(utf8); 
     } catch (UnsupportedEncodingException uee) { 
      throw new IllegalArgumentException("can't find " + utf8 
        + " encoding"); 
     } 
    } 

    private void writeObject(Object o) throws IOException { 
     if (o instanceof Text) { 
      Text to = (Text) o; 
      out.write(to.getBytes(), 0, to.getLength()); 
     } else if (o instanceof ArrayWritable) { 
      ArrayWritable awo = (ArrayWritable) o; 
      for (String wrt : awo.toStrings()) { 
       out.write(wrt.toString().getBytes(utf8)); 
       out.write(arraySeparator); 
      } 
     } else { 
      out.write(o.toString().getBytes(utf8)); 
     } 
    } 

    public synchronized void write(K key, V value) throws IOException { 

     boolean nullKey = key == null || key instanceof NullWritable; 
     boolean nullValue = value == null || value instanceof NullWritable; 
     if (nullKey && nullValue) { 
      return; 
     } 
     if (!nullKey) { 
      writeObject(key); 
     } 
     if (!(nullKey || nullValue)) { 
      out.write(keyValueSeparator); 
     } 
     if (!nullValue) { 
      writeObject(value); 
     } 
     out.write(lineSeparator); 
    } 
} 

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) 
     throws IOException, InterruptedException { 
    Configuration conf = job.getConfiguration(); 
    boolean isCompressed = getCompressOutput(job); 
    String keyValueSeparator = conf.get(
      "mapred.textoutputformat.separator", "\t"); 
    String arraySeparator = conf.get("mapred.arraywritable.separator", "|"); 
    String lineSeparator = conf.get("mapred.line.separator"); 
    CompressionCodec codec = null; 
    String extension = ""; 
    if (isCompressed) { 
     Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
       job, GzipCodec.class); 
     codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, 
       conf); 
     extension = codec.getDefaultExtension(); 
    } 
    Path file = getDefaultWorkFile(job, extension); 
    FileSystem fs = file.getFileSystem(conf); 
    if (!isCompressed) { 
     FSDataOutputStream fileOut = fs.create(file, false); 
     return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator, 
       arraySeparator, lineSeparator); 
    } else { 
     FSDataOutputStream fileOut = fs.create(file, false); 
     return new ArrayLineRecordWriter<K, V>(new DataOutputStream(
       codec.createOutputStream(fileOut)), keyValueSeparator, 
       arraySeparator, lineSeparator); 
    } 
} 
} 
+0

!, ben de bu gece ya da yarın bu test etmek ve bildirmek için mümkün olmalıdır. –

+0

Bu, reklamı yapılan gibi çalışır. Tekrar teşekkürler –

1

TextOuputFormat kendi uygulamanızı yazmadan.

TextOutputFormat, LineRecordWriter kayıtlarını yazmak için kullanır. Bu yazara \n kodlanmış kayıt ayırıcı vardır.

static { 
    try { 
    newline = "\n".getBytes(utf8); 
    } catch (UnsupportedEncodingException uee) { 
    throw new IllegalArgumentException("can't find " + utf8 + " encoding"); 
    } 
} 

ve bunu değiştirmek için hiçbir seçenek ...

public synchronized void write(K key, V value) 
    throws IOException { 

    boolean nullKey = key == null || key instanceof NullWritable; 
    boolean nullValue = value == null || value instanceof NullWritable; 
    if (nullKey && nullValue) { 
    return; 
    } 
    if (!nullKey) { 
    writeObject(key); 
    } 
    if (!(nullKey || nullValue)) { 
    out.write(keyValueSeparator); 
    } 
    if (!nullValue) { 
    writeObject(value); 
    } 
    out.write(newline); 
} 

Neyse ki, hafif bir değişiklik ile kendi rulo nispeten kolay olmalıdır.