için kayıt sınırlayıcıyı belirtmek mümkündür. Mapreduce.textoutputformat.separator (api'nin 1.03'ü kullanarak) kullanarak anahtar ve değer arasındaki sınırlayıcıyı geçersiz kılacak bir mekanizma görüyorum. Ama ayırıcıyı kayıtlar arasında kontrol edebilmek istiyorum. FYI ArrayWritable değerini değer olarak ve NullWritable anahtar olarak kullanıyorum.Hadoop'ta, TextOutputFormat
cevap
Bildiğim kadarıyla TextOutputFormat
değerlerin metin temsilini almak için toString()
kullandığı için bu mümkün değil biliyorum ve muhtemelen eğer varsayılan Object.toString()
ile sona ereceğini böylece ArrayWritable
durumunda bunun, toString()
uygulamıyor olarak Reducer
'unuzun çıktısına bir ArrayWritable
yazmak vardı. Veya belki de satırları arasındaki ayırıcıyı değiştirmek istediniz, bu durumda, TextOutputFormat
ile aynı sorun, çöple işaret ettiği varsayılan olarak \ n karakterini kullanır.
Bu, kendi RecordWriter
'nizi tanımlayacağınız ve getRecordWriter
yönteminde özel bir yapılandırma özelliğine sahip olduğunuz bir özel çıktı biçimi uygulayarak bunu yapabilirsiniz. İşte bir & bu tür bir sınıfın hızlı bir şekilde uygulanması (test edilmemiştir) ve ihtiyacınız olanı yapmalı ve ArrayWritable
için mapred.arraywritable.separator ile ayırıcıyı ve mapred.line satırları arasındaki ayırıcıyı kontrol etmenize izin verin .separator: Bu harika görünüyor
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
protected static class ArrayLineRecordWriter<K, V> extends
LineRecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private final byte[] arraySeparator;
private final byte[] keyValueSeparator;
private final byte[] lineSeparator;
public ArrayLineRecordWriter(DataOutputStream out,
String keyValueSeparator, String arraySeparator, String lineSeparator) {
super(out);
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
this.arraySeparator = arraySeparator.getBytes(utf8);
this.lineSeparator = lineSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8
+ " encoding");
}
}
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else if (o instanceof ArrayWritable) {
ArrayWritable awo = (ArrayWritable) o;
for (String wrt : awo.toStrings()) {
out.write(wrt.toString().getBytes(utf8));
out.write(arraySeparator);
}
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(lineSeparator);
}
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = conf.get(
"mapred.textoutputformat.separator", "\t");
String arraySeparator = conf.get("mapred.arraywritable.separator", "|");
String lineSeparator = conf.get("mapred.line.separator");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator,
arraySeparator, lineSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new ArrayLineRecordWriter<K, V>(new DataOutputStream(
codec.createOutputStream(fileOut)), keyValueSeparator,
arraySeparator, lineSeparator);
}
}
}
TextOuputFormat
kendi uygulamanızı yazmadan.
TextOutputFormat
, LineRecordWriter
kayıtlarını yazmak için kullanır. Bu yazara \n
kodlanmış kayıt ayırıcı vardır.
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
ve bunu değiştirmek için hiçbir seçenek ...
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
Neyse ki, hafif bir değişiklik ile kendi rulo nispeten kolay olmalıdır.
!, ben de bu gece ya da yarın bu test etmek ve bildirmek için mümkün olmalıdır. –
Bu, reklamı yapılan gibi çalışır. Tekrar teşekkürler –