I have a Kafka Streams application that waits for records to be published to the topic user_activity. It receives JSON data, and depending on the value of a particular key I want to push that stream into different topics.
This is my streams App code:
    KStream<String, String> source_user_activity = builder.stream("user_activity");
    source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            System.out.println("value: " + value);
            ArrayList<String> keywords = new ArrayList<String>();
            try {
                JSONObject send = new JSONObject();
                JSONObject received = new JSONObject(value);
                send.put("current_date", getCurrentDate().toString());
                send.put("activity_time", received.get("CreationTime"));
                send.put("user_id", received.get("UserId"));
                send.put("operation_type", received.get("Operation"));
                send.put("app_name", received.get("Workload"));
                keywords.add(send.toString());
                // apply regex to value and for each match add it to keywords
            } catch (Exception e) {
                // TODO: handle exception
                System.err.println("Unable to convert to json");
                e.printStackTrace();
            }
            return keywords;
        }
    }).to("user_activity_by_date");
In this code, I want to check the operation type and, depending on its value, push the records to the relevant topic.
How can I achieve this?
EDIT:
I have updated my code to this:
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

    // Predicates are evaluated in order; the last one catches everything else.
    // The quotes inside the JSON fragments must be escaped in Java string literals.
    KStream<String, String>[] branches = source_o365_user_activity.branch(
        (key, value) -> (value.contains("\"Operation\":\"SharingSet\"")
            && value.contains("\"ItemType\":\"File\"")),
        (key, value) -> (value.contains("\"Operation\":\"AddedToSecureLink\"")
            && value.contains("\"ItemType\":\"File\"")),
        (key, value) -> true
    );

    branches[0].to("o365_sharing_set_by_date");
    branches[1].to("o365_added_to_secure_link_by_date");
    branches[2].to("o365_user_activity_by_date");
Answer
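You can use the branch method to split your stream. It takes an ordered list of predicates and returns one stream per predicate: each record goes to the first branch whose predicate matches, and a record that matches none of them is dropped.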
The code below is taken from kafka-streams-examples:
    KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
        (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
        (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

    forks[0].mapValues(
        orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
        .to(ORDER_VALIDATIONS.name(), Produced
            .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

    forks[1].mapValues(
        orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
        .to(ORDER_VALIDATIONS.name(), Produced
            .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
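Applied to the question's case, the routing decision could be factored into a small helper so that it can be checked without a running cluster. The sketch below is only an illustration under stated assumptions: OperationRouter, topicFor and field are hypothetical names, not part of Kafka Streams, and the regex-based field lookup is a simplification that a real JSON parser should replace if the payload can contain nested objects or escaped quotes. Conditions are checked in order, mirroring the first-match behavior of branch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka Streams): picks a destination topic
// for one JSON record, checking conditions in order like branch() predicates.
final class OperationRouter {

    // Tolerates optional whitespace around the colon, unlike String.contains.
    private static final Pattern OPERATION =
            Pattern.compile("\"Operation\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern ITEM_TYPE =
            Pattern.compile("\"ItemType\"\\s*:\\s*\"([^\"]*)\"");

    private OperationRouter() { }

    // Returns the first string value found for the field, or "" if absent.
    static String field(Pattern pattern, String json) {
        if (json == null) {
            return "";
        }
        Matcher m = pattern.matcher(json);
        return m.find() ? m.group(1) : "";
    }

    // Topic names are the ones used in the question; the last one is the fallback.
    static String topicFor(String json) {
        String operation = field(OPERATION, json);
        boolean isFile = "File".equals(field(ITEM_TYPE, json));
        if (isFile && "SharingSet".equals(operation)) {
            return "o365_sharing_set_by_date";
        }
        if (isFile && "AddedToSecureLink".equals(operation)) {
            return "o365_added_to_secure_link_by_date";
        }
        return "o365_user_activity_by_date";
    }
}
```

With a helper like this, a branch predicate could be written as (key, value) -> OperationRouter.topicFor(value).equals("o365_sharing_set_by_date"). Alternatively, the topic could be chosen per record with stream.to((key, value, recordContext) -> OperationRouter.topicFor(value)), which relies on the TopicNameExtractor overload of to available since Kafka 2.0; the target topics need to exist beforehand. Note also that from Kafka Streams 2.8 onward branch is deprecated in favor of split().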