Avro custom decoding of UUID through Kafka on consumer end



I’ve written a class to custom-encode UUID objects to bytes so they can be transported over Kafka with Avro.

To use this class, I put @AvroEncode(using = UUIDAsBytesEncoding.class) above the UUID field in my target object. (This is handled by the Apache Avro reflect library.)
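For context, the annotation sits directly on the field. A minimal sketch of such a target class (field names here are illustrative, and it assumes the UUIDAsBytesEncoding class shown below is on the classpath):

```java
import org.apache.avro.reflect.AvroEncode;
import java.util.UUID;

public class Request {
    private String password;
    private String email;

    // Tells Avro's reflect machinery to delegate this field
    // to the custom codec instead of its default UUID handling
    @AvroEncode(using = UUIDAsBytesEncoding.class)
    private UUID id;
}
```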

I’m having difficulty figuring out how to have my consumer use the custom decoder automatically (or do I have to go in and decode it manually?).

Here is my UUIDAsBytesEncoding class, which extends CustomEncoding&lt;UUID&gt;:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);

            // encode length of data
            out.writeLong(16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        System.out.println("READING");
        Long size = in.readLong();
        Long leastSig = in.readLong();
        Long mostSig = in.readLong();
        return new UUID(mostSig, leastSig);
    }
}

The write method and encoding work well, but the read method is never called during deserialization. How would I implement this in a consumer?

The schema on the registry looks like:

{"type":"record","name":"Request","namespace":"xxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string"},{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding":"UUIDAsBytesEncoding"}],"default":null}]}

If the consumer can’t automatically use that information to invoke the UUIDAsBytesEncoding read method, then how would I find the data marked with that tag in my consumer?

I am using the confluent schema-registry as well.

Any help would be appreciated!

Answer

Ended up finding the solution. The encoding was incorrect: the built-in writeBytes() method automatically writes the length for you, so writing it explicitly beforehand corrupts the stream.

Then in the consumer, we must go through a GenericDatumWriter, write to a binary stream, and then read from the binary stream with a ReflectDatumReader. This will automatically call the UUIDAsBytesEncoding read() method and deserialize the UUID.

My consumer would look something like this (as part of a consumer group executor service walkthrough here):

/**
 * Start a single consumer instance
 * This will use the schema built into the IndexedRecord to decode and create key/value for the message
 */
public void run() {
    ConsumerIterator it = this.stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        try {
            String key = (String) messageAndMetadata.key();
            IndexedRecord value = (IndexedRecord) messageAndMetadata.message();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            GenericDatumWriter<Object> genericRecordWriter = new GenericDatumWriter<>(value.getSchema());
            genericRecordWriter.write(value, EncoderFactory.get().directBinaryEncoder(bytes, null));

            ReflectDatumReader<T> reflectDatumReader = new ReflectDatumReader<>(value.getSchema());
            T newObject = reflectDatumReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            IOUtils.closeQuietly(bytes);

            System.out.println("************CONSUMED:  " + key + ": "+ newObject);

        } catch(SerializationException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Shutting down Thread: " + this.threadNumber);
}

Then the new UUIDAsBytesEncoding would look like:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        // get index in union
        int index = in.readIndex();
        if (index == 1) {
            // read in 16 bytes of data
            ByteBuffer b = ByteBuffer.allocate(16);
            in.readBytes(b);

            // convert
            UUID uuid = Conversion.byteArrayToUuid(b.array(), 0);

            return uuid;
        } else {
            // no uuid present
            return null;
        }
    }
}

This is also an example of how to implement a CustomEncoding Avro class. The current version of Avro does not have a UUID serializer built in, so this is a workaround for that gap.
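For reference, the 16-byte round trip itself needs nothing Avro-specific. A minimal sketch using only java.nio, packing the UUID big-endian (most significant long first); note that commons-lang3's Conversion helpers used above write bytes in a different (least-significant-first) order, so the two layouts are not interchangeable:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytes {
    // Pack a UUID into 16 big-endian bytes (most significant long first).
    static byte[] toBytes(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return buf.array();
    }

    // Rebuild the UUID from the same 16-byte layout.
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        UUID roundTripped = fromBytes(toBytes(original));
        System.out.println(original.equals(roundTripped)); // prints "true"
    }
}
```

Whichever byte order you pick, the important thing is that every producer and consumer of the field agrees on it, since the schema only says "bytes".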



Source: stackoverflow