2015-05-19 12 views
5

Daha "basit" olduğu için XML tabanlı Spring AMQP yapılandırmasından Java ek açıklaması (annotation) tabanlı yapılandırmaya geçmeye çalışıyorum. Neyi yanlış yaptığımdan emin değilim: XML yapılandırması sorunsuz çalışıyor, ancak Java @Configurable yapılandırması "Caused by: java.net.SocketException: Connection reset" istisnası fırlatıyor. (Başlık: <rabbit:> ad alanı yapılandırmasından Java @Configurable yapılandırmasına geçmeye çalışırken aynı davranış yeniden üretilemiyor.)

XML yapılandırma (mükemmel çalışıyor):

<?xml version="1.0" encoding="UTF-8"?> 
<!--
  Spring application context that wires a RabbitMQ connection factory, template, admin,
  queue/topic-exchange binding and a listener container through the <rabbit:> namespace.
  All ${rabbitmq.*} placeholders are resolved from the properties files loaded below.
-->
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
     xmlns:context="http://www.springframework.org/schema/context" 
     xmlns:util="http://www.springframework.org/schema/util" 
     xsi:schemaLocation="http://www.springframework.org/schema/rabbit 
      http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
      http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context.xsd 
      http://www.springframework.org/schema/util 
      http://www.springframework.org/schema/util/spring-util.xsd 
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <!-- define which properties files will be used --> 
    <context:property-placeholder location="classpath:*.properties" /> 

    <!-- Connection factory: note that the virtual host and cache mode are set explicitly here. --> 
    <rabbit:connection-factory id="connectionFactory" 
           addresses='${rabbitmq.hostname}' 
           username='${rabbitmq.username}' 
           password='${rabbitmq.password}' 
           virtual-host='${rabbitmq.virtual_host}' 
           cache-mode='${rabbitmq.cache_mode}'        
           channel-cache-size='${rabbitmq.channel_cache_size}'/> 

    <!-- Thread pool handed to the listener container below via task-executor. --> 
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="3"/> 
     <property name="maxPoolSize" value="5"/> 
     <property name="queueCapacity" value="15"/>       
    </bean>        


    <!-- Template and admin both share the connection factory defined above. --> 
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> 
    <rabbit:admin connection-factory="connectionFactory"/> 
    <!-- Queue bound to the topic exchange using the configured routing pattern. --> 
     <rabbit:queue name="${rabbitmq.queue_name}" /> 
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}"> 
    <rabbit:bindings> 
     <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/> 


    <!--
      NOTE(review): the listener queue is hard-coded as "notification.main" rather than
      ${rabbitmq.queue_name}; presumably both resolve to the same queue - confirm.
    -->
    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor"> 
      <rabbit:listener ref="listener" queues="notification.main" /> 

    </rabbit:listener-container> 
</beans> 

Java yapılandırma:

@Configurable 
@PropertySource("classpath:rabbitmq.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.hostname}") 
private String hostname; 

@Value("${rabbitmq.port}") 
private String port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.virtual_host}") 
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}") 
//private String cacheMode; 

@Value("${rabbitmq.channel_cache_size}") 
private String channelCacheSize; 

@Value("${rabbitmq.topic_exchange_name}") 
private String topicExchangeName; 

@Value("${rabbitmq.topic_exchange_pattern}") 
private String topicExchangePattern; 

@Value("${rabbitmq.queue_name}") 
private String queueName; 

@Autowired 
private ConnectionFactory cachingConnectionFactory; 

/**
 * Creates the caching RabbitMQ connection factory registered as the
 * "cachingConnectionFactory" bean, using the injected host, port, credentials and
 * channel cache size.
 *
 * NOTE(review): the injected {@code virtualHost} field is never applied to the factory in
 * this method, whereas the XML configuration sets {@code virtual-host} on its connection
 * factory. A {@code connectionFactory.setVirtualHost(virtualHost)} call appears to be
 * missing; the broker log quoted with this code shows access to vhost '/' being refused.
 * The cache mode is likewise left unset (the call below is commented out).
 *
 * @return the configured connection factory
 */
@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 
    // port is injected as a String and converted here
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

    connectionFactory.setUsername(username); 
    connectionFactory.setPassword(password); 

    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode)); 
    connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

    return connectionFactory; 
} 

/**
 * Exposes the "taskExecutor" bean: a thread pool with a core size of 3,
 * a maximum size of 5 and a queue capacity of 15.
 *
 * @return the configured (not yet initialized by this method) task executor
 */
@Bean(name="taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(3);
    executor.setMaxPoolSize(5);
    executor.setQueueCapacity(15);

    return executor;
}

/**
 * Exposes a {@link RabbitTemplate} built on the autowired connection factory.
 *
 * @return the template as an {@link AmqpTemplate}
 */
@Bean
public AmqpTemplate AmqpTemplate() {
    return new RabbitTemplate(cachingConnectionFactory);
}


/**
 * Exposes a {@link RabbitAdmin} built on the autowired connection factory.
 *
 * @return the admin as an {@link AmqpAdmin}
 */
@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(cachingConnectionFactory);
}

/**
 * Declares the queue whose name is taken from the injected {@code queueName} property.
 *
 * @return the queue definition
 */
@Bean
public Queue queue() {
    final Queue declaredQueue = new Queue(queueName);
    return declaredQueue;
}

/**
 * Declares the topic exchange whose name is taken from the injected
 * {@code topicExchangeName} property.
 *
 * @return the topic exchange definition
 */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(topicExchangeName);
}

/**
 * Binds the given queue to the given topic exchange using the injected
 * {@code topicExchangePattern} as the routing pattern.
 *
 * @param topicExchange the exchange to bind to
 * @param queue the queue being bound
 * @return the resulting binding
 */
@Bean
public Binding dataBinding(TopicExchange topicExchange, Queue queue) {
    return BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(topicExchangePattern);
}

/**
 * Exposes the message listener that the listener container delivers messages to.
 *
 * @return a new {@code DefaultMessageListener}
 */
@Bean
public DefaultMessageListener defaultMessageListener() {
    final DefaultMessageListener listener = new DefaultMessageListener();
    return listener;
}

/**
 * Builds the listener container that consumes from the configured queue and hands
 * messages to the supplied listener. Auto-startup is enabled explicitly.
 *
 * @param defaultMessageListener the listener that receives the messages
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) {
    final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();

    listenerContainer.setConnectionFactory(cachingConnectionFactory);
    listenerContainer.setQueueNames(queueName);
    listenerContainer.setAutoStartup(true);
    listenerContainer.setMessageListener(defaultMessageListener);
    // No task executor is wired in here; the original call was left disabled:
    //container.setTaskExecutor(taskExecutor);

    return listenerContainer;
}

/**
 * Registers the placeholder configurer as a static bean.
 *
 * @return a new {@link PropertySourcesPlaceholderConfigurer}
 */
@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
    final PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
    return configurer;
}

java yapılandırma hatası:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container. 
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s). 
org.springframework.amqp.AmqpIOException: java.io.IOException 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004) 
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208) 
    ... 12 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 
    ... 16 more 
Caused by: java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) 
    ... 1 more 

Spring AMQP kodunda hata ayıklayıcıyla adım adım ilerledim; sorunun ortaya çıktığı yer RabbitAdmin#getQueueProperties yöntemi. XML yapılandırmasıyla sorunsuz çalışıyor, ancak Java yapılandırmasıyla çalıştırılır çalıştırılmaz yukarıdaki istisnayı fırlatıyor. Farklı yaptığım bir şey mi var? Her iki yapılandırma da bana aynı görünüyor.

package org.springframework.amqp.rabbit.core; 

/**
 * Excerpt of the admin class; only {@code getQueueProperties} is shown, the rest is elided.
 */
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean { 
//... 
    /**
     * Looks up a queue by passively declaring it through the rabbit template and returns
     * its name, message count and consumer count.
     *
     * @param queueName name of the queue to inspect; must contain text
     * @return a {@code Properties} holding QUEUE_NAME, QUEUE_MESSAGE_COUNT and
     *         QUEUE_CONSUMER_COUNT, or {@code null} when the passive declare throws
     */
    @Override 
    public Properties getQueueProperties(final String queueName) { 
     Assert.hasText(queueName, "'queueName' cannot be null or empty"); 
     return this.rabbitTemplate.execute(new ChannelCallback<Properties>() { 
      @Override 
      public Properties doInRabbit(Channel channel) throws Exception { 
       try { 
        // passive declare: asks the broker about the queue by name
        DeclareOk declareOk = channel.queueDeclarePassive(queueName); 
        Properties props = new Properties(); 
        props.put(QUEUE_NAME, declareOk.getQueue()); 
        props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount()); 
        props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount()); 
        return props; 
       } 
       catch (Exception e) { 
        // Any exception raised inside this callback is reported as "queue does not exist"
        // (debug level only) and turned into a null result.
        if (logger.isDebugEnabled()) { 
         logger.debug("Queue '" + queueName + "' does not exist"); 
        } 
        return null; 
       } 
      } 
     }); 
    } 
} 

Her iki yapılandırma da sınıf yolundaki aynı rabbitmq.properties dosyasını kullanıyor. Hatta her iki yapılandırmada da çalışma zamanında RabbitAdmin ve RabbitTemplate sınıflarının özelliklerini kontrol ettim. (Yorum: @Configurable değil, @Configuration kullanıyor olmalısınız.)

cevap

1

Kök sanal ana bilgisayarı ('/') kullanmıyorum; virtual_host için kendi özel değerim vardı. Bu özelliği SpEL ile Java yapılandırmama enjekte etmiş olsam da, connectionFactory üzerinde açıkça ayarlamamıştım.

connectionFactory.setVirtualHost(virtualHost); 

Sorunumda bana yardımcı olduğu için, Gary Russell'a teşekkürler.

/**
 * Creates the caching RabbitMQ connection factory registered as the
 * "cachingConnectionFactory" bean, using the injected host, port, credentials,
 * virtual host and channel cache size.
 *
 * @return the configured connection factory
 */
@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
    final int brokerPort = Integer.parseInt(port);
    final CachingConnectionFactory factory = new CachingConnectionFactory(hostname, brokerPort);

    factory.setUsername(username);
    factory.setPassword(password);

    // Apply the configured virtual host explicitly on the factory.
    factory.setVirtualHost(virtualHost);
    factory.setChannelCacheSize(Integer.parseInt(channelCacheSize));

    return factory;
}
4

onlar ... tam olarak aynı görünüyor.

DÜZENLEME: RabbitMQ sunucusu bağlantıyı kapatıyor gibi görünüyor:

Caused by: java.net.SocketException: Connection reset 

Sunucu günlüğüne bakın; bu yardımcı olmazsa, org.springframework için tam bir DEBUG günlüğünü bir yere yükleyin (muhtemelen buraya sığmayacak kadar büyüktür).

EDIT2:

Bir kimlik doğrulama sorununuz var ...

{handshake_error,opening,0, 
      {amqp_error,access_refused, 
         "access to vhost '/' refused for user 'gggdw'", 
         'connection.open'}} 

... kullanıcı adını ve parolayı (ve sanal ana bilgisayarı, yani vhost'u) kontrol ediniz.

+0

Haklısınız, ancak Spring AMQP kodunda adım adım ilerledim ve (bana göre) sorun hâlâ RabbitAdmin#getQueueProperties yönteminde – Selwyn

+0

Russel http://pastebin.com/p0KzJv3g Tam yığın çok açıklayıcı değil aynı hatayı alıyorum Bu java config bir istisna atar ama XML yapılandırmasında – Selwyn

+1

ince çalıştırır Kimlik ile bazı sorun. –