I’m trying to use spring cloud stream to solve the following problem:
I have a class that calls two separated functions (Function A and B), both of those functions must work in parallel if the Function A finishes it must call the Function C, the same happens if Function B finish but this will call Function D, then I need to wait for Function C and Function D to finish and merge both responses in a single response, and then return this merged response object to the starting class that must be waiting to get that response.
The problems I have are:
- How do I call Function C to pass the Function A response?
- How to wait until Function C and Function D finish and get their responses in Function E?
- How to wait for the response of Function E in the controller, I’m using
streamBridge.send
which to start Function A and Function B at the same time.
I’m using
spring-cloud-stream-3.1.3
spring-cloud-stream-binder-rabbit
Required
I cannot use Kafka
Required
Sample code
ServiceClass
@Service @RequiredArgsConstructor public class ServiceClass { @NonNull private final StreamBridge streamBridge; @Override protected MergedResponse execute(Input input) { var send1 = streamBridge.send("functionA-in-0", input); var send2 = streamBridge.send("functionB-in-0", input); //TODO: Wait for Function E response object } }
Function A
@Slf4j @Configuration public class FunctionAClass{ @Bean public Function<Input, OutputFunctionA> functionA() { return input -> { //TODO: Invoke Function C to pass OutputFunctionA object return OutputFunctionA.builder.build(); }; } }
I don’t mind using Supplier
or Consumer
instead of Function
.
Edit
Hi, @Oleg Zhurakousky thanks for your help, to answer your question my problem is: I have to create a REST endpoint that consume N
different third-party REST endpoints (two at first, async is a must as it will be too slow to process every request sequentially) I don’t need all the data from them, just a few fields to build a common object. I’m planning to used streamBridge
to start the first two functions that will to build the required request for each third-party, then a function to invoke each of their endpoints, then a function to build a common object with each response, and finally a function that collects the commons objects and send them in the response of my service. Let me know if you have another question.
Regards.
Advertisement
Answer
Couple of points.
- Since you are introducing a synchronization point you will have in the end a blocking call no matter what you try since synchronization (such as your aggregation requirement) would have to wait, for two responses, correlate then etc. On top there is an issue of state that you are going to deal with and how to recover such state in the event of a system crash during aggregation.
- Aggregation is not really a use case for s-c-stream so we don’t have a framework-based support for it. I would consider using Spring Integration framework to create pipelines for two+ different endpoints, aggregate then using [Aggregator pattern](aggregator pattern support ) and then use StreamBridge to send data out to target destination.