Skip to content
Advertisement

How to implement a synchronous job queue in Spring?

I’m trying to find out how to implement a job queue using Spring.

I’ve got a server up and running that I plan to have users submit POST requests to. This will take in some data and will then queue jobs to process this data.

The processing of this data is an expensive process that can sometimes take 5 to 20 minutes (depending on the amount of work needing done). Because of this, it needs to run synchronously. I.e. one job finishes and then the next one can start.

e.g

  • A user submits job A
  • Job A is started since the queue is empty
  • Another user submits a second job, job B
  • Job A is still running, so job B is placed into a queue
  • Another user comes along and submits job C, job A is still running so it’s placed into the queue along with job B.

I’ve only recently started learning Spring so I’m looking for some idea as to how to achieve this.

My idea would be to have a factory class that takes in jobs which can then be scheduled.

One of my end-points look like the following:

@RequestMapping(value = "/submitjob", method = RequestMethod.POST)
    public void queueJob(
            @RequestPart("file") MultipartFile file
    ) {

        if (file != null) {
           // queue job
        }
        // else return bad response.
    }

Any advice is greatly appreciated.

Advertisement

Answer

You can use java.util.concurrent.ExecutorService with a single thread to implement this behavior.

NB : this implementation could evolve easily to a multithreaded service so you can run processing in parallel

The first problem you have to face is you don’t want to block your client’s request.

If you pass MultipartFile directly to a service, it will have to wait until the file is processed which can end to a timeout because the input stream is in the request.

First you have to copy the file of the multipart in order to upload it. In your controller :

private final FileProcessingService fileProcessingService;

public StackOverFlowController(FileProcessingService fileProcessingService) {
    this.fileProcessingService = fileProcessingService;
}

@PostMapping(value = "/submitjob")
public void queueJob(@RequestPart("file") MultipartFile multipartFile) throws IOException, ExecutionException, InterruptedException {

    File tempFile = copyInputStreamToTempFile(multipartFile);

    fileProcessingService.queueFile(tempFile);

}

private File copyInputStreamToTempFile(MultipartFile multipartFile) throws IOException {
    File tempFile = File.createTempFile("queued-file-", ".tmp");
    try (OutputStream os = new FileOutputStream(tempFile)) {
        IOUtils.copy(multipartFile.getInputStream(), os);
    }
    return tempFile;
}

        

Here the MultipartFile is copied to a temp file but you could save it in a directory The file is then passed to the FileProcessingService which has to be non blocking

Then to create a non blocking queue which will process files sequentially you can use a single threaded ExecutorService. Calling execute will add tasks to the queue. This method takes in parameter an implementation of Runnable

The service skeleton may look like this :

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
@Slf4j
public class FileProcessingService {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void queueFile(File fileToProcess) {
        executor.execute(new FileProcessRunnable(fileToProcess));
        log.info("Queued file " + fileToProcess);
    }
}

a simple stub of the Runnable with a Thread.sleep to simulate processing :

@Slf4j
public class FileProcessRunnable implements Runnable {

    private final File fileToProcess;

    public FileProcessRunnable(File fileToProcess) {
        this.fileToProcess = fileToProcess;
    }

    @Override
    public void run() {
        process();
        log.info("Processed file " + fileToProcess.getName());
    }

    private void process() {
        try {
            Thread.sleep(1000); //simulating process
        } catch (InterruptedException e) {
            log.error("Error during process", e);
        }
    }
}

A not so real test to simulate behavior :

@Test
@SneakyThrows
void should_queue_file_processing() {
    FileProcessingService fileProcessingService = new FileProcessingService();

    File file1 = File.createTempFile("temp-", ".tmp");
    File file2 = File.createTempFile("temp-", ".tmp");
    File file3 = File.createTempFile("temp-", ".tmp");
    File file4 = File.createTempFile("temp-", ".tmp");

    fileProcessingService.queueFile(file1);
    fileProcessingService.queueFile(file2);
    fileProcessingService.queueFile(file3);
    fileProcessingService.queueFile(file4);

    Thread.sleep(1000 * 5);//await until tasks are completed
}

The test above will log : enter image description here

as you can see, files are queued before they are processed

see for more info on Executors : https://www.baeldung.com/java-executor-service-tutorial

Advertisement