I am trying to stop and start a multithreaded step through a scheduler, but I am getting the following exception:
Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
If I understand correctly, a multithreaded step cannot be restarted. But I am not restarting: I stop the job via stepExecution.setTerminateOnly() in a ChunkListener and try to start it again with jobLauncher.run() in a scheduler. Here is my code:
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("user_purge");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean
    public Job userPurgeJob() {
        return jobBuilderFactory.get("userPurgeJob")
                .start(userPurgeStep())
                .listener(new JobLoggerListener())
                .build();
    }

    @Bean
    public Step userPurgeStep() {
        return stepBuilderFactory.get("userPurgeStep")
                .<UserInfo, UserInfo> chunk(10)
                .reader(userPurgeReader())
                .writer(compositePurgeWriter())
                .listener(new StopListener())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    @StepScope
    public JdbcCursorItemReader<UserInfo> userPurgeReader() {
        JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT user_id, user_status "
                + "FROM db3.user_purge "
                + "WHERE user_status = 'I' "
                + "AND purge_status = 'N'");
        reader.setRowMapper(new SoftDeleteMapper());
        return reader;
    }

    @Bean
    public CompositeItemWriter<UserInfo> compositePurgeWriter() {
        CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
        return compositeItemWriter;
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
        JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("UPDATE db3.userinfo "
                + "SET user_status = :userStatus, "
                + "updated = NOW() "
                + "WHERE user_id = :userId");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<UserInfo> delTableWriter() {
        JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
        writer.setDataSource(dataSource);
        return writer;
    }
}
StopListener.java: this ChunkListener implementation is used to terminate the execution at any time outside the 10pm-6am window.
public class StopListener implements ChunkListener {

    @Autowired
    AppUtils appUtils;

    @Override
    public void beforeChunk(ChunkContext context) {
    }

    @Override
    public void afterChunk(ChunkContext context) {
        if (terminateJob()) {
            // flag the running step execution so the job stops gracefully after this chunk
            context.getStepContext().getStepExecution().setTerminateOnly();
        }
    }

    @Override
    public void afterChunkError(ChunkContext context) {
    }

    // Returns true when the current time is outside the 10pm-6am window
    private boolean terminateJob() {
        Calendar calendar = GregorianCalendar.getInstance();
        calendar.setTime(new Date());
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        return hour >= 6 && hour < 22;
    }
}
And finally, my scheduler method in the application class. I am using CommandLineRunner to accept arguments.
@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner {

    static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private ApplicationContext context;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobOperator jobOperator;

    private String jobName;
    private JobParameters jobParameters;
    private String inputFile;
    private String usertype;
    private boolean jobStatus = false;
    private String completionStatus;

    public static void main(String[] args) throws Exception {
        SpringApplication.run(UserPurgeBatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        this.jobName = args[0];
        this.jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        LOG.info("||| Launching the JOB: " + jobName);
        this.completionStatus = jobSelector(jobName, jobParameters).getExitCode();
        LOG.info(">>> JOB completed with status: " + this.completionStatus);
    }

    public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {
        Job job = this.context.getBean(jobName, Job.class);
        try {
            return this.jobLauncher.run(job, jobParameters).getExitStatus();
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            e.printStackTrace();
        }
        return new ExitStatus("FAILED");
    }

    @Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
    public void batchStartScheduler() {
        LOG.info("---Beginning of batchScheduler()---");
        Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
        String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
        Job job = this.context.getBean(jobName, Job.class);
        if (!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
            if (jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED)
                    || jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
                try {
                    LOG.info("|||Starting the Job...");
                    this.jobParameters = new JobParametersBuilder(jobParameters)
                            .addLong("time", System.currentTimeMillis())
                            .toJobParameters();
                    this.jobLauncher.run(job, jobParameters);
                } catch (JobExecutionAlreadyRunningException | JobRestartException
                        | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                    e.printStackTrace();
                }
            }
        } else {
            LOG.info("Scheduler not executed!!");
        }
        LOG.info("---End of batchScheduler()---");
    }
}
There are a few points I am confused about. Will jobLauncher.run() always try to restart the previous execution if it failed? I could see that it was still restarting, and that may be the reason for this exception. I tried to provide a new JobParameter hoping it would just launch the job again. I hope my stopping approach from the ChunkListener is OK, but somehow I want to start this job again from the scheduler, and I definitely need a multithreaded step. I also hope a CompositeItemWriter is fine in a multithreaded step. Any help would be greatly appreciated. Thanks in advance!
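For what it's worth, this is roughly how I understand the restart vs. new-instance behaviour (just a sketch of the two cases, not my actual code):

// Case 1: restart the stopped/failed execution by reusing the exact same JobParameters
jobLauncher.run(job, previousJobParameters);

// Case 2: start a brand new job instance by adding a unique parameter such as the current time
JobParameters newParameters = new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();
jobLauncher.run(job, newParameters);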
Update: I could finally make it work by adding reader.setVerifyCursorPosition(false). But I think I need to use a thread-safe reader as suggested by Mahmoud Ben Hassine, so I am trying to use a JdbcPagingItemReader and am getting the error "sortKey must be specified". I think I have specified it, but I am not sure it is correct. Here is my JdbcPagingItemReader:
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
    JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();
    pagingItemReader.setDataSource(dataSource);
    pagingItemReader.setFetchSize(3);
    pagingItemReader.setRowMapper(new SoftDeleteMapper());

    MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
    mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
    mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
    mySqlPagingQueryProvider.setWhereClause("WHERE user_status = 'I' "
            + "AND purge_status = 'N'");

    Map<String, Order> orderByKeys = new HashMap<>();
    orderByKeys.put("user_id", Order.ASCENDING);
    mySqlPagingQueryProvider.setSortKeys(orderByKeys);

    pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
    return pagingItemReader;
}
My Updated Step
@Bean
public Step userPurgeStep() {
    return stepBuilderFactory.get("userPurgeStep")
            .<UserInfo, UserInfo> chunk(10)
            .reader(jdbcPagingItemReader())
            .writer(compositeSoftDelWriter())
            .listener(new StopListener())
            .taskExecutor(taskExecutor())
            .build();
}
Answer
Multi-threading is not compatible with restarts. As mentioned in the Javadoc, you should set saveState to false if you use the JdbcCursorItemReader in a multi-threaded step.
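For example, on the reader from your question that is a single setter call (a sketch reusing your bean; note this only disables state saving, it does not make the reader thread-safe):

@Bean
@StepScope
public JdbcCursorItemReader<UserInfo> userPurgeReader() {
    JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setSql("SELECT user_id, user_status FROM db3.user_purge "
            + "WHERE user_status = 'I' AND purge_status = 'N'");
    reader.setRowMapper(new SoftDeleteMapper());
    // do not store the reader's position in the execution context
    reader.setSaveState(false);
    return reader;
}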
Moreover, the JdbcCursorItemReader is not thread-safe: it wraps a ResultSet object, which is not thread-safe, and it inherits from AbstractItemCountingItemStreamItemReader, which is not thread-safe either. So using it in a multi-threaded step is incorrect. This is actually the cause of your Unexpected cursor position change issue: concurrent threads are modifying the cursor position inadvertently.
You need to synchronize access to the reader by wrapping it in a SynchronizedItemStreamReader, or use a JdbcPagingItemReader, which is thread-safe.
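Wrapping your cursor reader would look something like this (a minimal sketch; the bean name is arbitrary):

@Bean
@StepScope
public SynchronizedItemStreamReader<UserInfo> synchronizedUserPurgeReader() {
    SynchronizedItemStreamReader<UserInfo> synchronizedReader = new SynchronizedItemStreamReader<>();
    // serialize concurrent read() calls on the non thread-safe cursor reader
    synchronizedReader.setDelegate(userPurgeReader());
    return synchronizedReader;
}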
EDIT: Add example with JdbcPagingItemReader
Here is a self-contained, Docker-based sample:
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {

    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");

    @ClassRule
    public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);

    @Autowired
    private DataSource dataSource;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job;

    @Before
    public void setUp() {
        String schema = "/org/springframework/batch/core/schema-mysql.sql";
        String data = // the script is inline here to have a self contained example
                "create table person (ID int not null primary key, name varchar(20));" +
                "insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
                "insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.addScript(new ClassPathResource(schema));
        databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
        databasePopulator.execute(this.dataSource);
    }

    @Test
    public void testJob() throws Exception {
        // given
        JobParameters jobParameters = new JobParameters();

        // when
        JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);

        // then
        Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }

    @Configuration
    @EnableBatchProcessing
    static class TestConfiguration {

        @Bean
        public DataSource dataSource() throws Exception {
            MysqlDataSource datasource = new MysqlDataSource();
            datasource.setURL(mysql.getJdbcUrl());
            datasource.setUser(mysql.getUsername());
            datasource.setPassword(mysql.getPassword());
            datasource.setUseSSL(false);
            return datasource;
        }

        @Bean
        public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
            MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
            mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
            mySqlPagingQueryProvider.setFromClause("FROM person");
            Map<String, Order> orderByKeys = new HashMap<>();
            orderByKeys.put("id", Order.DESCENDING);
            mySqlPagingQueryProvider.setSortKeys(orderByKeys);

            JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
            pagingItemReader.setDataSource(dataSource());
            pagingItemReader.setFetchSize(2);
            pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
            pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
            return pagingItemReader;
        }

        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
            return jobs.get("job")
                    .start(steps.get("step").chunk(2)
                            .reader(jdbcPagingItemReader())
                            .writer(items -> items.forEach(System.out::println))
                            .build())
                    .build();
        }

        static class Person {

            int id;
            String name;

            public int getId() {
                return id;
            }

            public void setId(int id) {
                this.id = id;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "Person{" +
                        "id=" + id +
                        ", name='" + name + '\'' +
                        '}';
            }
        }
    }
}
This prints the items in descending order, as expected, without complaining about a missing sort key:
Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}