2016-04-05

I am developing a Spring Batch partitioning example based on http://www.mkyong.com/spring-batch/spring-batch-partitioning-example/. I created a Partitioner for a job with 10 threads; each thread reads records from the database within the id range it is given. The output files should be created fresh on every run, but instead I get: org.springframework.batch.item.ItemStreamException: File already exists: [SpringBatch-Partitioner-Example\csv\outputs\users.processed21-30.csv]

The application runs fine the first time, but the next time I run the code it shows me the error below. What do I need to do so the files are created the way I want? What extra configuration do I need?

org.springframework.batch.item.ItemStreamException: File already exists: [C:\Users\userpc\Documents\workspace-sts-3.6.4.RELEASE\SpringBatch-Partitioner-Example\csv\outputs\users.processed21-30.csv] 
    at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:61) 
    at org.springframework.batch.item.file.FlatFileItemWriter$OutputState.initializeBufferedWriter(FlatFileItemWriter.java:559) 
    at org.springframework.batch.item.file.FlatFileItemWriter$OutputState.access$000(FlatFileItemWriter.java:399) 
    at org.springframework.batch.item.file.FlatFileItemWriter.doOpen(FlatFileItemWriter.java:333) 
    at org.springframework.batch.item.file.FlatFileItemWriter.open(FlatFileItemWriter.java:323) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) 
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
    at com.sun.proxy.$Proxy5.open(Unknown Source) 
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96) 
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310) 
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197) 
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) 
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.lang.Thread.run(Thread.java:745) 

Apr 06, 2016 12:07:29 AM org.springframework.batch.core.step.AbstractStep execute 
SEVERE: Encountered an error executing step slave in job partitionJob 

User.java

public class User implements Serializable{ 
    private static final long serialVersionUID = 1L; 

    private int id; 
    private String username; 
    private String password; 
    private int age; 
    // setters and getters 
} 

UserRowMapper.java

public class UserRowMapper implements RowMapper<User> { 

    @Override 
    public User mapRow(ResultSet rs, int rowNum) throws SQLException { 
     User user = new User(); 
     user.setId(rs.getInt("id")); 
     user.setUsername(rs.getString("username")); 
     user.setPassword(rs.getString("password")); 
     user.setAge(rs.getInt("age")); 

     return user; 
    } 
} 

RangePartitioner.java

public class RangePartitioner implements Partitioner { 

    @Override 
    public Map<String, ExecutionContext> partition(int gridSize) { 

     Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 

     int range = 10; 
     int fromId = 1; 
     int toId = range; 

     for (int i = 1; i <= gridSize; i++) { 
      ExecutionContext value = new ExecutionContext(); 

      System.out.println("\nStarting : Thread" + i); 
      System.out.println("fromId : " + fromId); 
      System.out.println("toId : " + toId); 

      value.putInt("fromId", fromId); 
      value.putInt("toId", toId); 

      // give each thread a name 
      value.putString("name", "Thread" + i); 

      result.put("partition" + i, value); 

      fromId = toId + 1; 
      toId += range; 
     } 
     return result; 
    } 
} 
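The range arithmetic in RangePartitioner above can be sketched standalone, without Spring's ExecutionContext (the class and method names here are illustrative, not part of the project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangeMath {

    // Computes the same contiguous [fromId, toId] windows as RangePartitioner,
    // keyed by partition name, using plain int arrays instead of ExecutionContext.
    static Map<String, int[]> ranges(int gridSize, int range) {
        Map<String, int[]> result = new LinkedHashMap<>();
        int fromId = 1;
        int toId = range;
        for (int i = 1; i <= gridSize; i++) {
            result.put("partition" + i, new int[] { fromId, toId });
            fromId = toId + 1;   // next window starts right after this one
            toId += range;
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, int[]> r = ranges(10, 10);
        // partition3 covers ids 21-30, which matches the file name in the
        // exception: users.processed21-30.csv
        System.out.println(r.get("partition3")[0] + "-" + r.get("partition3")[1]);
    }
}
```

This also shows where the file name in the stack trace comes from: the third slave partition handles ids 21 through 30.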

UserProcessor.java

@Component("itemProcessor") 
@Scope(value = "step") 
public class UserProcessor implements ItemProcessor<User, User> { 

    @Value("#{stepExecutionContext[name]}") 
    private String threadName; 

    @Override 
    public User process(User item) throws Exception { 
     System.out.println(threadName + " processing : " + item.getId() + " : " + item.getUsername()); 
     return item; 
    } 

    public String getThreadName() { 
     return threadName; 
    } 

    public void setThreadName(String threadName) { 
     this.threadName = threadName; 
    } 
} 

database.xml

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-3.2.xsd 
     http://www.springframework.org/schema/jdbc 
     http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd"> 

    <!-- connect to database --> 
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> 
     <property name="driverClassName" value="com.mysql.jdbc.Driver" /> 
     <property name="url" value="jdbc:mysql://localhost:3306/toga" /> 
     <property name="username" value="root" /> 
     <property name="password" value="root" /> 
    </bean> 

    <bean id="transactionManager" 
     class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 

    <!-- create job-meta tables automatically 
    <jdbc:initialize-database data-source="dataSource"> 
     <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" /> 
     <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" /> 
    </jdbc:initialize-database> 
    --> 
</beans> 

context.xml

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> 

    <!-- stored job-meta in memory --> 
    <bean id="jobRepository" 
     class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> 
     <property name="transactionManager" ref="transactionManager" /> 
    </bean> 

    <bean id="transactionManager" 
     class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 

    <bean id="jobLauncher" 
     class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 
     <property name="jobRepository" ref="jobRepository" /> 
    </bean> 
</beans> 

job-partitioner.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> 

    <!-- spring batch core settings --> 
    <import resource="../config/context.xml" /> 

    <!-- database settings --> 
    <import resource="../config/database.xml" /> 

    <!-- ============= partitioner job ========== --> 
    <job id="partitionJob" xmlns="http://www.springframework.org/schema/batch"> 

     <!-- master step, 10 threads (grid-size) --> 
     <step id="masterStep"> 
      <partition step="slave" partitioner="rangePartitioner"> 
       <handler grid-size="10" task-executor="taskExecutor" /> 
      </partition> 
     </step> 

    </job> 

    <!-- ======= Jobs to run ===== --> 
    <step id="slave" xmlns="http://www.springframework.org/schema/batch"> 
     <tasklet> 
      <chunk reader="pagingItemReader" writer="flatFileItemWriter" 
        processor="itemProcessor" commit-interval="1" /> 
     </tasklet> 
    </step> 


    <bean id="rangePartitioner" class="com.mkyong.partition.RangePartitioner" /> 

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> 

    <bean id="itemProcessor" class="com.mkyong.processor.UserProcessor" scope="step"> 
     <property name="threadName" value="#{stepExecutionContext[name]}" /> 
    </bean> 


    <!-- ========== Paging Item Reader --> 
    <bean id="pagingItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step"> 
     <property name="dataSource" ref="dataSource" /> 

     <property name="queryProvider"> 
      <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"> 
       <property name="dataSource" ref="dataSource" /> 
       <property name="selectClause" value="select id, username,password, age" /> 
       <property name="fromClause" value="from user" /> 
       <property name="whereClause" value="where id &gt;= :fromId and id &lt;= :toId" /> 
       <property name="sortKey" value="id" /> 
      </bean> 
     </property> 

     <!-- Inject via the ExecutionContext in rangePartitioner --> 
     <property name="parameterValues"> 
      <map> 
       <entry key="fromId" value="#{stepExecutionContext[fromId]}" /> 
       <entry key="toId" value="#{stepExecutionContext[toId]}" /> 
      </map> 
     </property> 

     <property name="pageSize" value="10" /> 
     <property name="rowMapper"> 
      <bean class="com.mkyong.UserRowMapper" /> 
     </property> 
    </bean> 


    <!-- ================= csv file writer ============== --> 
    <bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step" > 
     <property name="resource" 
      value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" /> 

     <property name="appendAllowed" value="false" /> 

     <property name="lineAggregator"> 
      <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> 
       <property name="delimiter" value="," /> 
       <property name="fieldExtractor"> 
        <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"> 
         <property name="names" value="id, username, password, age" /> 
        </bean> 
       </property> 
      </bean> 
     </property> 
    </bean> 



    <!-- ========= Mongo Item Reader ========--> 
    <bean id="mongoItemReader" class="org.springframework.batch.item.data.MongoItemReader" scope="step"> 
     <property name="template" ref="mongoTemplate" /> 
     <property name="targetType" value="com.mkyong.User" /> 
     <property name="query" 
      value="{ 
        'id':{$gt:#{stepExecutionContext[fromId]}, $lte:#{stepExecutionContext[toId]} 
      } }" /> 
     <property name="sort"> 
      <util:map id="sort"> 
       <entry key="id" value="" /> 
      </util:map> 
     </property> 
    </bean> 
</beans> 
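If the goal is for every run to write fresh files rather than overwrite the old ones, one option (not part of the original post) is to put a per-run timestamp into the file name, e.g. via a job parameter referenced in the writer's `resource` expression. A minimal sketch of just the string-building part in plain Java (the `buildName` method is hypothetical):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class RunNames {

    // Builds a per-run output name such as users.processed21-30.20160406-0007.csv.
    // In the XML config the same idea would be a timestamp job parameter inside
    // the 'resource' SpEL expression; this only illustrates the naming scheme.
    static String buildName(int fromId, int toId, LocalDateTime runStart) {
        String stamp = runStart.format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmm"));
        return "users.processed" + fromId + "-" + toId + "." + stamp + ".csv";
    }

    public static void main(String[] args) {
        System.out.println(buildName(21, 30, LocalDateTime.of(2016, 4, 6, 0, 7)));
    }
}
```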

Please help me solve this problem.

App.java

public class App { 

    public static void main(String[] args) { 
     App obj = new App(); 
     obj.run(); 
    } 

    private void run() { 
     final String[] springConfig = { "spring/batch/jobs/job-partitioner.xml" }; 

     ApplicationContext context = new ClassPathXmlApplicationContext(springConfig); 

     JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); 
     Job job = (Job) context.getBean("partitionJob"); 

     try { 
      System.out.println("-----------------------------------------------"); 
      JobExecution execution = jobLauncher.run(job, new JobParameters()); 
      System.out.println("Exit Status : " + execution.getStatus()); 
      System.out.println("Exit Status : " + execution.getAllFailureExceptions()); 
      System.out.println("-----------------------------------------------"); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     System.out.println("Done"); 
    } 
} 

Added the project structure: (screenshot not shown)

+0

What does "running the code again" look like in the scenario that produces the error — is it a fresh run or a restart? –

+0

Please also post the code of your main method –

Answer

0

<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step" > 
    <property name="resource" 
     value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" /> 

    <!-- ******* remove this line ***** --><property name="appendAllowed" value="false" /> 

    <property name="lineAggregator"> 
     <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> 
      <property name="delimiter" value="," /> 
      <property name="fieldExtractor"> 
       <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"> 
        <property name="names" value="id, username, password, age" /> 
       </bean> 
      </property> 
     </bean> 
    </property> 
</bean> 

Remove the appendAllowed configuration and it will work (append already defaults to false; see the source of FlatFileItemWriter). The reason this solves the problem lies inside FlatFileItemWriter: calling setAppendAllowed, regardless of the value passed, has a side effect:

/** 
* Flag to indicate that the target file should be appended if it already 
* exists. If this flag is set then the flag 
* {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is 
* automatically set to false, so that flag should not be set explicitly. 
* Defaults value is false. 
* 
* @param append the flag value to set 
*/ 
public void setAppendAllowed(boolean append) { 
    this.append = append; 
    this.shouldDeleteIfExists = false; 
} 

As the Javadoc above shows, setAppendAllowed unconditionally sets shouldDeleteIfExists to false, so on the next run the writer refuses to open the existing output file. This issue is tracked as

BATCH-2495
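Why that flag matters can be seen in a simplified sketch of the check the writer performs when it opens its output file. This is an illustration of the logic only, not the actual FileUtils source (which throws ItemStreamException rather than IllegalStateException):

```java
import java.io.File;
import java.io.IOException;

public class OutputFileCheck {

    // Mirrors, in simplified form, what FileUtils.setUpOutputFile does:
    // if the file exists and neither append nor delete-if-exists is allowed,
    // fail fast instead of silently clobbering the old output.
    static void setUpOutputFile(File file, boolean append, boolean deleteIfExists)
            throws IOException {
        if (file.exists()) {
            if (append) {
                return; // keep the file, the writer will append to it
            }
            if (!deleteIfExists) {
                throw new IllegalStateException("File already exists: " + file);
            }
            file.delete(); // replace the old output
        }
        file.createNewFile();
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("users", ".csv");
        try {
            setUpOutputFile(f, false, false); // throws: file already exists
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        } finally {
            f.delete();
        }
    }
}
```

With appendAllowed configured (even to false), shouldDeleteIfExists ends up false as well, which puts a second run of the job into exactly the throwing branch above.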

+0

What do we need to do to append to the existing files? –

+0

append = true should make that work –

+0

Yes, it works. Thanks for the good explanation above. –