
2 consecutive stream-stream inner joins produce wrong results: what does KStream join between streams really do internally?

The problem setting

I have a stream of nodes and a stream of edges that represent consecutive updates of a graph, and I want to build patterns composed of nodes and edges using multiple joins in series. Suppose I want to match a pattern like: (node1) –[edge1]–> (node2).
My idea is to join the stream of nodes with the stream of edges to compose a stream of sub-patterns of type (node1) –[edge1]–>, then join the resulting stream with the stream of nodes again to compose the final pattern (node1) –[edge1]–> (node2). The filters on particular types of nodes and edges are not important here.

Data model

So I have nodes, edges and patterns structured in Avro format:

{
  "namespace": "DataModel",
  "type": "record",
  "name": "Node",
  "doc": "Node schema, it contains a nodeID label and properties",
  "fields": [
    {
      "name": "nodeID",
      "type": "long"
    },
    {
      "name": "labels",
      "type": {
        "type": "array",
        "items": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}


{
  "namespace": "DataModel",
  "type": "record",
  "name": "Edge",
  "doc": "contains edgeID, a type, a list of properties, a starting node ID and an ending node ID ",
  "fields": [
    {
      "name": "edgeID",
      "type": "long"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "startID",
      "type": "long"
    },
    {
      "name": "endID",
      "type": "long"
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}


{
    "namespace": "DataModel",
    "type": "record",
    "name": "Pattern",
    "fields": [
      {
        "name": "first",
        "type": "long"
      },
      {
        "name": "nextJoinID",
        "type": [
          "null",
          "long"
        ],
        "default": null
      },
      {
        "name": "timestamp",
        "type": "long"
      },
      {
        "name": "segments",
        "doc": "It's the ordered list of nodes and edges that compose this sub-pattern from the leftmost node to the rightmost edge or node",
        "type": {
          "type": "array",
          "items": [
            "DataModel.Node",
            "DataModel.Edge"
          ]
        }
      }
    ]
}

Then I have the following two ValueJoiners:

The first one is used for an inner join of a node stream and an edge stream.
The second one is used for an inner join of a sub-pattern stream and a node stream.

public class NodeEdgeJoiner implements ValueJoiner<Node, Edge, Pattern> {

    @Override
    public Pattern apply(Node node, Edge edge) {

        Object[] segments = {node,edge};
        return Pattern.newBuilder()
                .setFirst(node.getNodeID())
                .setNextJoinID(edge.getEndID())
                .setSegments(Arrays.asList(segments))
                .setTimestamp(Math.min(node.getTimestamp(),edge.getTimestamp()))
                .build();

    }
}

public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {

    @Override
    public Pattern apply(Pattern pattern, Node node) {

        List<Object> segments = pattern.getSegments();
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(),pattern.getTimestamp()))
                .build();
    }
}

My intention is to catch patterns like: (nodeId == 1)–[label == “related_to”]–>() where

  • (nodeId == 1) represents a node with id=1
  • –[label == “related_to”]–> represents a directed edge with label = “related_to”
  • () represents a generic node.

The idea for concatenating those pieces is to perform two consecutive joins using the ValueJoiners above. I want you to focus on the first operation performed by both ValueJoiners: to build the pattern, I simply append nodes and edges to the end of a list that is part of the Avro schema of a Pattern. The following is the generic loop that produces nodes and edges and publishes them to the corresponding topics. The key of each node record is its nodeID, and the key of each edge record is the nodeID of the edge's start node (its startID).

while(true){

            try (final KafkaProducer<Long, Node> nodeKafkaProducer = new KafkaProducer<Long, Node>(props)) {

                final KafkaProducer<Long, Edge> edgeKafkaProducer = new KafkaProducer<Long, Edge>(props);

                nodeKafkaProducer.send(new ProducerRecord<Long, Node>(nodeTopic, (long) 1,
                        buildNodeRecord(1, Collections.singletonList("aString"), "aString",
                                System.currentTimeMillis())));
                edgeKafkaProducer.send(new ProducerRecord<Long, Edge>(edgesTopic, (long) 1,
                        buildEdgeRecord(1, 1, 4, "related_to", "aString",
                                System.currentTimeMillis())));
                Thread.sleep(9000);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

where:

private Node buildNodeRecord(long nodeId, List<String> labelsToSet, String property, long timestamp){

        Node record = new Node();
        record.setNodeID(nodeId);
        record.setLabels(labelsToSet);
        Map<String, String> propMap = new HashMap<String, String>();
        propMap.put("property", property);
        record.setProperties(propMap);
        record.setTimestamp(timestamp);
        return record;


    }  
private Edge buildEdgeRecord(long edgeId,long startID, long endID, String type, String property, long timestamp) {

        Edge record = new Edge();
        record.setEdgeID(edgeId);
        record.setStartID(startID);
        record.setEndID(endID);
        record.setType(type);

        Map<String,String> propMap = new HashMap<String, String>();
        propMap.put("property",property);
        record.setProperties(propMap);
        record.setTimestamp(timestamp);

        return record;
    }

The following part of the code describes the pipeline.

//configuration of specific avro serde for pattern type
        final SpecificAvroSerde<Pattern> patternSpecificAvroSerde = new SpecificAvroSerde<>();
        final Map<String, String> serdeConfig = Collections.singletonMap(
                AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
        patternSpecificAvroSerde.configure(serdeConfig,false);

        //the valueJoiners we need
        final NodeEdgeJoiner nodeEdgeJoiner = new NodeEdgeJoiner();
        final PatternNodeJoiner patternNodeJoiner = new PatternNodeJoiner();

        //timestampExtractors
        NodeTimestampExtractor nodeTimestampExtractor = new NodeTimestampExtractor();
        SubPatternTimeStampExtractor subPatternTimeStampExtractor = new SubPatternTimeStampExtractor();
        EdgeTimestampExtractor edgeTimestampExtractor = new EdgeTimestampExtractor();

        //node source
        final KStream<Long, Node> nodeKStream = builder.stream(envProps.getProperty("node.topic.name"),
                Consumed.with(nodeTimestampExtractor));

       //filter on nodes topic
        nodeKStream.filter((key, value) -> value.getNodeID()==1).to(envProps.getProperty("firstnodes.topic.name"));
        final KStream<Long,Node> firstFilteredNodes = builder.stream(envProps.getProperty("firstnodes.topic.name"),
                Consumed.with(nodeTimestampExtractor));

        //edges keyed by incoming node
        final KStream<Long,Edge> edgeKstream = builder.stream(envProps.getProperty("edge.topic.name"),
                Consumed.with(edgeTimestampExtractor));

        //filter operation on edges for the first part of the pattern
        final KStream<Long,Edge> firstEdgeFiltered = edgeKstream.filter((key, value) ->
                value.getType().equals("related_to"));

        //first join
        firstFilteredNodes.join(firstEdgeFiltered,nodeEdgeJoiner,
                JoinWindows.of(Duration.ofSeconds(10)))
                .map((key, value) -> new KeyValue<Long, Pattern>(value.getNextJoinID(), value))
                .to(envProps.getProperty("firstJoin.topic.name"));

        final KStream <Long,Pattern> mappedFirstJoin = builder.stream(envProps.getProperty("firstJoin.topic.name"),
                Consumed.with(subPatternTimeStampExtractor));

        //second join
        KStream <Long,Pattern> secondJoin = mappedFirstJoin
                .join(nodeKStream,patternNodeJoiner, JoinWindows.of(Duration.ofSeconds(10)));
        secondJoin.print(Printed.toSysOut()); // should print out final records

I’m not going to show the timestamp extractors since I think they are irrelevant to the point.

The Issue

I expect the output to be a stream of Pattern records whose “segments” list (from the Avro schema) always has the same size: one node, one edge and another node. But this doesn’t happen. Instead I get this output:

[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427795, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252436822, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
.  
.  
.  

As you can see, the size of the array of ordered nodes and edges differs from record to record. In particular, each one contains a node and an edge followed by a varying number of nodes. If I reduce the sleep interval in the while(true){…} loop, it gets worse: the lists become very long, with many more nodes in them. I can confirm that the node-edge join performs well under every condition; it always generates correct results. The problem seems to affect the second join, but I don’t understand how. I tried some testing, without success.

The following is the topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [nodes])
      --> KSTREAM-WINDOWED-0000000015, KSTREAM-FILTER-0000000001
    Source: KSTREAM-SOURCE-0000000013 (topics: [firstJoin])
      --> KSTREAM-WINDOWED-0000000014
    Processor: KSTREAM-WINDOWED-0000000014 (stores: [KSTREAM-JOINTHIS-0000000016-store])
      --> KSTREAM-JOINTHIS-0000000016
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-WINDOWED-0000000015 (stores: [KSTREAM-JOINOTHER-0000000017-store])
      --> KSTREAM-JOINOTHER-0000000017
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000017 (stores: [KSTREAM-JOINTHIS-0000000016-store])
      --> KSTREAM-MERGE-0000000018
      <-- KSTREAM-WINDOWED-0000000015
    Processor: KSTREAM-JOINTHIS-0000000016 (stores: [KSTREAM-JOINOTHER-0000000017-store])
      --> KSTREAM-MERGE-0000000018
      <-- KSTREAM-WINDOWED-0000000014
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000018 (stores: [])
      --> KSTREAM-PRINTER-0000000019
      <-- KSTREAM-JOINTHIS-0000000016, KSTREAM-JOINOTHER-0000000017
    Processor: KSTREAM-PRINTER-0000000019 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000018
    Sink: KSTREAM-SINK-0000000002 (topic: firstFilter)
      <-- KSTREAM-FILTER-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [edges])
      --> KSTREAM-FILTER-0000000005
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-WINDOWED-0000000007
      <-- KSTREAM-SOURCE-0000000004
    Source: KSTREAM-SOURCE-0000000003 (topics: [firstFilter])
      --> KSTREAM-WINDOWED-0000000006
    Processor: KSTREAM-WINDOWED-0000000006 (stores: [KSTREAM-JOINTHIS-0000000008-store])
      --> KSTREAM-JOINTHIS-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINOTHER-0000000009-store])
      --> KSTREAM-JOINOTHER-0000000009
      <-- KSTREAM-FILTER-0000000005
    Processor: KSTREAM-JOINOTHER-0000000009 (stores: [KSTREAM-JOINTHIS-0000000008-store])
      --> KSTREAM-MERGE-0000000010
      <-- KSTREAM-WINDOWED-0000000007
    Processor: KSTREAM-JOINTHIS-0000000008 (stores: [KSTREAM-JOINOTHER-0000000009-store])
      --> KSTREAM-MERGE-0000000010
      <-- KSTREAM-WINDOWED-0000000006
    Processor: KSTREAM-MERGE-0000000010 (stores: [])
      --> KSTREAM-MAP-0000000011
      <-- KSTREAM-JOINTHIS-0000000008, KSTREAM-JOINOTHER-0000000009
    Processor: KSTREAM-MAP-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MERGE-0000000010
    Sink: KSTREAM-SINK-0000000012 (topic: firstJoin)
      <-- KSTREAM-MAP-0000000011


pom.xml

<groupId>KafkaJOINS</groupId>
    <artifactId>KafkaJOINS</artifactId>
    <version>1.0</version>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </pluginRepository>
    </pluginRepositories>

    <properties>
        <log4j.version>2.13.3</log4j.version>
        <avro.version>1.9.2</avro.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <confluent.version>6.0.0</confluent.version>
        <kafka.version>6.0.0-ccs</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>



Answer

In your first ValueJoiner you create a new object:

Object[] segments = {node,edge};

In your second ValueJoiner you get the pattern's list and add to it. You need to copy the list before modifying it, though:

// your code
List<Object> segments = pattern.getSegments();
segments.add(node); // this effectively modifies the input object;
                    // if this input object joins multiple times,
                    // you may introduce an undesired side effect

// instead you should do
List<Object> segments = new LinkedList<>(pattern.getSegments());
segments.add(node);
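
To make the side effect concrete, here is a minimal sketch in plain Java with no Kafka dependency. It assumes that, within the join, the same left-hand Pattern object is handed to the ValueJoiner once for every matching node in the window, which is consistent with the growing lists in your output. The Pattern class here is a simplified stand-in for the Avro-generated one, and the class and method names (JoinerAliasingDemo, mutatingJoin, copyingJoin, joinSizes) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class JoinerAliasingDemo {

    // Simplified stand-in for the Avro-generated Pattern; only the list matters here.
    public static final class Pattern {
        final List<Object> segments;
        Pattern(List<Object> segments) { this.segments = segments; }
    }

    // Same shape as the joiner in the question: appends to the input's own list.
    public static Pattern mutatingJoin(Pattern pattern, String node) {
        List<Object> segments = pattern.segments;
        segments.add(node);
        return new Pattern(segments);
    }

    // Copies the list first, so the input pattern is left untouched.
    public static Pattern copyingJoin(Pattern pattern, String node) {
        List<Object> segments = new ArrayList<>(pattern.segments);
        segments.add(node);
        return new Pattern(segments);
    }

    // Simplified model of one left record matching two right records in the window:
    // the joiner is called twice with the same left object, and each result's size
    // is recorded at the moment it is produced (as a downstream print would see it).
    public static List<Integer> joinSizes(BiFunction<Pattern, String, Pattern> joiner) {
        Pattern left = new Pattern(new ArrayList<>(Arrays.asList("node1", "edge1")));
        List<Integer> sizes = new ArrayList<>();
        for (String node : Arrays.asList("node4@t0", "node4@t1")) {
            sizes.add(joiner.apply(left, node).segments.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(joinSizes(JoinerAliasingDemo::mutatingJoin)); // [3, 4]
        System.out.println(joinSizes(JoinerAliasingDemo::copyingJoin));  // [3, 3]
    }
}
```

With the mutating version every additional match in the window makes the list one element longer, which would also explain why a shorter sleep (more nodes inside the 10-second window) produces longer lists.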
