Understanding Number of StreamProcessor instances created and do stream task share same streamprocessor instance?

Tags: , ,



I want to understand a little more details on the relationship between StreamThread, StreamTask and how many instances of StreamProcessor is created when we have:

  • a source kafka topic with multiple partitions , say 6.
  • I am keeping only ONE StreamThread (num.stream.threads=1)

I am keeping a simple processor topology:

source_topic –> Processor1 –> Processor2 –> Processo3 –> sink_topic

Each processor simply forwards to next processor in chain. Snippet of one of the processors. I am using low level Java API.

public class Processor1 implements Processor<String, String> {

    private ProcessorContext context;
    public Processor1() {
    
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context
    }

    @Override
    public void punctuate(long timestamp) {
        // TODO Auto-generated method stub
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }
}

Snippet of Main driver application:

Topology topology = new Topology();

topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");

Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

With this arrangement, I have following questions:

  • How many instances of processors (Processor1, Processor2, Processor3) will be created?
  • As per my understanding , there will be SIX stream tasks. Is a new instance of processor created for each Stream task or they “share” the same Processor instance?
  • When a Stream Thread is created, does it create a new instance of processor?
  • Are Stream Tasks created as part of Stream Threads creation?

(New question added to original list)

  • In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one-by-one, sort of “in-a-loop”. Do stream tasks run as a separate “thread”. Basically, not able to understand how a single stream thread run multiple stream tasks at the same time/in parallel?

The below is topology which gets printed:

KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
    StreamsThread appId: my-first-streams-application
        StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
        StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
        Active tasks:
            Running:                                StreamsTask taskId: 0_0
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-0]
                                StreamsTask taskId: 0_1
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-1]
                                StreamsTask taskId: 0_2
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-2]
                                StreamsTask taskId: 0_3
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-3]
                                StreamsTask taskId: 0_4
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-4]
                                StreamsTask taskId: 0_5
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-5]

            Suspended:
            Restoring:
            New:
        Standby tasks:
            Running:
            Suspended:
            Restoring:
            New:


Answer

How many instances of processors (Processor1, Processor2, Processor3) will be created?

In your example, six each. Each task will instantiate a full copy of the Topology. (cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355; note: a Topology is a the logical representation of the program, and is instantiated asProcessorTopology at runtime)

As per my understanding, there will be SIX stream tasks. Is a new instance of processor created for each Stream task or they “share” the same Processor instance?

Each task has its own Processor instance — they are not shared.

When a Stream Thread is created, does it create a new instance of processor?

No. When a task is created, it will create new Processor instances.

Are Stream Tasks created as part of Stream Threads creation?

No. Tasks are create during a rebalance according to the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal cosumner that call TaskManager#createTasks()

Update (as question was extended):

In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one-by-one, sort of “in-a-loop”. Do stream tasks run as a separate “thread”. Basically, not able to understand how a single stream thread run multiple stream tasks at the same time/parallely?

Yes, the StreamsThread will execute the tasks in a loop. There are no other threads. Hence, tasks that are assigned to the same thread are not executed at the same time/in-parallel but one after each other.(Cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472 — each StreamThread used exactly one TaskManager that uses AssignedStreamsTasks and AssignedStandbyTasks internally.)



Source: stackoverflow