Skip to content
Advertisement

Kafka Stream-GlobalKTable join on a specific field

So I have a KStream that that gets deserialized into a POJO like so

public class FinancialMessage {

public String user_id;
public String stock_symbol;
public String exchange_id;

}

And here’s how the Global Ktable record looks like

public class CompanySectors {

public String company_id;
public String company_name;
public String tckr;
public String sector_cd;
}

I want to be able to join the KStream’s stock_symbol field with the Ktable’s tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it into another topic. I had code like below but I seem to be getting some null pointer exceptions.

Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
    at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
    at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Here’s what the code snippet looks like.

KStream<String, FinancialMessage> financialMessageStream =
        builder.stream(
            INCOMING_TOPIC,
            Consumed.with(Serdes.String(), financialMessageSerde)
        );

    GlobalKTable<String, CompanySectors> companySectorsStore = 
        builder.globalTable(
            KTABLE_TOPIC,
            Consumed.with(Serdes.String(), companySectorsSerde)
    );
    
    KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
        companySectorsStore,
        (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
        (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
    );
    
    enrichedStream.to(
        OUTGOING_TOPIC,
        Produced.with(Serdes.String(), enrichedMessageSerde));

I imagine that there might be some error in my leftJoin logic.

Advertisement

Answer

When doing a left join, you can assume that the left stream’s record is not null; however, you cannot assume that the right GlobalKTable will have a record for matching the given key, and therefore the resulting record could be null. In your case, when you instantiate a new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure that companySectorsValue isn’t null? If it is null, are you handling it properly? It appears that your NPE is occurring in the constructor of EnrichedMessage, so just make sure that you know that companySectorsValue can be null.

Also, ensure your GlobalKTable is prepopulated before any join logic occurs.

User contributions licensed under: CC BY-SA
1 People found this is helpful
Advertisement