2012-08-02 15 views
6

Küçük veri kod çalıştırdığınızda, aşağıdaki hatayı alamıyorum. Ama aynı kodu daha büyük bir veri kümesinde çalıştırdığımda birden fazla çıktı kullanırken aşağıdaki hatayı alıyorum. Pls Yardım!hadoop çoklu zaten oluşturulmuş istisna

public class ReduceThree1 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
     // @SuppressWarnings("unchecked") 
     private MultipleOutputs mos; 

     public void configure(JobConf conf1) { 

     mos = new MultipleOutputs(conf1); 

     } 

      public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 


       // MultipleOutputs mos; 
       int sum = 0; 
       ArrayList<CustomMapI> alcmap = new ArrayList<CustomMapI>(); 
       while(values.hasNext()) 
       { 

        String val = values.next().toString(); 
        StringTokenizer st = new StringTokenizer(val); 
        String uid = st.nextToken(); 
        String f_val = st.nextToken(); 
        CustomMapI cmap = new CustomMapI(uid, f_val); 
        alcmap.add(cmap); 
        sum += Integer.parseInt(f_val); 

       } 

       StringTokenizer st = new StringTokenizer(key.toString()); 
       String t = st.nextToken(); 
       String data = st.nextToken(); 

       for(int i = 0; i<alcmap.size(); i++) 
       { 

        String str_key = t+" "+alcmap.get(i).getUid(); 
        String str_val = data+" "+alcmap.get(i).getF_val()+" "+sum; 

       // output.collect(new Text(str_key), new Text(str_val)); 
        mos.getCollector("/home/users/mlakshm/alop176/data", reporter).collect(new Text(str_key), new Text(str_val)); 

        for(int j = 1; j<alcmap.size(); j++) 
        { 
         if((j>i)&&(!alcmap.get(i).equals(alcmap.get(j)))) 
         { 
          String mul_key = "null"; 


          String uidi = alcmap.get(i).getUid(); 
          String uidj = alcmap.get(j).getUid(); 


          ArrayList<String> alsort = new ArrayList<String>(); 
          alsort.add(uidi); 
          alsort.add(uidj); 
          Collections.sort(alsort); 
          int fi = Integer.parseInt(alcmap.get(i).getF_val()); 

          int fj = Integer.parseInt(alcmap.get(j).getF_val()); 
          String intersection = "null"; 
          if(fi<fj) 
          { 
          intersection = String.valueOf(fi); 
          } 
          else 
          { 
           intersection = String.valueOf(fj); 
          } 

          String mul_val = t+" "+alsort.get(0)+" "+alsort.get(1)+" "+intersection; 
         // System.out.println(mul_key+ " "+mul_val); 

          mos.getCollector("/home/users/mlakshm/alop177/datepairs", reporter).collect(new Text(mul_key), new Text(mul_val)); 
         } 
        } 

       } 


      } 

      public void close() throws IOException { 
       mos.close(); 

       } 
} 

İş Conf aşağıdaki gibidir:

org.apache.hadoop.mapred.Child.main (Child.java:249) en
org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file 
/home/users/mlakshm/alop176/data-r-00001 for 
DFSClient_attempt_201208010142_0043_r_000001_1 on client 10.0.1.100, because this file 
is already being created by DFSClient_attempt_201208010142_0043_r_000001_0 on  10.0.1.130 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1406) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1246) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1188) 
    at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:628) 
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382) 

    at org.apache.hadoop.ipc.Client.call(Client.java:1070) 
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) 
    at $Proxy2.create(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) 
    at $Proxy2.create(Unknown Source) 
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:3248) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:713) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:182) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:555) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:455) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:118) 
    at com.a.MultipleOutputs$InternalFileOutputFormat.getRecordWriter(MultipleOutputs.java:565) 
    at com.a.MultipleOutputs.getRecordWriter(MultipleOutputs.java:432) 
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:518) 
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:482) 
    at com.a.ReduceThree1.reduce(ReduceThree1.java:56) 
    at com.a.ReduceThree1.reduce(ReduceThree1.java:1) 
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 


azaltmak sınıftır aşağıdaki gibidir:

Yapılandırma config1 = yeni Yapılandırma(); En büyük olasılıkla üzerinde spekülatif yürütme var

  JobConf conf1 = new JobConf(config1, DJob.class); 

      conf1.setJobName("DJob1"); 
      conf1.setOutputKeyClass(Text.class); 
      conf1.setOutputValueClass(Text.class); 
     // conf.setMapOutputValueClass(Text.class); 
     // conf.setMapOutputKeyClass(Text.class); 
     // conf.setNumMapTasks(20); 
      conf.setNumReduceTasks(10); 
      conf1.setMapperClass(MapThree1.class); 
     // conf.setCombinerClass(Combiner.class); 
      conf1.setReducerClass(ReduceThree1.class); 
      conf1.setPartitionerClass(CustomPartitioner.class); 

      conf1.setInputFormat(TextInputFormat.class); 
      conf1.setOutputFormat(TextOutputFormat.class); 
     // mos = new MultipleOutputs(conf1); 
      MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop176/data", TextOutputFormat.class, LongWritable.class, Text.class); 
      MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop177/datepairs", TextOutputFormat.class, LongWritable.class, Text.class); 


      FileInputFormat.setInputPaths(conf1, new Path(other_args.get(2))); 
      FileOutputFormat.setOutputPath(conf1, new Path(other_args.get(3))); 

     JobClient.runJob(conf1); 

cevap

3

ve görevi 1 azaltmak için iki farklı denemeler yolu /home/users/mlakshm/alop176/data-r-00001 yazmaya çalışıyoruz. Bu, büyük bir ihtimalle, ikinci bir girişimi spekülatif olarak yürütürken, daha küçük görevler için başarılı olur.

MultipleOutputs uygulamanızın özel olduğunu (com.a.MultipleOutputs) görüyorum, tüm HDFS verilerini görevlerin çalışma dizinine yazmalı ve çıktı işlendikten sonra OutputComitter'ın son çıktı dizinine taşımasına izin vermelisiniz. Eğer yapabiliyorsan, kodu yapıştır ve bir göz atabiliriz.

+0

Merhaba Chris, kodu benim cevabım olarak gönderdim. Bunu görebiliyor ve bana yardım edebiliyor musunuz? Teşekkürler! –

+0

@MahalakshmiLakshminarayanan bu sorunu çözdünüz mü? Kodda veya yapılandırma değişikliklerinde değişiklik yaptınız mı? Bana bildirin. Şimdiden teşekkürler. – Shash

+0

@shash Bunu, işteki koduma ekledim: jobConf.setSpeculativeExecution (false); –