Master/slave job architectural design in Spring Batch using a modular job approach

I hope you’re doing great. I’m facing a design problem in Spring Batch.

Let me explain:

I have a modular Spring Batch job architecture; each job has its own config file and context. I am designing a master job to launch the subjobs (there are 50+ types of subjobs).

An X object has, among other fields, a name, a state and a blob containing the CSV file attached to it. Each X object is updated after being processed. My first approach was to fetch all X objects and then loop over them (with a Java stream) to call the appropriate job.

But this approach has a lot of limitations.

So I designed a masterJob with a reader, a processor and a writer.

The masterJob should read an X object, call the appropriate subjob and then update the state of the X object.

The masterJobReader calls a custom service to get a list of, let’s say, X objects.

I started by trying to launch the subjob from within the masterJob processor, but it did not work.

I did some research and found that JobStep could be more suitable for this scenario.

But I’m stuck on how to pass the item read by masterJobReader to the JobStep as a parameter.

I saw DefaultJobParametersExtractor and tried to put the item read into the step execution context, but it’s not working.

My question is: how do I pass a parameter from the master job to the subjob using the JobStep approach?

If there is a better way to deal with this, I’m all ears!

I’m using Java config and Spring Batch 4.3.

Edit to provide sample code:

@Configuration
public class MasterJob {

@Value("${defaultCompletionPolicy}")
private Integer defaultCompletionPolicy;

@Autowired
protected StepBuilderFactory masterStepBuilderFactory;

private Logger logger = LoggerFactory.getLogger(MasterJob.class);

@Autowired
protected JobRepository jobRepo;

@Autowired
protected PlatformTransactionManager transactionManager;

@Autowired
@Qualifier("JOB_NAME1")
private Job JOB_NAME1; // this should change to be dynamic as there are around 50 types of job

@Bean(name = "masterJob")
protected Job masterBatchJob() throws ApiException {

    return new JobBuilderFactory(jobRepo).get("masterJob")
            .incrementer(new RunIdIncrementer())
            .start(masterJobStep(masterJobReader(), masterJobWriter()))
            .next(jobStepJobStep1(null)) // inter-bean call: the proxy returns the managed bean, the null launcher is never used
            .next(masterUpdateStep()) // update the state of objX
            .build();
}



@Bean(name = "masterJobStep")
protected Step masterJobStep(@Qualifier("masterJobReader") MasterJobReader masterReader,
        @Qualifier("masterJobWriter") MasterJobWriter masterWriter) throws ApiException {
    
    logger.debug("inside   masterJobStep");

    return this.masterStepBuilderFactory.get("masterJobStep")

            .<Customer, Customer>chunk(defaultCompletionPolicy)
            .reader(masterJobReader())
            .processor(masterJobProcessor())
            .writer(masterJobWriter())
            .transactionManager(transactionManager)             
            .listener(new MasterJobWriter()) // I set the parameter inside this.
            .listener(masterPromotionListener())
            .build();
}

@Bean(name = "masterJobWriter", destroyMethod = "")
@StepScope
protected MasterJobWriter masterJobWriter() {
    return new MasterJobWriter();
}

@Bean(name = "masterJobReader", destroyMethod = "")
@StepScope
protected MasterJobReader masterJobReader() throws ApiException {
    return new MasterJobReader();
}


protected FieldSetMapper<Customer> mapper() {
    return new CustomerMapper();
}

@Bean(name="masterPromotionListener")
public ExecutionContextPromotionListener masterPromotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    
    listener.setKeys(new String[] {
            "inputFile", "outputFile", "customerId", "comments", "customer"
    });
    
    //listener.setStrict(true);
    return listener;
}



@Bean(name = "masterUpdateStep")
public Step masterUpdateStep() {

    return this.masterStepBuilderFactory.get("masterCleanStep").tasklet(new MasterUpdateTasklet()).build();
}



@Bean(name = "masterJobProcessor", destroyMethod = "")
@StepScope
protected MasterJobProcessor masterJobProcessor() {
    return new MasterJobProcessor();
}


@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
    return this.masterStepBuilderFactory.get("jobStepJobStep1")
                .job(JOB_NAME1)
                .launcher(jobLauncher)
                .parametersExtractor(jobParametersExtractor())
                .build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
    DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

    extractor.setKeys(
            new String[] { "inputFile", "outputFile", "customerId", "comments", "customer" });

    return extractor;
}
 }

This is how I set the parameters from within the MasterJobWriter:

String inputFile = fetchInputFile(customer);
String outputFile = buildOutputFileName(customer);

Comments comments = buildComments(customer); // hypothetical helper; the real value comes from business logic

ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("inputFile", inputFile);
stepContext.put("outputFile", outputFile);
stepContext.put("customerId", customer.getCustomerId());
stepContext.put("comments", new CustomJobParameter<Comments>(comments));
stepContext.put("customer", new CustomJobParameter<Customer>(customer));

I followed this section of the Spring Batch documentation.


Answer

My question is: how do I pass a parameter from the master job to the subjob using the JobStep approach?

The JobParametersExtractor is what you are looking for. It allows you to extract parameters from the main job and pass them to the subjob. You can find an example here.
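
To make that concrete, here is a minimal sketch of a JobStep wired with a DefaultJobParametersExtractor (the config class, bean names and the subJob qualifier are placeholders, not your actual beans):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.step.job.DefaultJobParametersExtractor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LaunchSubJobStepConfig {

    private final StepBuilderFactory stepBuilderFactory;

    public LaunchSubJobStepConfig(StepBuilderFactory stepBuilderFactory) {
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public Step launchSubJobStep(JobLauncher jobLauncher, @Qualifier("subJob") Job subJob) {
        // Copies the named keys into the subjob's JobParameters at launch time.
        DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
        extractor.setKeys(new String[] { "inputFile", "outputFile", "customerId" });

        return stepBuilderFactory.get("launchSubJobStep")
                .job(subJob)
                .launcher(jobLauncher)
                .parametersExtractor(extractor)
                .build();
    }
}

Note that exactly where the extractor resolves those keys from (job parameters versus the execution context) has varied across 4.x versions, so if a key does not come through, check the DefaultJobParametersExtractor source for your version.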

EDIT: Adding suggestions based on comments

I have a list of X objects in the DB. An X object has, among other fields, an id, a type (of work), a name, a state and a blob containing the CSV file attached to it. The CSV file in the blob depends on the type field, so there is no single CSV pattern. I need to process each X object, save the content of its CSV file in the DB, generate a result CSV file containing the original data plus a comment field, and update the X object’s state with the result CSV file attached to it along with other fields.

As you can see, the process is already complex for a single X object, so trying to process all X objects in the same job of jobs is too complex IMHO. So much complexity in software comes from trying to make one thing do two things.

If there is a better way to deal with this, I’m all ears!

Since you are open to suggestions, I will recommend two options:

Option 1:

If it were up to me, I would create a job instance per X object. This way, I can 1) parallelize things and 2) in case of failure, restart only the failed job. These two characteristics (scalability and restartability) are almost impossible to achieve with the job-of-jobs approach. Even if you have a lot of X objects, this is not a problem: you can use one of the scaling techniques provided by Spring Batch to process things in parallel. A rough sketch follows.
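
Here is a rough sketch of such a dispatcher (XObject, XObjectService and the type-to-job naming convention are hypothetical placeholders, not your actual code):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

@Component
public class XObjJobDispatcher {

    private final JobLauncher jobLauncher;
    private final JobRegistry jobRegistry;
    private final XObjectService xObjectService; // hypothetical service returning pending X objects

    public XObjJobDispatcher(JobLauncher jobLauncher, JobRegistry jobRegistry,
            XObjectService xObjectService) {
        this.jobLauncher = jobLauncher;
        this.jobRegistry = jobRegistry;
        this.xObjectService = xObjectService;
    }

    public void dispatch() throws Exception {
        for (XObject x : xObjectService.findPending()) {
            // One job instance per X object: the id is an identifying parameter,
            // so each failed instance can be restarted individually.
            Job job = jobRegistry.getJob(jobNameFor(x.getType())); // maps type -> one of the 50+ jobs
            JobParameters params = new JobParametersBuilder()
                    .addLong("xObjectId", x.getId())
                    .toJobParameters();
            jobLauncher.run(job, params);
        }
    }

    private String jobNameFor(String type) {
        return type + "Job"; // placeholder naming convention
    }
}

Giving the JobLauncher an async TaskExecutor (for example via SimpleJobLauncher.setTaskExecutor) makes these launches run in parallel.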

Option 2:

If you really can’t or don’t want to use separate job instances, you can use a single job with a chunk-oriented step that iterates over the list of X objects. The processing logic seems independent from one record to another, so this step should be easy to scale with multiple threads, as sketched below.
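
A minimal sketch of such a multi-threaded step (XObject and the reader/processor/writer beans are placeholders; note that the reader must be thread-safe in a multi-threaded step):

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class ProcessXObjectsStepConfig {

    @Bean
    public Step processXObjectsStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<XObject> xObjectReader,
            ItemProcessor<XObject, XObject> xObjectProcessor,
            ItemWriter<XObject> xObjectWriter) {
        return stepBuilderFactory.get("processXObjectsStep")
                .<XObject, XObject>chunk(10)
                .reader(xObjectReader)       // must be thread-safe for concurrent reads
                .processor(xObjectProcessor) // processes one X object per thread
                .writer(xObjectWriter)
                .taskExecutor(new SimpleAsyncTaskExecutor("xobj-"))
                .throttleLimit(8)            // cap on concurrent worker threads
                .build();
    }
}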
