JMS ve ActiveMQ ile çalışıyorum. Her şey harikalar yaratıyor. İlkbahar kullanmıyorum ve ne yapabilirim?JMS MessageListener'den geri alma sinyali İletme
javax.jms.MessageListener
arabiriminde yalnızca bir yöntem vardır, onMessage
. Bir uygulama içinden, bir istisna atılacak bir şans var. Aslında bir istisna atılırsa, mesajın düzgün bir şekilde işlenmediğini ve tekrar denenmesi gerektiğini söylüyorum. Yani, biraz beklemek için ActiveMQ'ye ihtiyacım var ve sonra tekrar deneyin. JMS işlemini geri almak için atılmış istisna ihtiyacım var.
Böyle bir davranışı nasıl gerçekleştirebilirim?
Belki de ActiveMQ'de bir yapılandırma bulunamadı.
Ya da ... belki uzakta tüketicilere MessageListener
s kayıt yapmak ve benzeri bir bir döngü içinde, iletileri kendimi tüketmek olabilir: yerine dinleyici kaydetme, iplik birkaç
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
.
Veya ... Bunu yapmak için bir şekilde/AOP/MessageListener
s'yi deşifre edebilirim.
Hangi rotayı izlerdiniz ve neden?
not: MessageListener
s kodunun üzerinde tam denetime sahip değilim.
DÜZENLEME kavram ispatı için bir test:
Sen Session.CLIENT_ACKNOWLEDGE için onay modunu ayarlamak gerekir@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
Cevabınız için çok teşekkürler whaley ve @Ammar. İkiniz de beni doğru yola soktuğunuz için çekiyorum. Ama henüz doğru bir cevap seçmemek. Çünkü daha fazla test gerekli. –