2012-06-16 14 views
8

Çıktısının HBase'ye yazılması gereken bir haddop işi var. Redüktöre gerçekten ihtiyacım yok, eklemek istediğim satırın tipi Mapper'de belirlendi.Hadoop - Doğrudan Haritalayıcıdan HBase'ye Yazma

Bunu başarmak için TableOutputFormat'ı nasıl kullanabilirim? Benim varsayım gördüğüm tüm örneklerden, indirgeyicinin Kur'u yaratan biri olduğu ve TableMapper'ın sadece HBase tablosundan okumak olduğu varsayılmaktadır.

Benim durumumda giriş HDFS'dir, çıktı belirli bir tabloya koyulur, TabloMapReduceUtil'de bu konuda bana yardımcı olabilecek hiçbir şey bulamıyorum.

Bu konuda bana yardımcı olabilecek bir örnek var mı?

BTW, sadece mapper çıkışını çifti yapmak gerekir

+0

Kaç kayıt eklemeye çalışıyorsunuz? – Gevorg

cevap

1

yeni Hadoop API kullanıyorum. OutputFormat yalnızca çıkış anahtarı değerlerinin nasıl devam edeceğini belirtir. Anahtar değerlerin redüktörden geldiği anlamına gelmez.

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

Bu dosyadan okuma örneğidir ve Hbase içine tüm satırları koymak: Sen eşleştiricisindeki böyle bir şey yapmak gerekir. Bu örnek "Hbase: Kesin kılavuz" dan ve depoda bulabilirsiniz. sadece bilgisayarınızda repo klonlamak almak için:

git clone git://github.com/larsgeorge/hbase-book.git 

ayrıca koduyla ilgili tüm açıklamaları bulabilirsiniz bu kitapta. Ama sizin için anlaşılmaz bir şey varsa, sormaya çekinmeyin.

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

Aşağıdakileri alıyorum: 'Kapsayıcıdan özel durum: org.apache.hadoop.util.Shell $ ExitCodeException' Yukarıdaki kodla bu soruna da girdiniz mi? Hadoop2.4 ve Hbase0.94.18 kullanıyorum – Gevorg