I am trying to route strings to different Kafka topics based on conditions.
Here is the topology.
- Split the string into words.
- Match every word against the conditions (here, a set of good words and a set of bad words).
- If at least one word from the bad-words set is found in the string, it is sent to the bad-string topic; otherwise it is sent to the good-string topic.
Problem:
Every string goes to only one topic (the bad-string topic).
Input:
Your service was good.
He was angry and sad.
Your service was bad but still I am happy.
Output:
good-string (topic)
- Your service was good. (It contains a good word, “good”.)
bad-string (topic)
- He was angry and sad. (It contains bad words, “angry” and “sad”.)
- Your service was bad but still I am happy. (There is a good word, “happy”, but there is at least one bad word, “bad”.)
Here is the code:
@Configuration
@Slf4j
public class SplitSentence {

    private static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    private static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    @SuppressWarnings("unchecked")
    @Bean
    public KStream<String,String> windowCount(StreamsBuilder builder) {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(CountingDemo.class);
        ((JsonDeserializer) jsonSerde.deserializer()).setUseTypeHeaders(false);

        var input = builder.stream("counting", Consumed.with(stringSerde, jsonSerde));
        var feedbackStreams = input.flatMap(splitWords()).branch(isGoodWord(), isBadWord());

        boolean newString = feedbackStreams[1].toString().isEmpty();
        if (newString)
            input.to("good-string");
        else
            input.to("bad-string");

        return input;
    }

    private Predicate<? super String, ? super String> isBadWord() {
        return (key, value) -> BAD_WORDS.contains(value);
    }

    private Predicate<? super String, ? super String> isGoodWord() {
        return (key, value) -> GOOD_WORDS.contains(value);
    }

    private KeyValueMapper<String, CountingDemo, Iterable<KeyValue<String,String>>> splitWords() {
        return (key, value) -> Arrays
                .asList(value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase().split("\\s+")).stream()
                .distinct()
                .map(word -> KeyValue.pair(value.getText(), word))
                .collect(Collectors.toList());
    }
}
CountingDemo.java
public class CountingDemo {
    private String name;
    private String text;
}
Where am I going wrong?
Is there a better approach for this?
Answer
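The if condition is always false because the .toString() of a KStream object describes the stream object itself, not the records flowing through it, and is never empty. The check also runs only once, when the topology is built, not once per message, so every message ends up going through the single input.to("bad-string") call.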
And if you want the full original string routed to one of the two topics, you should not flatMap at all.
That being said, it seems like you want:
var feedbackStreams = input.branch(hasGoodWords(), hasBadWords());
feedbackStreams[0].to("good-string");
feedbackStreams[1].to("bad-string");
Here the two functions receive the full input message and compare its words against the sets, rather than being given individual words.
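One thing to watch with two predicates: branch() places each record in the stream of the first predicate that matches and drops records that match none. The plain-Java sketch below only imitates that first-match rule (it does not use Kafka Streams), and hasGoodWords, hasBadWords and firstMatch are hypothetical helpers written for illustration. It shows that with the good-word predicate listed first, a message containing both a good and a bad word would land in the good stream.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class BranchOrderSketch {
    static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    // Strip punctuation, lower-case the text, and return the distinct words.
    static Set<String> words(String text) {
        String cleaned = text.replaceAll("[^a-zA-Z ]", "").toLowerCase();
        return Arrays.stream(cleaned.trim().split("\\s+")).collect(Collectors.toSet());
    }

    // Hypothetical predicate: the message contains at least one good word.
    static boolean hasGoodWords(String text) {
        return words(text).stream().anyMatch(GOOD_WORDS::contains);
    }

    // Hypothetical predicate: the message contains at least one bad word.
    static boolean hasBadWords(String text) {
        return words(text).stream().anyMatch(BAD_WORDS::contains);
    }

    // Imitates branch(hasGoodWords, hasBadWords): index of the first
    // matching predicate, or -1 when no predicate matches (record dropped).
    static int firstMatch(String text) {
        if (hasGoodWords(text)) return 0;
        if (hasBadWords(text)) return 1;
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstMatch("Your service was good."));                     // 0
        System.out.println(firstMatch("He was angry and sad."));                      // 1
        System.out.println(firstMatch("Your service was bad but still I am happy.")); // 0, not 1
    }
}
```

Under these assumptions the third sample goes to index 0, which is not what the question asks for, so the predicate order (or the predicate itself) has to change.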
Although, I think you only need one function: it captures all messages that contain a good word and no bad word and sends them to good-string, and all other messages (neither good nor bad, both good and bad, or only bad) go to the bad-string topic.
e.g.
    var feedbackStreams = input.branch(this::hasOnlyGoodWords, (k, v) -> true);
    feedbackStreams[0].to("good-string");
    feedbackStreams[1].to("bad-string"); // catch-all: everything the first predicate rejected
    return input;
}

// True only if the text has at least one good word and no bad word.
// The value is the deserialized CountingDemo, so getText() must be accessible.
private boolean hasOnlyGoodWords(Object key, CountingDemo value) {
    String cleaned = value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase();
    Set<String> uniqueWords = Arrays.stream(cleaned.split("\\s+")).collect(Collectors.toSet());
    for (String s : BAD_WORDS) {
        if (uniqueWords.contains(s)) return false;
    }
    return uniqueWords.stream().anyMatch(GOOD_WORDS::contains);
}
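To check the rule without a running broker, the same logic can be exercised in plain Java on the three sample inputs from the question. This is only a sketch: it takes a plain String instead of a CountingDemo, and topicFor is a hypothetical helper that names the topic the two-way branch would pick.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FeedbackRouterSketch {
    static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    // Same rule as the predicate above: at least one good word and no bad word.
    static boolean hasOnlyGoodWords(String text) {
        String cleaned = text.replaceAll("[^a-zA-Z ]", "").toLowerCase();
        Set<String> words = Arrays.stream(cleaned.split("\\s+")).collect(Collectors.toSet());
        if (words.stream().anyMatch(BAD_WORDS::contains)) return false;
        return words.stream().anyMatch(GOOD_WORDS::contains);
    }

    // Hypothetical helper: the first branch wins, everything else falls
    // through to the catch-all predicate and therefore to bad-string.
    static String topicFor(String text) {
        return hasOnlyGoodWords(text) ? "good-string" : "bad-string";
    }

    public static void main(String[] args) {
        List<String> samples = List.of(
                "Your service was good.",
                "He was angry and sad.",
                "Your service was bad but still I am happy.");
        for (String sample : samples) {
            System.out.println(topicFor(sample) + " <- " + sample);
        }
    }
}
```

With these inputs the first sentence maps to good-string and the other two to bad-string, matching the expected output in the question. A message with neither kind of word also falls through to bad-string because of the catch-all branch.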