Skip to content
Advertisement

Spring cloud stream merge responses from two different functions

I’m trying to use spring cloud stream to solve the following problem:

enter image description here

I have a class that calls two separated functions (Function A and B), both of those functions must work in parallel if the Function A finishes it must call the Function C, the same happens if Function B finish but this will call Function D, then I need to wait for Function C and Function D to finish and merge both responses in a single response, and then return this merged response object to the starting class that must be waiting to get that response.

The problems I have are:

  • How do I call Function C to pass the Function A response?
  • How to wait until Function C and Function D finish and get their responses in Function E?
  • How to wait for the response of Function E in the controller, I’m using streamBridge.send which to start Function A and Function B at the same time.

I’m using

  • spring-cloud-stream-3.1.3
  • spring-cloud-stream-binder-rabbit Required

I cannot use Kafka Required

Sample code

ServiceClass

@Service
@RequiredArgsConstructor
public class ServiceClass {

    @NonNull
    private final StreamBridge streamBridge;

    @Override
    protected MergedResponse execute(Input input) {
        var send1 = streamBridge.send("functionA-in-0", input);
        var send2 = streamBridge.send("functionB-in-0", input);

        //TODO: Wait for Function E response object
    }
}

Function A

@Slf4j
@Configuration
public class FunctionAClass{

    @Bean
    public Function<Input, OutputFunctionA> functionA() {
        return input -> {
            //TODO: Invoke Function C to pass OutputFunctionA object
            return OutputFunctionA.builder.build();
        };
    }
}

I don’t mind using Supplier or Consumer instead of Function.

Edit Hi, @Oleg Zhurakousky thanks for your help, to answer your question my problem is: I have to create a REST endpoint that consume N different third-party REST endpoints (two at first, async is a must as it will be too slow to process every request sequentially) I don’t need all the data from them, just a few fields to build a common object. I’m planning to used streamBridge to start the first two functions that will to build the required request for each third-party, then a function to invoke each of their endpoints, then a function to build a common object with each response, and finally a function that collects the commons objects and send them in the response of my service. Let me know if you have another question.

Regards.

Advertisement

Answer

Couple of points.

  1. Since you are introducing a synchronization point you will have in the end a blocking call no matter what you try since synchronization (such as your aggregation requirement) would have to wait, for two responses, correlate then etc. On top there is an issue of state that you are going to deal with and how to recover such state in the event of a system crash during aggregation.
  2. Aggregation is not really a use case for s-c-stream so we don’t have a framework-based support for it. I would consider using Spring Integration framework to create pipelines for two+ different endpoints, aggregate then using [Aggregator pattern](aggregator pattern support ) and then use StreamBridge to send data out to target destination.
Advertisement