2016-04-08 12 views
1

Yayım kaynağım xd aws-s3 kaynağım var ve dosya boyutuna bağlı olarak poller gecikme olarak değiştirmeyi ayarlıyorum.Şimdi benim tetikleyici tavsiyemde afterReceive yöntemi Nasıl fileSize veya lineCount alabilirim Message<?> result benim dosya içindeki? İdeal çizgi yüzden hatları veya dosya boyutuna göre ayarlamak gecikmeDosya boyutunu veya satır sayısını bulmak için dinamik gecikme yapılandırması

stream create aws-s3|log 
<?xml version="1.0" encoding="UTF-8"?> 
    <beans xmlns="http://www.springframework.org/schema/beans" 
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
      xmlns:int="http://www.springframework.org/schema/integration" 
      xmlns:context="http://www.springframework.org/schema/context" 
      xmlns:int-aws="http://www.springframework.org/schema/integration/aws" 
      xmlns:int-file="http://www.springframework.org/schema/integration/file" 
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
      http://www.springframework.org/schema/integration/file 
      http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
       http://www.springframework.org/schema/context 
       http://www.springframework.org/schema/context/spring-context.xsd 
      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
      http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd"> 


     <context:property-placeholder location="classpath*:dms-security-${region}.properties" /> 

     <int:poller fixed-delay="${fixedDelay}" default="true" trigger="dynamicTrigger" "> 
      <int:advice-chain> 
       <ref bean="pollAdvise" /> 
       <ref bean="smartPollAdvise" /> 
      </int:advice-chain> 
     </int:poller> 


    <bean id="dynamicTrigger" 
      class="org.springframework.integration.util.DynamicPeriodicTrigger"> 
      <constructor-arg name="period" value="5000" /> 
     </bean> 

     <bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice"> 
      <constructor-arg ref="healthCheckStrategy"/> 

     </bean> 

    <bean id="smartPollAdvise" class="com.test.api.dms.main.TriggerAdvise"> 
      <property name="trigger" ref="dynamicTrigger"/> 

     </bean> 


     <bean id="healthCheckStrategy" class="com.test.api.dms.main.ServiceHealthCheckPollSkipStrategy"> 
      <property name="url" value="${url}"/> 
      <property name="doHealthCheck" value="${doHealthCheck}"/> 
      <property name="restTemplate" ref="restTemplate"/> 

     </bean> 

     <bean id="restTemplate" 
       class="org.springframework.web.client.RestTemplate"> 
      <constructor-arg ref="requestFactory"/> 

     </bean> 


     <bean id="requestFactory" 
       class="com.test.api.dms.main.BatchClientHttpRequestFactory"> 
      <constructor-arg ref="verifier"/> 

     </bean> 

     <bean id="verifier" 
       class="com.test.api.dms.main.NullHostnameVerifier"> 

     </bean> 


     <bean id="encryptedDatum" class="com.test.api.dms.core.security.EncryptedSecuredDatum"/> 




     <!-- aws-endpoint="https://s3.amazonaws.com" proxyHost="proxy.kdc.test.com" proxyPort="8099"--> 
     <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration"> 
      <property name="proxyHost" value="${proxyHost}"/> 
      <property name="proxyPort" value="${proxyPort}"/> 
      <property name="preemptiveBasicProxyAuth" value="false"/> 
     </bean> 

     <bean id="s3Operations" class="com.test.api.dms.main.CustomC1AmazonS3Operations"> 

      <constructor-arg index="0" ref="clientConfiguration"/> 
      <property name="awsEndpoint" value="s3.amazonaws.com"/> 
      <property name="temporaryDirectory" value="${temporaryDirectory}"/> 
      <property name="awsSecurityKey" value="#{encryptedDatum.decryptBase64Encoded('${awsSecurityKey}')}"/> 
     </bean> 


     <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials"> 

     </bean> 

     <!-- aws-endpoint="https://s3.amazonaws.com" --> 
     <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com" 
              bucket="${bucket}" 
              s3-operations="s3Operations" 
              credentials-ref="credentials" 
              file-name-wildcard="${fileNameWildcard}" 
              remote-directory="${prefix}" 
              channel="splitChannel" 
              local-directory="${localDirectory}" 
              accept-sub-folders="false" 
              delete-source-files="true" 
              archive-bucket="${archiveBucket}" 
              archive-directory="${archiveDirectory}"> 
     </int-aws:s3-inbound-channel-adapter> 

     <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8"> 

      <int-file:request-handler-advice-chain> 
       <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> 
        <property name="onSuccessExpression" value="payload.delete()"/> 
       </bean> 
      </int-file:request-handler-advice-chain> 

     </int-file:splitter> 

     <int:channel-interceptor pattern="*" order="3"> 
      <bean class="org.springframework.integration.channel.interceptor.WireTap"> 
       <constructor-arg ref="loggingChannel" /> 
      </bean> 
     </int:channel-interceptor> 
     <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="INFO"/> 

     <int:channel id="output"/> 

    </beans> 



public class TriggerAdvice extends AbstractMessageSourceAdvice { 

    private final DynamicPeriodicTrigger trigger; 


    private volatile long nextPollPeriod; 


    public TriggerAdvice(DynamicPeriodicTrigger trigger) { 
     this.trigger = trigger; 
     this.nextPollPeriod = trigger.getPeriod(); 
    } 


    public long getNextPollPeriod() { 
     return nextPollPeriod; 
    } 


    public void setNextPollPeriod(long nextPollPeriod) { 
     this.nextPollPeriod = nextPollPeriod; 
    } 


    @Override 
    public boolean beforeReceive(MessageSource<?> source) { 
     return true; 
    } 

    @Override 
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) { 
     if (result == null) { 
      this.trigger.setPeriod(this.nextPollPeriod); 
     } 
     return null; 

    } 

} 

cevap

1

aws-s3 source döner Message<File>, böylece afterReceive sadece döküm can iyi olabilir saymak getPayload() ve standart Java mekanizmasını kullanın dosyadan satırları say:

LineNumberReader lnr = new LineNumberReader(new FileReader((File) result.getPayload()); 
lnr.skip(Long.MAX_VALUE); 
System.out.println(lnr.getLineNumber() + 1); //Add 1 because line index starts at 0 
// Finally, the LineNumberReader object should be closed to prevent resource leak 
lnr.close(); 
İlgili konular