Skip to content
Advertisement

How to get jobId with @BeforeStep in spring batch with compositeItem processor?

I have a composite item processor like so:

@Bean
    public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception {
        CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
        List<ItemProcessor<User, User>> processors = Arrays.asList(new Processor(),validatingItemProcessor());
        processor.setDelegates(processors);
        processor.afterPropertiesSet();
        return processor;
    }

I am not able to get jobExecutionId in any of the process with @Beforestep This is processor 1.

public class Processor implements ItemProcessor<User, User>  {

    private static final Map<String, String> DEPT_NAMES =
            new HashMap<>();

    private Long jobExecutionId;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.jobExecutionId = stepExecution.getJobExecutionId();
    }

    public Processor() {
        DEPT_NAMES.put("001", "Technology");
        DEPT_NAMES.put("002", "Operations");
        DEPT_NAMES.put("003", "Accounts");
    }

    @Override
    public User process(User user) throws Exception {
        System.out.println(this.jobExecutionId);
        String deptCode = user.getDept();
        String dept = DEPT_NAMES.get(deptCode);
        user.setDept(dept);
        user.setTime(new Date());
        System.out.println(String.format("Converted from [%s] to [%s]", deptCode, dept));
        return user;
    }

}

This is processor 2.

    @Bean
    public ValidatingItemProcessor<User> validatingItemProcessor() {
        return new ValidatingItemProcessor<>(new UserValidator());
    }

Here is my job configuration.

       Step step = stepBuilderFactory.get("ETL-file-load")
                .<User, User>chunk(100)
                .reader(itemReader)
                .processor(compositeItemProcessor())
                .writer(itemWriter)
                .faultTolerant()
                .skipPolicy(jobSkipPolicy())
                .listener(userValidationListener())
                .build();


        return jobBuilderFactory.get("ETL-Load")
                .incrementer(new RunIdIncrementer())
                .start(step)
                .build();

I am using a listener for skipped user records.

    @Bean
    public UserValidationListener userValidationListener() {
        return new UserValidationListener();
    }
public class UserValidationListener implements SkipListener<User, User> {


    @Override
    public void onSkipInRead(Throwable throwable) {

    }

    @Override
    public void onSkipInWrite(User user, Throwable throwable) {

    }

    @Override
    public void onSkipInProcess(User user, Throwable throwable) {
        System.out.println(user.toString());
        System.out.println(throwable.getMessage());
        //write error
        //errorStaorage.()
    }
}

With this configuration, I am not abled to get the jobExecutionId in any of the processor.

I have tried using a single processor instead of composite processor. I get the jobExecutionId.

        Step step = stepBuilderFactory.get("ETL-file-load")
                .<User, User>chunk(100)
                .reader(itemReader)
                .processor(new Processor())
                .writer(itemWriter)
                .faultTolerant()
                .skipPolicy(jobSkipPolicy())
                .listener(userValidationListener())
                .build();

I am getting the jobExecutionId

Converted from [001] to [Technology]
1
Converted from [002] to [Operations]
1
Converted from [003] to [Accounts]
1
Converted from [001] to [Technology]
1
Converted from [001] to [Technology]

Where 1 is the jobId. So, this brings me to a conclusion that there is an error with the way I have setup composite processor and cannot get the id with @BeforeStep.

Why does @Beforestep behave differently and not get invoke when using a composite item processor?

UPDATE As per suggested answer, I have added this.

    @Bean
    public Processor itemProcessor() {
        return new Processor();
    }

And now my steplooks like this…

        Step step = stepBuilderFactory.get("ETL-file-load")
                .<User, User>chunk(100)
                .reader(itemReader)
                .processor(compositeItemProcessor())
                .writer(itemWriter)
                .faultTolerant()
                .skipPolicy(jobSkipPolicy())
                .listener(userValidationListener())
                .listener(itemProcessor())
                .build();

Advertisement

Answer

The issue is that your processor is not being implicitly registered as a listener, only the composite processor is. You will need to do this explicitly in your step

    @Bean
    public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception {
        CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
        List<ItemProcessor<User, User>> processors = Arrays.asList(processor(), validatingItemProcessor());
        processor.setDelegates(processors);
        processor.afterPropertiesSet();
        return processor;
    }

    @Bean
    public ItemProcessor<User, User> processor() {
        return new Processor();
    }

    @Bean
    public void step() {
        Step step = stepBuilderFactory.get("ETL-file-load")
                .<User, User>chunk(100)
                .reader(itemReader)
                .processor(compositeItemProcessor())
                .writer(itemWriter)
                .faultTolerant()
                .skipPolicy(jobSkipPolicy())
                .listener(userValidationListener())
                .listener(processor()) // register your custom processor as a listener to execute @BeforeStep
                .build();
        return step;
    }
User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement