Skip to content
Advertisement

Avro logicalType String Date conversion to EPOCH timestamp-millis

I have the schema below:

     {"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},

I intend to supply a date string to this field and have a conversion turn the date into epoch milliseconds.

    // NOTE: schema1, schema2, file1 and datumReader are declared outside this snippet.
    // Build a record against schema1 and set the logical-type field to a date string.
    GenericRecord user2 = new GenericData.Record(schema1);
    user2.put("timestampstring", "2019-01-26T12:00:40.931");

    // Register the custom conversion on a dedicated GenericData instance and
    // pass that instance to the reader (schema2 is used as both writer and reader schema).
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyTimestampConversion());
    datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

    // Read every record from file1 and print it; the previous record instance
    // is passed back to next(...) on each iteration.
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);

            System.out.println(user);
        }
    }

//Conversion code

/**
 * Conversion registered under the logical type name "timestamp-millis" whose
 * converted Java type is {@link Long}.
 *
 * NOTE(review): the schema in the question attaches "timestamp-millis" to a
 * "string" type; presumably Avro only honours this logical type on "long",
 * which would explain why this conversion is never invoked -- confirm against
 * the Avro specification.
 */
public static class MyTimestampConversion extends Conversion<Long> {
    public MyTimestampConversion() {
    }

    /** Returns the Java class this conversion produces. */
    public Class<Long> getConvertedType() {
        return Long.class;
    }

    /** Returns the logical type name this conversion is registered for. */
    public String getLogicalTypeName() {
        return "timestamp-millis";
    }

    /**
     * Converts a character sequence to the converted type.
     * Placeholder implementation: always returns the hard-coded value 123.
     */
    public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
        return 123L;
    }
}

But this code doesn't work… I was expecting it to convert the value to timestamp millis (I hardcoded 123L in the sample above).

Any help?

Advertisement

Answer

Referring back to "How to define a LogicalType in Avro (Java)", I managed to solve this by creating my own logical type. It seems that doing this with the "timestamp-millis" logicalType won't work, so I created my own logicalType…

package example;

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;

import java.io.File;
import java.io.IOException;

public class AvroWriteDateUtcToEpochMili {
    /**
     * Demo entry point: registers the custom logical type, writes three sample
     * records to {@code users.avro}, then reads them back twice -- first with a
     * plain {@link GenericDatumReader} (raw longs are printed) and then with
     * {@link MyReader} plus {@link MyStringTimestampConversion} (longs are
     * printed as date-time strings).
     *
     * @param args unused
     * @throws IOException if the schema file or the Avro data file cannot be read or written
     */
    public static void main(String[] args) throws IOException {

        // Local switches kept from the original sample so each phase can be toggled by hand.
        boolean isRegisterNewLogicalType = true;
        boolean isWrite = true;

        if (isRegisterNewLogicalType) {
            LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
                // One shared instance is handed out for every schema that names this logical type.
                private final LogicalType convertLongLogicalType = new UtcDateTimeToTimestampMilisLogicalType();

                @Override
                public LogicalType fromSchema(Schema schema) {
                    return convertLongLogicalType;
                }
            });
        }

        Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc"));

        // Serialize the sample users to disk
        File file1 = new File("users.avro");
        if (isWrite) {
            // user1: both fields supplied as an ISO-8601 UTC string.
            GenericRecord user1 = new GenericData.Record(schema1);
            user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
            user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

            // user2: the logical-type field is supplied as a raw long instead of a string.
            GenericRecord user2 = new GenericData.Record(schema1);
            user2.put("timestamplong", 2L);
            user2.put("timestampstring", (new DateTime(2L)).toString());

            var currentDateTime = DateTime.now();

            // user3: both fields supplied as the current time rendered as a string.
            GenericRecord user3 = new GenericData.Record(schema1);
            user3.put("timestamplong", currentDateTime.toString());
            user3.put("timestampstring", currentDateTime.toString());

            final GenericData genericData2 = new GenericData();
            genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());

            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema1, genericData2);
            // try-with-resources guarantees the file is flushed and closed even if an append fails.
            try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
                dataFileWriter.create(schema1, file1);
                dataFileWriter.append(user1);
                dataFileWriter.append(user2);
                dataFileWriter.append(user3);
            }
        }

        // Deserialize users from disk without any conversion registered

        boolean once = true;
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema1);
        GenericRecord user = null;
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file1, datumReader)) {
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);

                // Print the schema a single time, ahead of the first record.
                if (once) {
                    System.out.println(user.getSchema());
                    once = false;
                }

                System.out.println(user);
            }
        }

        // Deserialize users from disk again, this time with the conversion registered
        System.out.println("//AFTER");

        Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc"));
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
        datumReader = new MyReader<GenericRecord>(schema2, schema2, genericData);

        user = null;
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file1, datumReader)) {
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }

    /**
     * Datum reader that reads each value without conversion and then applies the
     * conversion registered on its {@link GenericData} for the value's logical type.
     *
     * @param <G> the record type produced by this reader
     */
    public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {
        /** Creates a reader with no schemas configured. */
        public MyReader() {
            super();
        }

        /**
         * Creates a reader for the given schemas.
         *
         * @param writer schema the data was written with
         * @param reader schema the caller expects
         * @param data   data model holding the registered logical-type conversions
         */
        public MyReader(Schema writer, Schema reader, GenericData data) {
            super(writer, reader, data);
        }

        /**
         * Reads a datum and, when the expected schema carries a logical type with a
         * registered conversion, returns the converted value instead of the raw one.
         *
         * @param old      value to reuse, may be {@code null}
         * @param expected schema of the value being read
         * @param in       decoder positioned at the value
         * @return the converted value, or the raw datum when no conversion applies
         * @throws IOException if decoding fails
         */
        @Override
        protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
            Object datum = this.readWithoutConversion(old, expected, in);
            LogicalType logicalType = expected.getLogicalType();
            if (logicalType == null) {
                return datum;
            }

            Conversion<?> conversion = this.getData().getConversionFor(logicalType);
            if (conversion == null) {
                return datum;
            }

            return this.convert(datum, expected, logicalType, conversion);
        }
    }

    /**
     * Conversion between a date-time {@link String} and the epoch-millisecond
     * {@code long} stored for the custom "utc-to-epoch-millis" logical type.
     */
    public static class MyStringTimestampConversion extends Conversion<String> {
        public MyStringTimestampConversion() {
            super();
        }

        /** Returns the Java class this conversion exposes to callers. */
        @Override
        public Class<String> getConvertedType() {
            return String.class;
        }

        /**
         * Returns the name of the custom logical type this conversion is bound to
         * (deliberately not the built-in "timestamp-millis").
         */
        @Override
        public String getLogicalTypeName() {
            return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
        }

        /**
         * Renders stored epoch milliseconds as a Joda-Time date-time string.
         * NOTE(review): the string uses Joda-Time's default zone, so it is not
         * necessarily UTC -- confirm this is the intended presentation.
         *
         * @param millisFromEpoch milliseconds since the epoch as stored in the file
         * @param schema          schema of the field (unused)
         * @param type            logical type of the field (unused)
         * @return the date-time rendered by {@code DateTime.toString()}
         */
        @Override
        public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
            return (new DateTime(millisFromEpoch)).toString();
        }

        /**
         * Parses a date-time string (e.g. {@code 2019-07-09T04:31:45.281Z}) into
         * epoch milliseconds.
         *
         * @param value  date-time text accepted by {@code DateTime.parse}
         * @param schema schema of the field (unused)
         * @param type   logical type of the field (unused)
         * @return milliseconds since the epoch for the parsed instant
         */
        @Override
        public Long toLong(String value, Schema schema, LogicalType type) {
            // DateTime already carries the instant in epoch millis; no need to
            // round-trip through java.util.Date and java.time.Instant.
            return DateTime.parse(value).getMillis();
        }
    }
}

LogicalType

/**
 * Custom Avro logical type named "utc-to-epoch-millis". It may only annotate
 * schemas whose underlying type is {@code long}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
    // The key used to reference this logical type in schemas and when registering it
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Rejects any schema that is not backed by {@code long}.
     *
     * @param schema the schema annotated with this logical type
     * @throws IllegalArgumentException if the schema's type is not {@link Schema.Type#LONG}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The message now names the type the check actually requires (it used to say "bytes").
            throw new IllegalArgumentException(
                    "Logical type '" + CONVERT_LONG_TYPE_NAME + "' must be backed by long");
        }
    }
}

Schema

{
    "namespace": "example.avro.modified.string",
    "type": "record",
    "name": "UserDate",
    "fields": [
        {
            "name": "timestamplong",
            "type": 
                {
                    "type": "long",
                    "logicalType": "utc-to-epoch-millis"
                }
        },
        {
            "name": "timestampstring",
            "type": "string"
        }
    ]
}

Result

{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
{"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
//AFTER
{"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}

Process finished with exit code 0
User contributions licensed under: CC BY-SA
9 People found this is helpful
Advertisement