2015-02-26 17 views
10

Trident topolojimi için Tackent topolojisini ElasticSearch'e depolamak için bir IBackingMap'i uyguluyorum (GitHub'da zaten var olan Trident/ElasticSearch entegrasyonu için birkaç uygulama olduğunu biliyorum, ancak benim görevime daha iyi uyan özel bir uygulamayı uygulamaya karar verdim). Bir Storm Trident topolojisi içinde bir IBackingMap uygulaması tarafından açılan bir veritabanı bağlantısı nasıl kapatılır?

Yani benim uygulaması klasik bir fabrika ile biridir: Girilen params olarak konak/port/küme adını alır Görüyorsunuz

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> { 

    // omitting here some other cool stuff... 
    private final Client client; 

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) { 

     return new StateFactory() { 

      @Override 
      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 

       ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName); 
       CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE); 
       MapState ms = OpaqueMap.build(cm); 
       return new SnapshottableMap(ms, new Values(GLOBAL_KEY)); 
      } 
     }; 
    } 

    public ElasticSearchBackingMap(String host, int port, String clusterName) { 

     Settings settings = ImmutableSettings.settingsBuilder() 
       .put("cluster.name", clusterName).build(); 

     // TODO add a possibility to close the client 
     client = new TransportClient(settings) 
       .addTransportAddress(new InetSocketTransportAddress(host, port)); 
    } 

    // the actual implementation is left out 
} 

ve sınıfın bir üyesi olarak bir ElasticSearch istemcisi yaratır ANCAK HİÇ CLOSES MÜŞTERİ. Daha sonra oldukça tanıdık bir şekilde topoloji içinden kullanılan

:

tridentTopology.newStream("spout", spout) 
      // ...some processing steps here... 
      .groupBy(aggregationFields) 
      .persistentAggregate(
        ElasticSearchBackingMap.getFactoryFor(
          ElasticSearchConfig.ES_HOST, 
          ElasticSearchConfig.ES_PORT, 
          ElasticSearchConfig.ES_CLUSTER_NAME 
        ), 
        new Fields(FieldNames.OUTCOME), 
        new BatchAggregator(), 
        new Fields(FieldNames.AGGREGATED)); 

Bu topoloji içine sarılır bazı kamu bir kavanozda paketlenmiş ve yürütülmesi için Fırtına gönderilen, static void main.

Sorun şu ki, ElasticSearch bağlantısının kapatılması konusunda endişelenmeli mi yoksa Storm'un kendi işi mi? Eğer Fırtına tarafından yapılmazsa, topolojinin yaşam döngüsünde ne zaman ve nasıl yapmalıyım?

Şimdiden teşekkürler!

+0

TransportClient, her bir fırtına çalışanı için bir singleton olmalıdır. [Kullanıcı listesi] (http://elasticsearch-users.115913.n3.nabble.com/What-is-your-best-practice-to-access-a-cluster-by-a-Java-client-td4015311. html). Aslında, fırtına topolojisinin asla durmaması için java istemcisini kapatmanız gerekmediğini düşünüyorum. – fhussonnois

+1

A kesmek olabilir: her bir işçi üzerinde bir tekil oluşturmak ör. ilk durumu oluştururken ve toplayıcınızın temizleme yönteminde bu tekil adımı kapattığımda, kodunuzda 'BatchAggregator'ı görüyorum. Ama aynı zamanda daha iyi bir çözüm görmek istiyorum ... – dedek

+0

Ayrıca bu özellik isteğine de bakınız: https://issues.apache.org/jira/browse/STORM-49 – dedek

cevap

3

Tamam, kendi sorumu yanıtlama.

Her şeyden önce, Fırtına Jira'sı için önerilerinizi ve biletinizi yeniden kazanmak için @dedek teşekkürler.

Son olarak, bunu yapmanın resmi bir yolu olmadığından, Trident Filtresinin cleanup() yöntemine gitmeye karar verdim. Şimdiye kadar doğrulandıktan aşağıdaki (Fırtına v 0.9.4.): LocalCluster ile

  • temizleme() olsun ETMEZ() kümenin kapatma üzerine
  • temizleme çağrılan topoloji öldürme zaman bu bir trajedi olmamalı denilen, çok büyük olasılıkla bir gerçek küme ile gerçek dağıtımlar için zaten

LocalCluster kullanmaz

  • topoloji işçisi 'backtype.storm.daemon.worker' -f pkill -TERM -u fırtına kullanılarak durdurulduğunda yanı sıra öldürülünce o
  • o almaz çağrılan denilen işçi -9 öldürmek ile öldürülür ya da çöküyor veya eğer - ne yazık ki - bir istisna nedeniyle temizlik etkinliğinin az ya da çok iyi bir garanti verdiği genel olarak

için işçi öldüğünde() denilen olsun için istisna işlemine dikkat edeceğiniz sürece (Trident'imin her birine 'thundercatches' ekleme eğilimindeyim) zaten primitifler).

Kodum: kapanış bağlantıları için bir gün kancalar API bir parçası haline eğer

public class CloseFilter implements Filter { 

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class); 

    private final Closeable[] closeables; 

    public CloseFilter(Closeable... closeables) { 
     this.closeables = closeables; 
    } 

    @Override 
    public boolean isKeep(TridentTuple tuple) { 
     return true; 
    } 

    @Override 
    public void prepare(Map conf, TridentOperationContext context) { 

    } 

    @Override 
    public void cleanup() { 
     for (Closeable c : closeables) { 
      try { 
       c.close(); 
      } catch (Exception e) { 
       LOG.warn("Failed to close an instance of {}", c.getClass(), e); 
      } 
     } 
    } 
} 

Ancak iyi olurdu.

İlgili konular