I am trying to use a Google Pub/Sub source connector to fetch data from Google Cloud into Kafka. I do get the data, but the message arrives as bytes. I referred here and, as mentioned there, I used a JSON converter to change it.
Here is the relevant part of my connector configuration:
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha
And this is what I get in Kafka:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "bytes", "optional": false, "field": "message" },
      { "type": "string", "optional": false, "field": "subFolder" },
      { "type": "string", "optional": false, "field": "deviceId" },
      { "type": "string", "optional": false, "field": "deviceRegistryLocation" },
      { "type": "string", "optional": false, "field": "projectId" },
      { "type": "string", "optional": false, "field": "deviceNumId" },
      { "type": "string", "optional": false, "field": "deviceRegistryId" }
    ],
    "optional": false
  },
  "payload": {
    "message": "eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
    "subFolder": "",
    "deviceId": "deviceid",
    "deviceRegistryLocation": "region_value",
    "projectId": "projectid",
    "deviceNumId": "device_num_value",
    "deviceRegistryId": "registryid"
  }
}
Even after providing the converter details in the connector configuration, I still get the message as bytes. Is there something further I should be doing to convert it to JSON?
Answer
The Cloud Pub/Sub Kafka connector does not inspect or convert the data in messages that it receives; it just passes the data field through as bytes, which is the type of the field in the PubsubMessage. There is currently no way to get the connector itself to read the contents of the message and convert it to JSON.
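Since the connector will not do the conversion, one option is to decode the field downstream, in whatever consumes the Kafka topic. JsonConverter writes a bytes field as a base64 string, which is what the "message" value in the output above is. The following is only a minimal sketch of that decoding step, assuming the original Pub/Sub payload was UTF-8 encoded JSON; the function name decode_message and the sample record are made up for illustration and are not part of the connector or of Kafka.

```python
import base64
import json


def decode_message(record_value: str) -> dict:
    """Decode the base64 'message' field of a JsonConverter record into a dict.

    Hypothetical helper: assumes the Pub/Sub message body was UTF-8 JSON.
    """
    envelope = json.loads(record_value)
    # With schemas.enable=true the fields sit under "payload";
    # with schemas.enable=false they are at the top level.
    payload = envelope.get("payload", envelope)
    raw_bytes = base64.b64decode(payload["message"])
    return json.loads(raw_bytes.decode("utf-8"))


# Illustrative record shaped like the connector output shown in the question.
sample_body = {"t_celsius": 10, "pressure": 10}
sample_record = json.dumps({
    "schema": {"type": "struct", "optional": False},
    "payload": {
        "message": base64.b64encode(
            json.dumps(sample_body).encode("utf-8")
        ).decode("ascii"),
        "deviceId": "deviceid",
    },
})

decoded = decode_message(sample_record)
print(decoded)
```

In a real consumer, record_value would be the value of each Kafka record read from test-topic, decoded to a string first. The other fields in the payload (deviceId, projectId and so on) are already plain strings and need no extra handling.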