Does the Kafka Streams StreamsBuilder always detect “duplicate” input topics?

This code creates two KStream instances separately, both reading from the same topic:

    final KStream<String, String> inputStream1 =
      builder.stream(INPUT_TOPIC, consumed);
    final KStream<String, String> inputStream2 =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream1
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream2
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

    mappedStream1.to(OUTPUT_TOPIC_1, produced);
    mappedStream2.to(OUTPUT_TOPIC_2, produced);

The resulting topology contains only one source definition, which feeds both branches:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
    Processor: KSTREAM-PEEK-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000002
    Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-PEEK-0000000004
    Sink: KSTREAM-SINK-0000000006 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000007 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000005

Now my question is: Is it always safe to assume that the StreamsBuilder creates only one source (= only one consumer for the same topic)?

In other words: Is it always guaranteed that – given a topic with multiple partitions – inputStream1 and inputStream2 see the same records?

Or is it better to rewrite it to something like this, to make it explicit:

    final KStream<String, String> inputStream =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

    mappedStream1.to(OUTPUT_TOPIC_1, produced);
    mappedStream2.to(OUTPUT_TOPIC_2, produced);

Update

The second version results in an equivalent topology; only the node numbering differs:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000004

Answer

Both streams are created from the same builder, so they belong to the same application and share the same application.id.

I can’t speak for the topology, but at the Consumer API level the group.id is derived from the application.id, so both streams would use the same consumer group.

Within a consumer group, each partition of the input topic is assigned to exactly one consumer, so two separate consumers in that group could never both read the same records.

This would explain why there is only one source. Either way, you don’t need additional builder.stream() calls with the same parameters; a single KStream, as in your second version, can feed both branches.
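To make the consumer-group argument concrete, here is a toy model in plain Java. It is not Kafka code: the class name, the method names, and the round-robin strategy are all assumptions chosen for illustration. It only shows the property the reasoning relies on, namely that every partition ends up with exactly one owner inside a group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of partition assignment inside one consumer group (hypothetical, not Kafka's implementation). */
class GroupAssignmentSketch {

    /** Gives each partition exactly one owner, walking the members round-robin. */
    public static Map<Integer, String> assign(int partitionCount, List<String> members) {
        Map<Integer, String> owner = new TreeMap<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            owner.put(partition, members.get(partition % members.size()));
        }
        return owner;
    }

    /** Returns the partitions owned by the given member, in ascending order. */
    public static List<Integer> partitionsOf(String member, Map<Integer, String> owner) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : owner.entrySet()) {
            if (entry.getValue().equals(member)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two consumers in the same group, one topic with four partitions.
        Map<Integer, String> owner = assign(4, List.of("consumer-1", "consumer-2"));
        System.out.println(partitionsOf("consumer-1", owner)); // [0, 2]
        System.out.println(partitionsOf("consumer-2", owner)); // [1, 3]
    }
}
```

In this model the two members never share a partition, so if inputStream1 and inputStream2 were backed by separate consumers in the same group, each would see only part of the topic. The single source node in your topology avoids that, because both branches are fed from one source.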

User contributions licensed under: CC BY-SA