2016-04-13 13 views
1

Yay entegrasyonu ile küçük bir yaylı önyükleme uygulaması kurmaya çalışıyorum. Tek yapması gereken, bir jms kuyruğundan bir mesaj alıp, bir nesneyi tekrar geri istemez ve ısrar etmek için belirli bir fasülyeye yönlendirmektir. Yönlendirme bölümünü test ettim ve işe yaradığını doğrulayabilirim.Yay entegrasyonu ile mücadele dsl

Testimde, JmsTemplate yayı üzerinden bir ileti gönderebildiğim gömülü bir activemq aracısı var, ancak xml yükünü küçümsemek ve iletiyi yönlendirmek görünmüyor. Ben günlüğüne görebilirsiniz:

16:42:09.285 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message 
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected] 
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected] 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test 
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 } 
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>} 
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test 
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332 
16:42:09.314 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent 

Bahar entegrasyon günlüğü:

Testi durumda:

@Test 
public void jmsIntegrationTest() { 
    RouteLogEvent log = new RouteLogEvent(); 
    log.setAgentId(8888); 
    log.setInteracitonId(95634); 
    log.setMax(5); 
    log.setQueueTime(1256L); 
    log.setRouteTime(96541L); 
    log.setScore(8); 

    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500); 

    marshaller.marshal(log, new StreamResult(bytesOut)); 

    final String xmlPayload = new String(bytesOut.toByteArray()); 

    LOG.info("Sending message"); 

    jmsTemplate.send(jmsQueue, (s) -> { 
     return s.createTextMessage(xmlPayload); 
    }); 

    LOG.info("Message sent"); 

    List<RouteLogEvent> events = testDao.findAllRouteLogs(); 
    assertNotNull(events); 
    assertFalse(events.isEmpty()); 

    List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList()); 
    assertNotNull(filtered); 
    assertFalse(filtered.isEmpty()); 
} 
ben dışında kalan veya yanlış ne emin değilim

13:43:48.996 [main] INFO o.s.i.j.JmsMessageDrivenEndpoint - started [email protected]04469 
13:43:48.996 [main] INFO o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' 
13:43:48.996 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry] 
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0' 
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer - 

Yay tümleştirme yapılandırması:

@SpringBootApplication 
@EnableIntegration 
public class VitelAsyncPersisterApplication { 

    private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>(); 

    private static final String CHANNEL_RECORDING = "channel-recording"; 
    private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state"; 
    private static final String CHANNEL_AGENT_STATE = "channel-agent-state"; 
    private static final String CHANNEL_ROUTE_LOG = "channel-route"; 

    static { 
     ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE); 
     ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE); 
     ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING); 
     ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG); 
    } 

    @Value("${jms.queue.entity.persist}") 
    private String jmsQueueName; 

    @Value("${jms.broker.url}") 
    private String jmsBrokerUrl; 

    @Autowired 
    private EventDao eventDao; 

    @Bean 
    public Jaxb2Marshaller xmlMarshaller() { 
     Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
     marshaller.setSchema(new ClassPathResource("entities.xsd")); 
     marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities"); 

     return marshaller; 
    } 

    @Bean 
    public ConnectionFactory jmsConnFactory() { 
     ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl); 

     CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory(); 
     cachingConnFactory.setTargetConnectionFactory(activeMq); 

     return cachingConnFactory; 
    } 

    @Bean 
    public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) { 
     UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller); 

     JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName); 

     return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get(); 
    } 

    @Bean 
    public IntegrationFlow buildRouterFlow() { 

     Function router = (p) -> { 
      if (ROUTING_EVENTS.containsKey(p.getClass())) { 
       return ROUTING_EVENTS.get(p.getClass()); 
      } else { 
       return null; 
      } 
     }; 

     return IntegrationFlows.from("msg.router").route(router, m -> m 
       .subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload()))) 
       .subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload()))) 
       .subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload()))) 
       .subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get(); 
    } 

    public static void main(String[] args) { 
     SpringApplication.run(VitelAsyncPersisterApplication.class, args); 
    } 
} 

cevap

1

Günlüklerinize göre hiçbir Spring Entegrasyon altyapısı göremiyorum.

Yani, belki de @EnableIntegration'u özlediniz mi?

Ayrıca testiniz biraz garip. JMS'ye mesaj gönderir ve sonucu DB'den kontrol eder. Ancak, Entegrasyon için bu yapılandırmayı nasıl başlattığınızı görmeyiz.

Yalnızca iletileri dinleyip DB'de sakladığınızdan, tek yönlü JMS bileşenini - Jms.messageDriverChannelAdapter() kullanmayı düşünün. isteği/yanıtı ağ geçidi yerine.

+0

Merhaba Artem, yapılandırma snippet'ini değiştirdim. @EnableIntegration var. Evet, istek/cevap yanlış olduğunu düşündüm ama bir Poller'ın yerleştirilmesi gerektiğinde mücadele ediyordum. WRT günlükleri bazı ek çıktılar ekledim – user3465651

+0

Jms.messageDriverChannelAdapter() olarak değiştirildi, ve bu mesajda benzer bir sorun görüldüğünden, 5.13.2'den 5.12.3'e kadar activemq sürümü düzeltildi -> http: // stackoverflow .com/sorular/36007782/yay yapılandırarak-embedded-brokerservice – user3465651