I want to understand in a little more detail the relationship between StreamThread, StreamTask, and how many instances of each Processor are created when we have:
- a source Kafka topic with multiple partitions, say 6.
- only ONE StreamThread (num.stream.threads=1)
I am keeping a simple processor topology:
source_topic –> Processor1 –> Processor2 –> Processor3 –> sink_topic
Each processor simply forwards to the next processor in the chain. Below is a snippet of one of the processors. I am using the low-level Java API (Processor API).
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class Processor1 implements Processor<String, String> {

    // Context handed in by the runtime; used to forward records downstream.
    private ProcessorContext context;

    public Processor1() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void punctuate(long timestamp) {
        // No scheduled work in this processor.
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        // Pass the record unchanged to the next processor in the chain.
        context.forward(key, value);
    }
}
Snippet of Main driver application:
// Wire the chain: SOURCE -> Processor1 -> Processor2 -> Processor3 -> SINK
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");

// Required settings such as application.id and bootstrap.servers are omitted from this snippet.
Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);

KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
With this arrangement, I have the following questions:
- How many instances of the processors (Processor1, Processor2, Processor3) will be created?
- As per my understanding, there will be SIX stream tasks. Is a new instance of each processor created for each stream task, or do they "share" the same Processor instance?
- When a stream thread is created, does it create a new instance of each processor?
- Are stream tasks created as part of stream thread creation?
(New question added to original list)
- In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one by one, sort of "in a loop"? Do stream tasks run as separate "threads"? Basically, I am not able to understand how a single stream thread runs multiple stream tasks at the same time/in parallel.
Below is the topology description that gets printed:
KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
  StreamsThread appId: my-first-streams-application
    StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
    StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
    Active tasks:
      Running:
        StreamsTask taskId: 0_0
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-0]
        StreamsTask taskId: 0_1
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-1]
        StreamsTask taskId: 0_2
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-2]
        StreamsTask taskId: 0_3
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-3]
        StreamsTask taskId: 0_4
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-4]
        StreamsTask taskId: 0_5
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-5]
      Suspended:
      Restoring:
      New:
    Standby tasks:
      Running:
      Suspended:
      Restoring:
      New:
Answer
How many instances of processors (Processor1, Processor2, Processor3) will be created?
In your example, six each. Each task will instantiate a full copy of the Topology. (Cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355; note: a Topology is the logical representation of the program, and is instantiated as a ProcessorTopology at runtime.)
As per my understanding, there will be SIX stream tasks. Is a new instance of processor created for each Stream task or they “share” the same Processor instance?
Each task has its own Processor instance; they are not shared.
When a Stream Thread is created, does it create a new instance of processor?
No. When a task is created, it will create new Processor instances.
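The one-instance-per-task behaviour can be illustrated with a small simulation that does not use Kafka at all. This is only a sketch: FakeProcessor and FakeTask are hypothetical stand-ins, not Kafka Streams classes, and the only assumption modelled is that every task asks each supplier for a fresh processor when the task is built.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class ProcessorPerTaskSketch {

    // Hypothetical stand-in for a user-defined processor; not a Kafka Streams class.
    static final class FakeProcessor {
        final String name;
        FakeProcessor(String name) { this.name = name; }
    }

    // Hypothetical stand-in for a task: it builds its own copy of the processor chain.
    static final class FakeTask {
        final List<FakeProcessor> processors = new ArrayList<>();
        FakeTask(Map<String, Supplier<FakeProcessor>> suppliers) {
            for (Supplier<FakeProcessor> supplier : suppliers.values()) {
                processors.add(supplier.get());
            }
        }
    }

    // Creates one task per partition and counts how often each supplier is invoked.
    static Map<String, Integer> countInstances(int numPartitions) {
        Map<String, Integer> created = new LinkedHashMap<>();
        Map<String, Supplier<FakeProcessor>> suppliers = new LinkedHashMap<>();
        for (String name : new String[] {"Processor1", "Processor2", "Processor3"}) {
            created.put(name, 0);
            suppliers.put(name, () -> {
                created.merge(name, 1, Integer::sum);
                return new FakeProcessor(name);
            });
        }
        List<FakeTask> tasks = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            tasks.add(new FakeTask(suppliers));
        }
        return created;
    }

    public static void main(String[] args) {
        // prints {Processor1=6, Processor2=6, Processor3=6}
        System.out.println(countInstances(6));
    }
}
```

This is also why addProcessor takes a supplier lambda such as () -> new Processor1() rather than a single instance: the supplier must return a new object on every call so that tasks never share processor state.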
Are Stream Tasks created as part of Stream Threads creation?
No. Tasks are created during a rebalance according to the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal consumer that calls TaskManager#createTasks().
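As a rough mental model (not the actual Kafka Streams implementation), the thread starts with no tasks and only gains them when partitions are assigned. In the sketch below, TaskAssignmentSketch and onPartitionsAssigned are hypothetical names, and the task id format follows the 0_0 to 0_5 ids visible in the printed topology description, where the first number is the sub-topology and the second is the partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskAssignmentSketch {

    // Hypothetical model of the active tasks owned by one stream thread: task id -> partition.
    private final Map<String, Integer> activeTasks = new TreeMap<>();

    // Simulates the rebalance callback: tasks come into existence here, not at thread creation.
    void onPartitionsAssigned(int subTopologyId, List<Integer> partitions) {
        for (int partition : partitions) {
            String taskId = subTopologyId + "_" + partition;
            activeTasks.putIfAbsent(taskId, partition);
        }
    }

    List<String> taskIds() {
        return new ArrayList<>(activeTasks.keySet());
    }

    public static void main(String[] args) {
        TaskAssignmentSketch thread = new TaskAssignmentSketch();
        // prints [] because no rebalance has happened yet
        System.out.println(thread.taskIds());

        thread.onPartitionsAssigned(0, Arrays.asList(0, 1, 2, 3, 4, 5));
        // prints [0_0, 0_1, 0_2, 0_3, 0_4, 0_5]
        System.out.println(thread.taskIds());
    }
}
```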
Update (as question was extended):
In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one by one, sort of "in a loop"? Do stream tasks run as separate "threads"? Basically, I am not able to understand how a single stream thread runs multiple stream tasks at the same time/in parallel.
Yes, the StreamThread will execute the tasks in a loop. There are no other threads. Hence, tasks that are assigned to the same thread are not executed at the same time/in parallel but one after another. (Cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472; each StreamThread uses exactly one TaskManager, which uses AssignedStreamsTasks and AssignedStandbyTasks internally.)
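The loop can be pictured with the following simplified sketch, which again uses no Kafka classes. FakeTask, runLoop, and demoTasks are hypothetical names, and the real thread loop does more than this (polling the consumer, committing, punctuating); the sketch only shows one thread visiting its tasks in turn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SingleThreadTaskLoopSketch {

    // Hypothetical stand-in for a task with its own buffer of pending records.
    static final class FakeTask {
        final String id;
        final Deque<String> buffered = new ArrayDeque<>();
        FakeTask(String id) { this.id = id; }

        // Processes at most one buffered record; returns false if nothing was pending.
        boolean processOne(List<String> log) {
            String record = buffered.poll();
            if (record == null) {
                return false;
            }
            log.add(id + ":" + record + "@" + Thread.currentThread().getName());
            return true;
        }
    }

    // The calling thread visits every task in turn until no task has work left.
    static List<String> runLoop(List<FakeTask> tasks) {
        List<String> log = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (FakeTask task : tasks) {
                progressed |= task.processOne(log);
            }
        }
        return log;
    }

    // Builds tasks 0_0 .. 0_(n-1), each preloaded with records r0 .. r(m-1).
    static List<FakeTask> demoTasks(int numTasks, int recordsPerTask) {
        List<FakeTask> tasks = new ArrayList<>();
        for (int p = 0; p < numTasks; p++) {
            FakeTask task = new FakeTask("0_" + p);
            for (int r = 0; r < recordsPerTask; r++) {
                task.buffered.add("r" + r);
            }
            tasks.add(task);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Every log line ends with the same thread name: the tasks are interleaved, not parallel.
        for (String line : runLoop(demoTasks(6, 2))) {
            System.out.println(line);
        }
    }
}
```

To get real parallelism across the six tasks, you would raise num.stream.threads or run more application instances, so that the tasks are spread over several threads.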