2015-07-20 24 views
8

Amazon SQS Java SDK stops after 500 messages

I am receiving messages from an Amazon SQS queue. I have thousands of messages in the queue. When I start the application (written in Java with the Spring Framework), it starts polling messages from the queue and stops after receiving 500 messages. If I restart the application, it consumes another 500 messages.

My code looks like this:

Connection factory

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

@Bean(name = "sqsJmsListenerContainerFactory") 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(sqsConnectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

Listener

@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue") 
public void onMessage(Message message) { 
    //Processing message 
} 

Is there anything I need to configure on the Amazon queue or in the connection factory bean?

Update: I have added thread dumps.

Thread dump while the application is consuming messages

DefaultMessageListenerContainer

"[email protected]" prio=5 tid=0x18 nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x2230> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x2231> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread
"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x23a7> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x23a8> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Thread dump when the application stops consuming messages

DefaultMessageListenerContainer

"[email protected]" prio=5 tid=0x18 nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
     at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
     at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
     at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
     at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
     at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 
     at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread

"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
+0

When the consumer stops, try taking a thread dump to see what the listener thread is doing. Maybe it is stuck in your code?
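For readers who have not taken a thread dump before: the usual route is the JDK's `jstack <pid>` tool, but a dump can also be produced from inside the JVM. The following is a minimal sketch using the standard `java.lang.management` API; the class name `ThreadDumper` is made up for illustration and is not part of the question's code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

class ThreadDumper {
    /** Returns a jstack-like text dump of all live threads in this JVM. */
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip locked monitor/synchronizer details,
        // which keeps the call cheap and supported on every JVM.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            // Print every frame; ThreadInfo.toString() would truncate the stack.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("     at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

In the dump, look for the listener container threads and check whether they are RUNNABLE inside a network call or WAITING on a lock or pool.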

+0

@GaryRussell I have added the thread dump. In this application I am using ActiveMQ: I push the messages I receive from SQS onto ActiveMQ queues. – Piyush

Answer

3

This looks like some kind of pool exhaustion:

at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
    at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
    at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
    at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
    at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
    at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 

The container thread is stuck trying to obtain a session from the PooledConnection.

Perhaps you are not returning sessions to the pool?
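To see why a leak like this would stall at a fixed message count, here is a small simulation. It is only a sketch: `BoundedPool` is a hypothetical stand-in built on a `Semaphore`, not the ActiveMQ pool itself, and the limit of 500 is an assumption based on what I believe is the default `maximumActiveSessionPerConnection` of ActiveMQ's `PooledConnectionFactory`. The real pool blocks indefinitely when full; the sketch gives up after a short wait so that it terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PoolExhaustionDemo {
    /** Hypothetical stand-in for a bounded session pool. */
    static final class BoundedPool {
        private final Semaphore permits;

        BoundedPool(int maxActive) {
            this.permits = new Semaphore(maxActive);
        }

        /** Borrows a slot, giving up after waitMillis instead of blocking forever. */
        boolean borrow(long waitMillis) {
            try {
                return permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Returns a slot to the pool. */
        void giveBack() {
            permits.release();
        }
    }

    /** Tries to handle the given number of messages; returns how many got a session. */
    static int process(int messages, int maxActive, boolean returnToPool) {
        BoundedPool pool = new BoundedPool(maxActive);
        int handled = 0;
        for (int i = 0; i < messages; i++) {
            if (!pool.borrow(50)) {
                break; // the real listener thread would wait here indefinitely
            }
            handled++; // pretend to send the message
            if (returnToPool) {
                pool.giveBack();
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("leaking:   " + process(2000, 500, false)); // prints "leaking:   500"
        System.out.println("returning: " + process(2000, 500, true));  // prints "returning: 2000"
    }
}
```

With the leak, processing stops as soon as the pool's limit is reached, which matches the symptom of consuming a fixed number of messages per restart.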

Consider using a JmsTemplate instead of your own code to talk to JMS. It avoids this kind of problem.

+1

I solved this problem. I was not closing the session and connection after sending the message. I also learned how to use thread dumps. Thanks @GaryRussell :) – Piyush
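The sender code itself is not shown in the thread, so the following is only a sketch of the general shape of such a fix: release the session and the connection in `finally` blocks so they go back to the pool even when the send fails. `Resource` and `open` are hypothetical stand-ins for the JMS connection and session, used here so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

class CloseAfterSendDemo {
    /** Hypothetical stand-in for a JMS Connection or Session; only close() matters here. */
    interface Resource {
        void close();
    }

    /** Records what happened, in order, so the behaviour can be inspected. */
    static final List<String> events = new ArrayList<>();

    static Resource open(String name) {
        events.add("open " + name);
        return () -> events.add("close " + name);
    }

    /** Sends one message and always releases session and connection, even on failure. */
    static void send(String body, boolean fail) {
        Resource connection = open("connection");
        try {
            Resource session = open("session");
            try {
                if (fail) {
                    throw new IllegalStateException("send failed: " + body);
                }
                events.add("send " + body);
            } finally {
                session.close();    // hands the session back to the pool
            }
        } finally {
            connection.close();     // hands the connection back to the pool
        }
    }

    public static void main(String[] args) {
        send("hello", false);
        try {
            send("boom", true);
        } catch (IllegalStateException expected) {
            events.add("error");
        }
        events.forEach(System.out::println);
    }
}
```

Closing in the reverse order of opening (session first, then connection) mirrors how the resources were acquired. A JmsTemplate does this bookkeeping for you, which is why the answer recommends it.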