2013-07-10 15 views
5

Küçük bir dosyada (3-4 MB) bir harita görevi yürütüyorum, ancak harita çıktısı nispeten büyük (150 MB). Harita% 100'ü gösterdikten sonra, döküntüyü bitirmek uzun zaman alır. Lütfen bu süreyi nasıl azaltabileceğimi önerin. Bazı örnek günlükleri aşağıda ..."Harita çıktısının akıtılması başlatılıyor" hadoop harita görevinde çok uzun zaman alıyor

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output 
13/07/10 17:45:32 INFO mapred.JobClient: map 98% reduce 0% 
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient: map 100% reduce 0% 
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0 
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting 
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done. 
............................... 
............................... 
............................... 
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22 
13/07/10 17:47:52 INFO mapred.JobClient: File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Written=13401245 
13/07/10 17:47:52 INFO mapred.JobClient: FileSystemCounters 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_READ=18871098 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_READ=7346566 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=35878426 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_WRITTEN=18621307 
13/07/10 17:47:52 INFO mapred.JobClient: File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Read=2558288 
13/07/10 17:47:52 INFO mapred.JobClient: Map-Reduce Framework 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input groups=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output materialized bytes=13320006 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map input records=71040 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce shuffle bytes=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Spilled Records=1480000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output bytes=119998400 
13/07/10 17:47:52 INFO mapred.JobClient:  CPU time spent (ms)=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Total committed heap usage (bytes)=1178009600 
13/07/10 17:47:52 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine input records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  SPLIT_RAW_BYTES=122 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input records=740000 

Harita Görev Kaynak kodu:

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{ 

    private DoubleWritable distGexpr = new DoubleWritable(); 
    private LongWritable m2keyOut = new LongWritable(); 
    int trMax,tstMax; 

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { 

     Configuration conf =context.getConfiguration(); 
     tstMax = conf.getInt("mtst", 10); 
     trMax = conf.getInt("mtr", 10); 

    } 

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException { 
     String line = values.toString(); 

     double Tij=0.0,TRij=0.0, dist=0; 
     int i=0,j; 
     long m2key=0; 
     String[] SLl = new String[]{}; 

     Configuration conf =context.getConfiguration(); 

     m2key = Long.parseLong(key.toString()); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     j=0; 
     while (tokenizer.hasMoreTokens()) { 

      String test = tokenizer.nextToken(); 
      if(j==0){ 
       Tij = Double.parseDouble(test); 
      } 
      else if(j==1){ 
       TRij = Double.parseDouble(test); 
      } 
      else if(j==2){ 
       SLl = StringUtils.split(conf.get(test),","); 
      } 
      j++; 
     } 
     //Map input ends 

     //Distance Measure function 
     dist = (long)Math.pow((Tij - TRij), 2); 

     //remove gid from key 
     m2key = m2key/100000; 
     //Map2 <key,value> emit starts 
     for(i=0; i<SLl.length;i++){ 
       long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key; 
      m2keyOut.set(m2keyNew); 
      distGexpr.set(dist); 
      context.write(m2keyOut,distGexpr); 
     } 
     //<key,value> emit done 
    } 

} 

Numune Harita Girdi: her satırın son değişken yayın değişkenler arasında bir tamsayı dizisi olsun. Her bir hat yaklaşık 100-200 çıktı kaydı üretecek.

10100014 1356.3238 1181.63 gs-4-56 
10100026 3263.1167 3192.4131 gs-3-21 
10100043 1852.0 1926.3962 gs-4-76 
10100062 1175.5925 983.47125 gs-3-19 
10100066 606.59125 976.26625 gs-8-23 

Numune Harita Çıktı:

10101 8633.0 
10102 1822.0 
10103 13832.0 
10104 2726470.0 
10105 1172991.0 
10107 239367.0 
10109 5410384.0 
10111 7698352.0 
10112 6.417 
+1

Eşleştirici kodunuzu gönderebilir misiniz (veya en azından eşleştiricinizin işlevsel olarak ne yaptığı ile ilgili bir açıklama), örnek giriş kaydı ve çıktı kayıtları mı? Temizleme yönteminiz var mı? –

+0

Cevabınız için teşekkür ederiz. Bu harita görevi ve örnek giriş ve çıkış için kaynak kodu ekledim. Temizleme yöntemi kullanmadım. Aslında daha önce birçok dökülme oldu. Yani, io.sort.record.percent ve diğer bazı ayarları değiştirdim. Daha sonra dökülmeler en aza indirgenir, ancak genel yürütme süresi aynı kalmıştır. –

cevap

0

Bunu (2 yıl orijinal mesajın yayınlanması sonra) çözdük varsayalım, ama sadece aynı sorun haline adımları herkes için, ben çalışacağım bazı önerilerde bulunmak.

Sayaçlarınızdan yola çıkarak, sıkıştırmayı zaten kullandığınızı anlıyorum (eşlenen bayt sayısı eşleştirilen bayt sayısı, harita çıkış baytlarının sayısına göre farklıdır), bu iyi bir şeydir. Harita çıkış anahtarının türü olarak, değişken uzunluktaki kodlanmış VLongWritable sınıfını kullanarak eşleştiricinin çıktısını daha da sıkıştırabilirsiniz. (Yanılmıyorsam, bir VDoubleWritable sınıfı da vardı, ancak şimdiye dek kullanımdan kaldırılmış olmalı).

Çıktısını yayınladığınız for döngüsünde, her seferinde distGexpr değişkenini ayarlamanıza gerek yoktur. Her zaman aynıdır, bu yüzden for döngüsünden hemen önce ayarlayın. Ayrıca, trMax*tstMax ürününün uzunluğunu döngü dışında saklayabilir ve her yinelemede hesaplayamazsınız.

Mümkünse, giriş anahtarınızı LongWritable (önceki işten) yapın, böylece Long.parseLong() ve Text.toString() çağrılarını kaydedebilirsiniz.

Mümkünse (redüktörünüze bağlı olarak), dökülen baytların boyutunu azaltmak için bir birleştirici kullanın.

ben döngü içinde bu Integer.parseInt() aramayı atlamak için bir yol bulamadı, ancak başlangıçta int[] olarak SLl yük eğer biraz zaman tasarruf sağlayacaktır.