2016-03-22 18 views
10

aşağıdaki gibi, tarif edildiği gibi (diğer iki karşı birleşmek üzere bir) 3 basamaklı boruları sahipözel MAP_SIDE sağlanması akmakta olan mantık katılması, Build sadece

  • LHSPipe - (büyük boyutu)

enter image description here

  • RHSPipes - (küçük boyutu not muhtemelen uygun olduğunu olabilir aşağıdaki gibi ry)

enter image description here

Psuedocode Bu örnek, iki

katılır gerektirir IF F1DecidingFactor = EVET sonra (LHSPipe.F1Input = RHS TARAFINDAN RHS Arama # 1 LHSPipe Üyelik Lookup # 1.Join # F1) ve arama sonucunu ayarlayın (SET LHSPipe.F1Output = Sonuç # F1) Aksi takdirde SET LHSPipe.F1Output = N/A

Aynı mantık, F2 hesaplaması için de geçerlidir.

beklenen çıkışı

,

enter image description here

Bu senaryo-ELSE katılın edip etmeyeceğine karar verir operasyonu Üyelik Custom ile gitmek için beni zorladı.

Yukarıdaki senaryoyu düşünürsek, MAP-SIDE birleşimine katılmak istiyorum (RHSPipe'ı MAP görev düğümünün belleğinde tutmak), aşağıdaki olası çözümleri düşünmekteydim, her birinin kendi artıları ve eksileri var. Bunlara dair önerileriniz var.

Seçenek # 1:

CoGroup - Biz özel inşa özel katılmak (çalışma) izledi BufferJoiner ile CoGroup kullanarak mantığı katılmak, ama bu MAP-YAN katılmak sağlamak olmaz edebilirsiniz.

Seçenek # 2:

HashJoin - O MAP-YAN katılmak sağlar, ama bildiğim kadarıyla bu kullanılarak inşa edilemez katılmak özel gördüğünüz gibi.

Lütfen anlayışımı düzeltin ve bu gereksinim üzerinde çalışmanız için görüşlerinizi belirtin.

Şimdiden teşekkürler.

+0

Örnek kodunuzu sağlayabilir misiniz ve ayrıca özel katılmada ne yapmak istiyorsunuz? – Ambrish

+0

Örnek giriş verileri ve beklenen çıktı da yardımcı olacaktır. – Ambrish

+0

Verilerinizi alt kümelerde bölümlemeyi düşündünüz mü? – kpie

cevap

1

Bu sorunu çözmenin en iyi yolu (düşünebildiğim), küçük veri kümenizi değiştirmek. Daha küçük veri kümesine yeni bir alan (F1DecidingFactor) ekleyebilirsiniz.F1Result değeri isterim edebilirsiniz:

Sudo kodu

if F1DecidingFactor == "Yes" then 
    F1Result = ACTUAL_VALUE 
else 
    F1Result = "N/A" 

Sonucu Tablo Sen de basamaklı yoluyla yukarıdaki yapabilirsiniz

|F1#Join|F1#Result|F1#DecidingFactor| 
| Yes|  0|    True| 
| Yes|  1|   False| 
|  No|  0|    N/A| 
|  No|  1|    N/A| 

.

Bundan sonra, harita tarafı birleştirmeyi yapabilirsiniz.

Daha küçük veri kümesini değiştiremiyorsanız, sorunu çözmek için 2 seçeneğim var. 1

Seçeneği (yani F1DecidingFactor_RHS = Yes) belirleyici unsur size eşdeğer olan küçük borulara yeni alanlar ekleyin. Ardından, katılma kriterinize ekleyin. Katılımınız tamamlandığında, yalnızca bu koşulun eşleştiği satırlara sahip olacaksınız. Aksi takdirde boş/boş olacaktır. Örnek kod:

Ana Sınıfı

import cascading.operation.Insert; 
import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.assembly.Discard; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTestOption2 { 
    public StackHashJoinTestOption2() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields f1DecidingFactor = new Fields("F1DecidingFactor"); 
     Fields f2DecidingFactor = new Fields("F2DecidingFactor"); 
     Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS"); 
     Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS"); 

     Fields lhsJoinerOne = f1DecidingFactor.append(f1Input); 
     Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input); 

     Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join); 
     Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // New field to small pipe. Expected Fields: 
     // F1Join F1Result F1DecidingFactor_RHS 
     rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // New field to small pipe. Expected Fields: 
     // F2Join F2Result F2DecidingFactor_RHS 
     rhsTwo = new Each(rhsTwo, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Joining first small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS 
     Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin()); 

     // Joining second small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS 
     Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     result = new Discard(result, f1DecidingFactorRhs); 
     result = new Discard(result, f2DecidingFactorRhs); 

     // result Pipe should have expected result 
    } 
} 

Seçenek 2

yerine/boş null varsayılan değere sahip olmak istiyorsanız, o zaman varsayılan HashJoin ilk yapmamızı önerirsin Müteakipler, tuplleri uygun değerlerle güncellemek için bir işlev izledi. Bir şey gibi:

Ana Sınıfı

import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTest { 
    public StackHashJoinTest() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // Joining first small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result 
     Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin()); 

     // Joining second small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result 
     Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     // result Pipe should have expected result 
    } 
} 

Güncelleme Fonksiyonu

import cascading.flow.FlowProcess; 
import cascading.operation.BaseOperation; 
import cascading.operation.Function; 
import cascading.operation.FunctionCall; 
import cascading.tuple.Fields; 
import cascading.tuple.TupleEntry; 

public class TestFunction extends BaseOperation<Void> implements Function<Void> { 

    private static final long serialVersionUID = 1L; 

    private static final String DECIDING_FACTOR = "No"; 
    private static final String DEFAULT_VALUE = "N/A"; 

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output" 
    public TestFunction() { 
     super(Fields.ARGS); 
    } 

    @Override 
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) { 
     TupleEntry arguments = call.getArguments(); 

     TupleEntry result = new TupleEntry(arguments); 

     if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F1Output", DEFAULT_VALUE); 
     } 

     if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F2Output", DEFAULT_VALUE); 
     } 

     call.getOutputCollector().add(result); 
    } 

} 

Kaynaklar

Bu

  • Custom Function
  • sorunu çözmek gerekir. Bu yardımcı olursa bana bildirin.