This code creates two KStream instances separately, both reading from the same topic:
final KStream<String, String> inputStream1 = builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> inputStream2 = builder.stream(INPUT_TOPIC, consumed);

final KStream<String, String> mappedStream1 = inputStream1
        .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
        .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream2
        .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
        .mapValues((ValueMapper<String, String>) String::toUpperCase);

mappedStream1.to(OUTPUT_TOPIC_1, produced);
mappedStream2.to(OUTPUT_TOPIC_2, produced);
The resulting topology contains only one source node, which feeds both branches:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
    Processor: KSTREAM-PEEK-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000002
    Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-PEEK-0000000004
    Sink: KSTREAM-SINK-0000000006 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000007 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000005
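To see why a single source means both branches get identical input, here is a plain-Java model of the fan-out in the topology listing. It is a sketch under simplifying assumptions, not Kafka Streams code: the class `FanOutModel` and its `run` method are hypothetical, each map entry stands in for one input partition handled by its own task, and the "source" hands every record of a partition to both branches in order, mirroring the two `-->` edges leaving `KSTREAM-SOURCE-0000000000`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical in-memory model of one source node feeding two branches.
// Not Kafka Streams code: each map entry stands in for one input partition,
// processed by its own (simulated) task.
class FanOutModel {

    // Runs every record of every partition through both branches and
    // returns the two branch outputs as a pair of lists.
    static List<List<String>> run(Map<Integer, List<String>> partitions) {
        UnaryOperator<String> branch1 = v -> v.toLowerCase(Locale.ROOT);
        UnaryOperator<String> branch2 = v -> v.toUpperCase(Locale.ROOT);
        List<String> out1 = new ArrayList<>();
        List<String> out2 = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> partition : partitions.entrySet()) {
            for (String value : partition.getValue()) {
                // The single source forwards each record to both children,
                // like the two "-->" edges leaving the source node.
                out1.add(branch1.apply(value));
                out2.add(branch2.apply(value));
            }
        }
        return List.of(out1, out2);
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        partitions.put(0, List.of("Foo", "Bar"));
        partitions.put(1, List.of("Baz"));
        List<List<String>> outputs = run(partitions);
        System.out.println("output-1: " + outputs.get(0)); // output-1: [foo, bar, baz]
        System.out.println("output-2: " + outputs.get(1)); // output-2: [FOO, BAR, BAZ]
    }
}
```

In this model both branches receive every record from every partition because there is only one place where records enter; the branches differ only in the transformation they apply.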
Now my question is: is it always safe to assume that the StreamsBuilder creates only one source (that is, only one consumer for the same topic)?
In other words, given a topic with multiple partitions, is it always guaranteed that inputStream1 and inputStream2 see the same records?
Or is it better to rewrite it as follows, to make this explicit:
final KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, consumed);

final KStream<String, String> mappedStream1 = inputStream
        .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
        .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream
        .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
        .mapValues((ValueMapper<String, String>) String::toUpperCase);

mappedStream1.to(OUTPUT_TOPIC_1, produced);
mappedStream2.to(OUTPUT_TOPIC_2, produced);
Update
The 2nd version results in this topology:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000004
Answer
The builder would be the same, and so would the application.id.
I can't speak for the topology, but at the Consumer API level, Kafka Streams uses the application.id as the consumer group.id, so both streams would belong to the same consumer group.
Within a consumer group, each partition is assigned to only one consumer, so with one input topic only one consumer instance (of the two) would be able to consume a given partition of that topic.
This would explain why there is only one source, and why you don't need additional builder.stream() calls with the same parameters.
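The consumer-group rule behind this answer (within one group, each partition has a single owner) can be illustrated with a simplified round-robin assignment. This is a hypothetical sketch: `GroupAssignmentModel` and `assign` are made up for illustration, and Kafka's real assignors, including the `StreamsPartitionAssignor` that Kafka Streams uses, are considerably more involved. What the sketch preserves is the invariant that no partition is handed to two members of the same group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of partition assignment inside one consumer group.
// Not Kafka's actual assignor: partitions are simply dealt out round-robin.
class GroupAssignmentModel {

    // Assigns partitions 0..numPartitions-1 to the given members round-robin.
    // Every partition ends up with exactly one owning member.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members sharing one group.id, one partition: only one of them gets it.
        System.out.println(assign(List.of("consumer-A", "consumer-B"), 1));
        // prints {consumer-A=[0], consumer-B=[]}

        // Two members, four partitions: each partition still has a single owner.
        System.out.println(assign(List.of("consumer-A", "consumer-B"), 4));
        // prints {consumer-A=[0, 2], consumer-B=[1, 3]}
    }
}
```

With a single partition, the second member of the group receives nothing, which is the situation the answer describes for two consumers sharing one group.id.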