I have a composite item processor like so:
@Bean public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception { CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>(); List<ItemProcessor<User, User>> processors = Arrays.asList(new Processor(),validatingItemProcessor()); processor.setDelegates(processors); processor.afterPropertiesSet(); return processor; }
I am not able to get jobExecutionId
in any of the process with @Beforestep
This is processor 1.
public class Processor implements ItemProcessor<User, User> { private static final Map<String, String> DEPT_NAMES = new HashMap<>(); private Long jobExecutionId; @BeforeStep public void beforeStep(StepExecution stepExecution) { this.jobExecutionId = stepExecution.getJobExecutionId(); } public Processor() { DEPT_NAMES.put("001", "Technology"); DEPT_NAMES.put("002", "Operations"); DEPT_NAMES.put("003", "Accounts"); } @Override public User process(User user) throws Exception { System.out.println(this.jobExecutionId); String deptCode = user.getDept(); String dept = DEPT_NAMES.get(deptCode); user.setDept(dept); user.setTime(new Date()); System.out.println(String.format("Converted from [%s] to [%s]", deptCode, dept)); return user; } }
This is processor 2.
@Bean public ValidatingItemProcessor<User> validatingItemProcessor() { return new ValidatingItemProcessor<>(new UserValidator()); }
Here is my job configuration.
Step step = stepBuilderFactory.get("ETL-file-load") .<User, User>chunk(100) .reader(itemReader) .processor(compositeItemProcessor()) .writer(itemWriter) .faultTolerant() .skipPolicy(jobSkipPolicy()) .listener(userValidationListener()) .build(); return jobBuilderFactory.get("ETL-Load") .incrementer(new RunIdIncrementer()) .start(step) .build();
I am using a listener for skipped user records.
@Bean public UserValidationListener userValidationListener() { return new UserValidationListener(); } public class UserValidationListener implements SkipListener<User, User> { @Override public void onSkipInRead(Throwable throwable) { } @Override public void onSkipInWrite(User user, Throwable throwable) { } @Override public void onSkipInProcess(User user, Throwable throwable) { System.out.println(user.toString()); System.out.println(throwable.getMessage()); //write error //errorStaorage.() } }
With this configuration, I am not abled to get the jobExecutionId
in any of the processor.
I have tried using a single processor instead of composite processor. I get the jobExecutionId
.
Step step = stepBuilderFactory.get("ETL-file-load") .<User, User>chunk(100) .reader(itemReader) .processor(new Processor()) .writer(itemWriter) .faultTolerant() .skipPolicy(jobSkipPolicy()) .listener(userValidationListener()) .build();
I am getting the jobExecutionId
Converted from [001] to [Technology] 1 Converted from [002] to [Operations] 1 Converted from [003] to [Accounts] 1 Converted from [001] to [Technology] 1 Converted from [001] to [Technology]
Where 1
is the jobId. So, this brings me to a conclusion that there is an error with the way I have setup composite processor
and cannot get the id
with @BeforeStep
.
Why does @Beforestep
behave differently and not get invoke when using a composite item processor?
UPDATE As per suggested answer, I have added this.
@Bean public Processor itemProcessor() { return new Processor(); }
And now my steplooks like this…
Step step = stepBuilderFactory.get("ETL-file-load") .<User, User>chunk(100) .reader(itemReader) .processor(compositeItemProcessor()) .writer(itemWriter) .faultTolerant() .skipPolicy(jobSkipPolicy()) .listener(userValidationListener()) .listener(itemProcessor()) .build();
Advertisement
Answer
The issue is that your processor is not being implicitly registered as a listener, only the composite processor is. You will need to do this explicitly in your step
@Bean public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception { CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>(); List<ItemProcessor<User, User>> processors = Arrays.asList(processor(), validatingItemProcessor()); processor.setDelegates(processors); processor.afterPropertiesSet(); return processor; } @Bean public ItemProcessor<User, User> processor() { return new Processor(); } @Bean public void step() { Step step = stepBuilderFactory.get("ETL-file-load") .<User, User>chunk(100) .reader(itemReader) .processor(compositeItemProcessor()) .writer(itemWriter) .faultTolerant() .skipPolicy(jobSkipPolicy()) .listener(userValidationListener()) .listener(processor()) // register your custom processor as a listener to execute @BeforeStep .build(); return step; }