2011-06-24 15 views
5

Bunu hadoop kullanıcı posta listesine çapraz gönderme konusunda üzgünüm ve bu benim için acil bir konu oluyor. aşağıdaki gibiHesaplama kesişimi ve hadoop ile iki dosyanın kayıtlarının farkını ayarlama

Benim sorundur: Ben iki giriş dosyaları var ve ben belirlemek istiyorum

  • a) yalnızca) 1
  • b dosyasındaki satır sayısını meydana hatlarının sayısı hangi

    : sadece dosyanın 2
  • c) Her iki (örneğin ip eşitliği açısından) içerisinde

Örnek ortak hat sayısı ile meydana 210

File 1: 
a 
b 
c 

File 2: 
a 
d 
her durum için

İstenilen çıktı:

lines_only_in_1: 2   (b, c) 
lines_only_in_2: 1   (d) 
lines_in_both: 1   (a) 

Temelde yaklaşımım şu şekildedir: mapper hattı (metin) ve byte oluşan bir çift alır böylece, kendi LineRecordReader yazdı Kaynak dosyayı gösterir (0 veya 1). Eşleştirici, eşleştirmeyi yalnızca tekrar döndürür, böylece hiçbir şey yapmaz. Ancak, yan etki, birleştirici alan olduğu bir

Map<Line, Iterable<SourceId>> 

(sourceid, 0 veya 1).

Şimdi, her hat için ben kaynaklarının setini alabilirsiniz içeri. Bu nedenle, her vaka için sayar bir birleştirici (Liste 1) hatların (a, b, c) numarayı

yazabiliriz görünür

Birleştirici, daha sonra, yalnızca temizlikte bir 'özet' çıkarır (bu güvenli midir?). Sonra sadece bu özetleri değerlerini özetlemek redüktör olarak

lines_only_in_1 2531 
lines_only_in_2 3190 
lines_in_both  901 

: gibi Yani bu özet görünüyor. (Bu nedenle redüktörün çıkışı, birleştiricininki gibi görünür).

Ancak, asıl sorun ben formu (hat, sourceid) verimi kayıtları // sourceid ya 0 ya

1 Ve ben tek bir sanal dosya olarak hem kaynak dosyaları tedavi etmek gerek yani, Bunu nasıl elde edeceğimi bilmiyorum. Soru şu ki, ön işleme ve dosyaların önceden birleştirilmesini önleyip durduramam ve bunu sanal olarak birleştirilen dosya okuyucusu ve özel kayıt okuyucusu gibi bir şeyle anında gerçekleştirebiliyorum. Herhangi bir kod örneği çok takdir edilmektedir.

Saygılarımızla, Claus

İlanı 1:

public static class SourceCombiner 
    extends Reducer<Text, ByteWritable, Text, LongWritable> { 

    private long countA = 0; 
    private long countB = 0; 
    private long countC = 0; // C = lines (c)ommon to both sources 

    @Override 
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException { 
     Set<Byte> fileIds = new HashSet<Byte>(); 
     for (ByteWritable val : values) { 
      byte fileId = val.get(); 

      fileIds.add(fileId); 
     } 

     if(fileIds.contains((byte)0)) { ++countA; } 
     if(fileIds.contains((byte)1)) { ++countB; } 
     if(fileIds.size() >= 2) { ++countC; } 
    } 

    protected void cleanup(Context context) 
      throws java.io.IOException, java.lang.InterruptedException 
    { 
     context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA)); 
     context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB)); 
     context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC)); 
    } 
} 

cevap

2

Tamam, ben gerçekten şimdiye kadar denedim ne özü yakalamak olmadığını itiraf etmeliyim, ama var ihtiyacınız olabilecek şeyleri yapmak için basit bir yaklaşım.

Filemapper'a bir göz atın. Bu, dosya adını alacak ve girdinin her bir satırıyla gönderecektir.

a File 1,File 2 
    b File 1 
    c File 1 
    d File 2 
:

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> { 

     static Text fileName; 

     @Override 
     protected void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      context.write(value, fileName); 
     } 

     @Override 
     protected void setup(Context context) throws IOException, 
       InterruptedException { 

      String name = ((FileSplit) context.getInputSplit()).getPath().getName(); 
      fileName = new Text(name); 
     } 
    } 

Şimdi onları size böyle bir giriş alacak azaltarak (sizin örnek ile ilgili olarak) şu şekilde görünecektir anahtar/değerlere Açıkçası

a File 1 
    b File 1 
    c File 1 

    a File 2 
    d File 2 

bir grup var

Redüktörünüzde yapmanız gerekenler şu şekilde olabilir:

public class FileReducer extends Reducer<Text, Text, Text, Text> { 

    enum Counter { 
     LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND 
    } 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     HashSet<String> set = new HashSet<String>(); 
     for (Text t : values) { 
      set.add(t.toString()); 
     } 

     // if we have only two files and we have just two records in our hashset 
     // the line is contained in both files 
     if (set.size() == 2) { 
      context.getCounter(Counter.LINES_IN_COMMON).increment(1); 
     } else { 
      // sorry this is a bit dirty... 
      String t = set.iterator().next(); 
      // determine which file it was by checking for the name: 
      if(t.toString().equals("YOUR_FIRST_FILE_NAME")){ 
       context.getCounter(Counter.LINES_IN_FIRST).increment(1); 
      } else { 
       context.getCounter(Counter.LINES_IN_SECOND).increment(1); 
      } 
     } 
    } 

} 

if ifadesinin içindeki dizeyi dosya adlarınıza değiştirmeniz gerekir.

İş sayacını kullanmak, kendi ilkel öğelerini kullanmaktan ve bunları temizlemede bağlama göre yazmaktan biraz daha açık olduğunu düşünüyorum. Sen tamamlanmasının ardından bu şeyleri arayarak bir iş için sayaçları alabilirsiniz:

Job job = new Job(new Configuration()); 
//setup stuff etc omitted.. 
job.waitForCompletion(true); 
// do the same line with the other enums 
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue(); 

Az asla size HDF'ler yaygın vb satır numaralarını gerekiyorsa, o zaman çözüm için gidin.

Size yardımcı oldu umarım.

+0

Merhaba, biraz belirsiztim: Buradaki nokta, birleştiricilerin yalnızca özetlemeyi (1, 2 ve ortak satır sayısı) redüktöre vermesini istediğimdir - tüm satırlara gerek yoktur redüktöre geri gönderilir. Ancak bunun çalışması için, birleştiriciler her iki dosyanın kayıtlarını birlikte görmelidir (benim RecordReader zaten üretir (line, fileId) çiftleridir, dosya adından fileId'ye eşleme, config nesnesine iletilir). Ancak, dosyaları iki FileInputFormat.addInputPath (iş, dosya) deyimleriyle eklerken, dosyalar tek tek işlenir, böylece birleştiriciler "sendikalarını" görmezler. –

+0

bazı gerçekten garip "optimizasyon" dır. Ama iyi bir nokta. Geç yanıt için –

+0

Sry; Bu benim düşüncem mümkün değil: Kaynak dosya ayrıldı ve bölmeler düğümlere gönderilir. Düğümler daha sonra ilgili bölümlerden kayıtları okuyor. Bu nedenle, kaynak dosyasındaki yinelenen kayıtlar birkaç bölüme yerleştirilebilir ve dolayısıyla çeşitli düğümlere yayılabilir. Bu nedenle çiftlerin gruplandırılması sadece redüktörde mümkündür. Bu doğru mu? –