Kafka Connect: Convert message from bytes to JSON

I am trying to use a Google Pub/Sub source connector to pull data from Google Cloud into Kafka. The data arrives, but the message comes through as bytes. I referred here and, as suggested there, used a JSON converter to change it.

Here is the relevant part of my connector configuration:

name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha

And this is what I get in Kafka:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"bytes",
            "optional":false,
            "field":"message"
         },
         {
            "type":"string",
            "optional":false,
            "field":"subFolder"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryLocation"
         },
         {
            "type":"string",
            "optional":false,
            "field":"projectId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceNumId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryId"
         }
      ],
      "optional":false
   },
   "payload":{
      "message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
      "subFolder":"",
      "deviceId":"deviceid",
      "deviceRegistryLocation":"region_value",
      "projectId":"projectid",
      "deviceNumId":"device_num_value",
      "deviceRegistryId":"registryid"
   }
}

Even after adding the converter settings to the connector, I still get the message as bytes. Is there anything further I need to do to convert it to JSON?

Answer

The Cloud Pub/Sub Kafka connector does not inspect or convert the data in messages that it receives; it just passes the data field through as bytes, which is the type of the field in the PubsubMessage. There is currently no way to get the connector itself to read the contents of the message and convert it to JSON.
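
Since the connector only passes the bytes through, one workaround is to decode the field downstream, in whatever consumes the topic. The JsonConverter writes a bytes field as a base64 string, so the consumer can base64-decode "message" and parse the result. Below is a minimal Python sketch of that step, not something the connector provides. It assumes the Pub/Sub message body is UTF-8 encoded JSON; the function name decode_message_field and the sample record are hypothetical and only for illustration.

```python
import base64
import json


def decode_message_field(record_value: str) -> dict:
    """Return the Connect payload with its base64 'message' field parsed as JSON."""
    envelope = json.loads(record_value)
    # With schemas.enable=true the data sits under "payload"; with
    # schemas.enable=false the record value is the payload itself.
    payload = envelope.get("payload", envelope)
    # JsonConverter serializes bytes fields as base64 text.
    raw = base64.b64decode(payload["message"])
    decoded = dict(payload)
    # Assumption: the publisher sent UTF-8 encoded JSON in the message body.
    decoded["message"] = json.loads(raw.decode("utf-8"))
    return decoded


# Hypothetical sample record, built here so the example is self-contained.
sample_body = {"sensor_id": "abc-302", "t_celsius": 10}
sample_record = json.dumps({
    "schema": {"type": "struct", "fields": [], "optional": False},
    "payload": {
        "message": base64.b64encode(
            json.dumps(sample_body).encode("utf-8")
        ).decode("ascii"),
        "deviceId": "deviceid",
    },
})

result = decode_message_field(sample_record)
print(result["message"]["t_celsius"])
```

In a real consumer, record_value would be the value of each record read from test-topic. The other fields (deviceId, projectId, and so on) are already strings and are returned unchanged.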