2011-08-27 12 views
12

JMS ve ActiveMQ ile çalışıyorum. Her şey harikalar yaratıyor. İlkbahar kullanmıyorum ve ne yapabilirim?JMS MessageListener'den geri alma sinyali İletme

javax.jms.MessageListener arabiriminde yalnızca bir yöntem vardır, onMessage. Bir uygulama içinden, bir istisna atılacak bir şans var. Aslında bir istisna atılırsa, mesajın düzgün bir şekilde işlenmediğini ve tekrar denenmesi gerektiğini söylüyorum. Yani, biraz beklemek için ActiveMQ'ye ihtiyacım var ve sonra tekrar deneyin. JMS işlemini geri almak için atılmış istisna ihtiyacım var.

Böyle bir davranışı nasıl gerçekleştirebilirim?

Belki de ActiveMQ'de bir yapılandırma bulunamadı.

Ya da ... belki uzakta tüketicilere MessageListener s kayıt yapmak ve benzeri bir bir döngü içinde, iletileri kendimi tüketmek olabilir: yerine dinleyici kaydetme, iplik birkaç

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

.

Veya ... Bunu yapmak için bir şekilde/AOP/MessageListener s'yi deşifre edebilirim.

Hangi rotayı izlerdiniz ve neden?

not: MessageListener s kodunun üzerinde tam denetime sahip değilim.

DÜZENLEME kavram ispatı için bir test:

Sen Session.CLIENT_ACKNOWLEDGE için onay modunu ayarlamak gerekir
@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

Cevabınız için çok teşekkürler whaley ve @Ammar. İkiniz de beni doğru yola soktuğunuz için çekiyorum. Ama henüz doğru bir cevap seçmemek. Çünkü daha fazla test gerekli. –

cevap

10

, o zaman kurulum bir RedeliveryPolicy on your Connection/ConnectionFactory gerekir. This page on ActiveMQ's website da yapmanız gerekebilecek bazı iyi bilgileri içerir.

sen Bahar kullanarak olmadığından, kurulum (yukarıdaki bağlantılardan birini alınan) aşağıdaki koda benzer bir şey ile bir RedeliveryPolicy yapabilirsiniz:

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

Edit alarak senin Kod snippet'i cevaba ekledi, aşağıdaki işlemlerin nasıl çalıştığını gösterir. Bu kodu Session.rollback() yöntemiyle yorumlayın ve SESION_TRANSACTED ve Session'ı kullanarak göreceksiniz.tamamlama/geri alma beklendiği gibi çalışır:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

Bu işe yaramadı. Ama beni doğru yöne doğrultdu. DUPS_OK_ACKNOWLEDGE'den ayrılacağım, çünkü en az çalışmam gerektiği gibi görünüyor. –

+0

Kodunuzun tamamını yapıştırmanız gerekir, çünkü Oturumunuz ile doğru bir şey yapmıyorsunuz. DUPS_OK_ACKNOWLEDGE sadece müşteri onayı tembel olduğundan ve komisyoncu, müşteri nihayetinde ack yapana kadar mesajları yeniden göndereceğinden beri çalışıyor görünmektedir. – whaley

+0

Bir kavram kanıtı yapıştırdım. Sadece DUPS_OK_ACKNOWLEDGE ile çalışabilirim ve message.acknowledgement bir fark yaratmıyor gibi görünebilir. –

2

, istemci, iletinin kabul yöntemini çağırarak bir tüketilen mesajı kabul eder.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Sonra iletiyi işledikten sonra bu mesajı kaldırmak için Message.acknowledge() yöntemini çağırmanız gerekir için. Eğer onay modu olarak SESSION_TRANSACTED kullanmak istiyorsanız

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

Çalışmıyor. _message.acknowledge() _ hiç çağrılmasa bile _onMessage_ hala bir kez çağrılır. –

+0

Onaylama modunu düzgün ayarladınız mı? Session.CLIENT_ACKNOWLEDGE için ayarlanmış olmalıdır! – Ammar

+0

Ancak (false, Session.DUPS_OK_ACKNOWLEDGE) ile çalışır ... message.acknowledge() hile yapmak gibi görünmüyor. –

0

oturumunuz işlem ise, "acknowledgeMode" Sadece seans işlem bırakıp session.rollback ve session.commit kullanın anyways..So göz ardı edilir işleminizi yapmak veya geri almak için.

+1

(Benim) sorunumun, oturumun MessageListener.onMessage (Message) içinde erişilebilir olmaması olduğunu düşünüyorum. –