I have a stream of Kafka messages and want to build a HashMap<String, List<Object>>
to be used as an API response in JSON format.
    for (ConsumerRecord<String, String> consumerRecord : records) {
        if (!responses.containsKey(consumerRecord.topic())) {
            responses.put(consumerRecord.topic(), new ArrayList<Object>());
        }
        responses.get(consumerRecord.topic()).add(JSONObject.stringToValue(consumerRecord.value()));
    }
Expected response:
    {
      "topic1": [
        {"field1":"value1","field2":"value2","field3":"value3"}
      ],
      "topic2": [
        {"field1":"value1","field2":"value2","field3":"value3"},
        {"anotherfield1":"anothervalue1","anotherfield2":"anothervalue2"}
      ]
    }
Actual response (each record comes back as a quoted string instead of a JSON object):
    {
      "topic1": [
        "{"field1":"value1","field2":"value2","field3":"value3"}"
      ],
      "topic2": [
        "{"field1":"value1","field2":"value2","field3":"value3"}",
        "{"anotherfield1":"anothervalue1","anotherfield2":"anothervalue2"}"
      ]
    }
Answer
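It works after the following changes. JSONObject.stringToValue only converts literals such as true, false, null, and numbers; any other input is returned unchanged as a String, which is why every record was serialized as a quoted string. Parsing each value with new JSONObject(...) fixes that: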
    HashMap<String, List<Object>> responses = new HashMap<String, List<Object>>();
    for (ConsumerRecord<String, String> consumerRecord : records) {
        if (!responses.containsKey(consumerRecord.topic())) {
            responses.put(consumerRecord.topic(), new ArrayList<Object>());
        }
        // Parse the message payload into a JSONObject instead of keeping it as a String
        responses.get(consumerRecord.topic()).add(new JSONObject(consumerRecord.value()));
    }
    jsonObject = new JSONObject(responses);
    return jsonObject.toMap();
- Convert Kafka message string to JSONObject
new JSONObject(consumerRecord.value())
- Construct a JSONObject from a Map using
jsonObject = new JSONObject(responses);
- Return Map<String, Object> using
jsonObject.toMap();
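
As a side note, the containsKey/put/get sequence can be collapsed with Map.computeIfAbsent. The following is only a sketch of that grouping step, not the code from the answer: the class name TopicGrouping, the method groupByTopic, and the parser parameter are hypothetical, and plain Map.Entry pairs stand in for ConsumerRecord so that it runs without the Kafka or org.json libraries. In the real code, the parser would presumably be value -> new JSONObject(value).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: groups (topic, value) pairs by topic.
class TopicGrouping {

    // 'records' stands in for the Kafka ConsumerRecords (key = topic, value = payload).
    // 'parser' stands in for the JSON parsing step, e.g. value -> new JSONObject(value).
    static Map<String, List<Object>> groupByTopic(
            List<Map.Entry<String, String>> records,
            Function<String, Object> parser) {
        // LinkedHashMap keeps topics in first-seen order (an assumption; HashMap also works)
        Map<String, List<Object>> responses = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            // Create the list on first sight of a topic, then append the parsed value
            responses.computeIfAbsent(record.getKey(), topic -> new ArrayList<>())
                     .add(parser.apply(record.getValue()));
        }
        return responses;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("topic1", "{\"field1\":\"value1\"}"),
                Map.entry("topic2", "{\"field1\":\"value1\"}"),
                Map.entry("topic2", "{\"anotherfield1\":\"anothervalue1\"}"));

        // String::trim is a placeholder parser so the sketch has no external dependencies
        Map<String, List<Object>> grouped = groupByTopic(records, String::trim);
        System.out.println(grouped.keySet());
        System.out.println(grouped.get("topic2").size());
    }
}
```

Passing the parser as a function keeps the grouping logic independent of the JSON library, so the same loop works whether the values are parsed with org.json or something else.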