The problem setting
I have a stream of nodes and a stream of edges that represent consecutive updates of a graph, and I want to build patterns composed of nodes and edges using multiple joins in series. Suppose I want to match a pattern like: (node1)-[edge1]->(node2).
My idea is to join the stream of nodes with the stream of edges to compose a stream of sub-patterns of the form (node1)-[edge1]->. Then take the resulting stream and join it with the stream of nodes once more to compose the final pattern (node1)-[edge1]->(node2). Filtering on particular node and edge types is not important here.
Data model
So I have nodes, edges and patterns structured in Avro format:
{ "namespace": "DataModel", "type": "record", "name": "Node", "doc": "Node schema, it contains a nodeID label and properties", "fields": [ { "name": "nodeID", "type": "long" }, { "name": "labels", "type": { "type": "array", "items": "string", "avro.java.string": "String" } }, { "name": "properties", "type": { "type": "map", "values": "string", "avro.java.string": "String" } }, { "name": "timestamp", "type": "long" } ] }
{ "namespace": "DataModel", "type": "record", "name": "Edge", "doc": "contains edgeID, a type, a list of properties, a starting node ID and an ending node ID ", "fields": [ { "name": "edgeID", "type": "long" }, { "name": "type", "type": "string" }, { "name": "properties", "type": { "type": "map", "values": "string", "avro.java.string": "String" } }, { "name": "startID", "type": "long" }, { "name": "endID", "type": "long" }, { "name": "timestamp", "type": "long" } ] }
{ "namespace": "DataModel", "type": "record", "name": "Pattern", "fields": [ { "name": "first", "type": "long" }, { "name": "nextJoinID", "type": [ "null", "long" ], "default": null }, { "name": "timestamp", "type": "long" }, { "name": "segments", "doc": "It's the ordered list of nodes and edges that compose this sub-pattern from the leftmost node to the rightmost edge or node", "type": { "type": "array", "items": [ "DataModel.Node", "DataModel.Edge" ] } }
Then I have the following two ValueJoiners:
The first one is used for the inner join between the nodes stream and the edges stream.
The second one is used for the inner join between the resulting sub-patterns stream and the nodes stream.
public class NodeEdgeJoiner implements ValueJoiner<Node, Edge, Pattern> {
    @Override
    public Pattern apply(Node node, Edge edge) {
        Object[] segments = {node, edge};
        return Pattern.newBuilder()
                .setFirst(node.getNodeID())
                .setNextJoinID(edge.getEndID())
                .setSegments(Arrays.asList(segments))
                .setTimestamp(Math.min(node.getTimestamp(), edge.getTimestamp()))
                .build();
    }
}
public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {
    @Override
    public Pattern apply(Pattern pattern, Node node) {
        List<Object> segments = pattern.getSegments();
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(), pattern.getTimestamp()))
                .build();
    }
}
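To make the expected behavior concrete, here is a minimal sketch of applying the first joiner by hand (it reuses the buildNodeRecord/buildEdgeRecord helpers shown further below):

// Sketch: applying NodeEdgeJoiner outside the topology.
Node n1 = buildNodeRecord(1, Collections.singletonList("aString"), "aString", System.currentTimeMillis());
Edge e1 = buildEdgeRecord(1, 1, 4, "related_to", "aString", System.currentTimeMillis());
Pattern sub = new NodeEdgeJoiner().apply(n1, e1);
// sub.getSegments() -> [n1, e1]
// sub.getNextJoinID() -> 4 (e1's endID), i.e. the key the sub-pattern
// will be re-keyed to before the second join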
My intention is to catch patterns like: (nodeId == 1)-[label == "related_to"]->() where
- (nodeId == 1) represents a node with id=1
- -[label == "related_to"]-> represents a directed edge with label = "related_to"
- () represents a generic node.
The idea for concatenating those pieces together is to perform two consecutive joins using the previous ValueJoiners. I want you to focus on the first operation performed by both ValueJoiners: in order to build the pattern, I simply append nodes and edges to the end of a list that is part of the Pattern Avro schema. The following is the generic loop that produces nodes and edges and publishes them to the corresponding topics. The key of each node record is its nodeID, and the key of each edge record is the nodeID of the edge's incoming (start) node.
while (true) {
    // both producers are declared in try-with-resources so they are closed each iteration
    try (final KafkaProducer<Long, Node> nodeKafkaProducer = new KafkaProducer<Long, Node>(props);
         final KafkaProducer<Long, Edge> edgeKafkaProducer = new KafkaProducer<Long, Edge>(props)) {
        nodeKafkaProducer.send(new ProducerRecord<Long, Node>(nodeTopic, (long) 1,
                buildNodeRecord(1, Collections.singletonList("aString"), "aString", System.currentTimeMillis())));
        edgeKafkaProducer.send(new ProducerRecord<Long, Edge>(edgesTopic, (long) 1,
                buildEdgeRecord(1, 1, 4, "related_to", "aString", System.currentTimeMillis())));
        Thread.sleep(9000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
where:
private Node buildNodeRecord(long nodeId, List<String> labelsToSet, String property, long timestamp) {
    Node record = new Node();
    record.setNodeID(nodeId);
    record.setLabels(labelsToSet);
    Map<String, String> propMap = new HashMap<String, String>();
    propMap.put("property", property);
    record.setProperties(propMap);
    record.setTimestamp(timestamp);
    return record;
}

private Edge buildEdgeRecord(long edgeId, long startID, long endID, String type, String property, long timestamp) {
    Edge record = new Edge();
    record.setEdgeID(edgeId);
    record.setStartID(startID);
    record.setEndID(endID);
    record.setType(type);
    Map<String, String> propMap = new HashMap<String, String>();
    propMap.put("property", property);
    record.setProperties(propMap);
    record.setTimestamp(timestamp);
    return record;
}
The following part of the code describes the pipeline.
// configuration of the specific Avro serde for the Pattern type
final SpecificAvroSerde<Pattern> patternSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        envProps.getProperty("schema.registry.url"));
patternSpecificAvroSerde.configure(serdeConfig, false);

// the ValueJoiners we need
final NodeEdgeJoiner nodeEdgeJoiner = new NodeEdgeJoiner();
final PatternNodeJoiner patternNodeJoiner = new PatternNodeJoiner();

// timestamp extractors
NodeTimestampExtractor nodeTimestampExtractor = new NodeTimestampExtractor();
SubPatternTimeStampExtractor subPatternTimeStampExtractor = new SubPatternTimeStampExtractor();
EdgeTimestampExtractor edgeTimestampExtractor = new EdgeTimestampExtractor();

// node source
final KStream<Long, Node> nodeKStream = builder.stream(envProps.getProperty("node.topic.name"),
        Consumed.with(nodeTimestampExtractor));

// filter on the nodes topic
nodeKStream.filter((key, value) -> value.getNodeID() == 1)
        .to(envProps.getProperty("firstnodes.topic.name"));
final KStream<Long, Node> firstFilteredNodes = builder.stream(envProps.getProperty("firstnodes.topic.name"),
        Consumed.with(nodeTimestampExtractor));

// edges keyed by incoming node
final KStream<Long, Edge> edgeKstream = builder.stream(envProps.getProperty("edge.topic.name"),
        Consumed.with(edgeTimestampExtractor));

// filter operation on edges for the first part of the pattern
final KStream<Long, Edge> firstEdgeFiltered = edgeKstream.filter((key, value) -> value.getType().equals("related_to"));

// first join
firstFilteredNodes.join(firstEdgeFiltered, nodeEdgeJoiner, JoinWindows.of(Duration.ofSeconds(10)))
        .map((key, value) -> new KeyValue<Long, Pattern>(value.getNextJoinID(), value))
        .to(envProps.getProperty("firstJoin.topic.name"));
final KStream<Long, Pattern> mappedFirstJoin = builder.stream(envProps.getProperty("firstJoin.topic.name"),
        Consumed.with(subPatternTimeStampExtractor));

// second join
KStream<Long, Pattern> secondJoin = mappedFirstJoin
        .join(nodeKStream, patternNodeJoiner, JoinWindows.of(Duration.ofSeconds(10)));
secondJoin.print(Printed.toSysOut()); // should print the final records
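Note that patternSpecificAvroSerde is configured above but never referenced in the snippet; presumably it would be used when writing Pattern records to a topic. A minimal sketch, assuming a hypothetical "patterns.topic.name" property and Long keys:

// Hypothetical sink for the final patterns ("patterns.topic.name" is not in the original config).
secondJoin.to(envProps.getProperty("patterns.topic.name"),
        Produced.with(Serdes.Long(), patternSpecificAvroSerde));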
I'm not going to show the TimestampExtractors since I think they are irrelevant to the point.
The Issue
So I expect the output to be a stream of Pattern records, and the list ("segments" in the Avro schema) of each Pattern to have the same size: one node, one edge and another node. But this doesn't happen. Instead I get this output:
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427795, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252436822, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
...
As you can see, the size of the segments array differs from record to record. In particular I always see a node and an edge followed by a variable number of nodes. If I reduce the sleep in the while(true) loop it gets worse, generating much longer lists with many more nodes in them. I can guarantee that the node-edge join performs well under every condition and always generates correct results. The problem seems to affect the second join, but I don't understand how. I tried some testing without success.
The following is the topology:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [nodes])
      --> KSTREAM-WINDOWED-0000000015, KSTREAM-FILTER-0000000001
    Source: KSTREAM-SOURCE-0000000013 (topics: [firstJoin])
      --> KSTREAM-WINDOWED-0000000014
    Processor: KSTREAM-WINDOWED-0000000014 (stores: [KSTREAM-JOINTHIS-0000000016-store])
      --> KSTREAM-JOINTHIS-0000000016
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-WINDOWED-0000000015 (stores: [KSTREAM-JOINOTHER-0000000017-store])
      --> KSTREAM-JOINOTHER-0000000017
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000017 (stores: [KSTREAM-JOINTHIS-0000000016-store])
      --> KSTREAM-MERGE-0000000018
      <-- KSTREAM-WINDOWED-0000000015
    Processor: KSTREAM-JOINTHIS-0000000016 (stores: [KSTREAM-JOINOTHER-0000000017-store])
      --> KSTREAM-MERGE-0000000018
      <-- KSTREAM-WINDOWED-0000000014
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000018 (stores: [])
      --> KSTREAM-PRINTER-0000000019
      <-- KSTREAM-JOINTHIS-0000000016, KSTREAM-JOINOTHER-0000000017
    Processor: KSTREAM-PRINTER-0000000019 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000018
    Sink: KSTREAM-SINK-0000000002 (topic: firstFilter)
      <-- KSTREAM-FILTER-0000000001

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [edges])
      --> KSTREAM-FILTER-0000000005
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-WINDOWED-0000000007
      <-- KSTREAM-SOURCE-0000000004
    Source: KSTREAM-SOURCE-0000000003 (topics: [firstFilter])
      --> KSTREAM-WINDOWED-0000000006
    Processor: KSTREAM-WINDOWED-0000000006 (stores: [KSTREAM-JOINTHIS-0000000008-store])
      --> KSTREAM-JOINTHIS-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINOTHER-0000000009-store])
      --> KSTREAM-JOINOTHER-0000000009
      <-- KSTREAM-FILTER-0000000005
    Processor: KSTREAM-JOINOTHER-0000000009 (stores: [KSTREAM-JOINTHIS-0000000008-store])
      --> KSTREAM-MERGE-0000000010
      <-- KSTREAM-WINDOWED-0000000007
    Processor: KSTREAM-JOINTHIS-0000000008 (stores: [KSTREAM-JOINOTHER-0000000009-store])
      --> KSTREAM-MERGE-0000000010
      <-- KSTREAM-WINDOWED-0000000006
    Processor: KSTREAM-MERGE-0000000010 (stores: [])
      --> KSTREAM-MAP-0000000011
      <-- KSTREAM-JOINTHIS-0000000008, KSTREAM-JOINOTHER-0000000009
    Processor: KSTREAM-MAP-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MERGE-0000000010
    Sink: KSTREAM-SINK-0000000012 (topic: firstJoin)
      <-- KSTREAM-MAP-0000000011
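For reference, a topology dump like the one above can be printed from the builder; a minimal sketch (not part of the original code):

// Build the topology from the StreamsBuilder and print its description.
Topology topology = builder.build();
System.out.println(topology.describe());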
pom.xml
<groupId>KafkaJOINS</groupId>
<artifactId>KafkaJOINS</artifactId>
<version>1.0</version>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </pluginRepository>
</pluginRepositories>

<properties>
    <log4j.version>2.13.3</log4j.version>
    <avro.version>1.9.2</avro.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <confluent.version>6.0.0</confluent.version>
    <kafka.version>6.0.0-ccs</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
Answer
In your first ValueJoiner you create a new object:
Object[] segments = {node,edge};
In your second ValueJoiner you are getting a list and adding to it. You would need to deep-copy the list though:
// your code
List<Object> segments = pattern.getSegments();
segments.add(node); // this effectively modifies the input object;
                    // if this input object joins multiple times,
                    // you may introduce an undesired side effect

// instead you should do
List<Object> segments = new LinkedList<>(pattern.getSegments());
segments.add(node);
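Applied to the joiner from the question, the corrected class would look like this (the same PatternNodeJoiner as above, with the defensive copy as the only change):

public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {
    @Override
    public Pattern apply(Pattern pattern, Node node) {
        // Copy the segments before appending, so that repeated invocations
        // of the joiner with the same Pattern instance (one per matching
        // node inside the join window) cannot see each other's appends.
        List<Object> segments = new LinkedList<>(pattern.getSegments());
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(), pattern.getTimestamp()))
                .build();
    }
}

This also explains the observed output: a windowed stream-stream join may call the joiner several times with the same Pattern value, so the in-place segments.add(node) kept growing one shared list, and every result built on top of it (including results already emitted) reflected the extra nodes.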