I have the schema below:
{"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},
I intend to supply a date string to it and have a conversion turn that date into epoch milliseconds.
// Question snippet: build a record with a date-time string, then read file1 back through a
// GenericDatumReader that has MyTimestampConversion registered.
// NOTE(review): user2 is never written to file1 here; presumably file1 was written elsewhere.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");
// Register the custom conversion on a dedicated GenericData instance used by the reader.
// NOTE(review): the schema puts "timestamp-millis" on a string, which the Avro spec only defines
// for long; the conversion is therefore likely never looked up — confirm against the Avro version.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
// schema2 is passed as both writer and reader schema; datumReader is declared outside this snippet.
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);
GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
    while (dataFileReader.hasNext()) {
        // Pass the previous record back in so the reader can reuse it.
        user = dataFileReader.next(user);
        System.out.println(user);
    }
}
//Conversion code
public static class MyTimestampConversion extends Conversion<Long> { public MyTimestampConversion() { } public Class<Long> getConvertedType() { return Long.class; } public String getLogicalTypeName() { return "timestamp-millis"; } public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) { return 123L; } }
But this code doesn't work. I expected it to convert the value to a timestamp in milliseconds (I hard-coded 123L in the sample above).
Any help?
Advertisement
Answer
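Referring back to "How to define a LogicalType in Avro (Java)", I solved this by creating my own logical type. Using the built-in "timestamp-millis" logical type won't work here. The Avro specification defines timestamp-millis only on a long, so on a string it is invalid and ignored, and no conversion is ever applied. So I created my own logical type…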
package example; import org.apache.avro.Conversion; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.*; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.ResolvingDecoder; import org.joda.time.DateTime; import java.io.File; import java.io.IOException; public class AvroWriteDateUtcToEpochMili { public static void main(String[] args) throws IOException { Boolean isRegisterNewLogicalType = true; Boolean isWrite = true; if(isRegisterNewLogicalType) { LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() { private final LogicalType convertLongLogicalType = new UtcDateTimeToTimestampMilisLogicalType(); @Override public LogicalType fromSchema(Schema schema) { return convertLongLogicalType; } }); } Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc")); // Serialize user1 and user2 to disk File file1 = new File("users.avro"); if(isWrite) { GenericRecord user1 = new GenericData.Record(schema1); user1.put("timestamplong", "2019-07-09T04:31:45.281Z"); //user1.put("timestamplong", 1L); user1.put("timestampstring", "2019-07-09T04:31:45.281Z"); GenericRecord user2 = new GenericData.Record(schema1); //user2.put("timestamplong", "2018-07-09T04:30:45.781Z"); user2.put("timestamplong", 2L); user2.put("timestampstring", (new DateTime(2L)).toString()); //user2.put("timestampstring", new Timestamp(new Date("2018-01-26").getTime())); var currentDateTime = DateTime.now(); GenericRecord user3 = new GenericData.Record(schema1); user3.put("timestamplong", currentDateTime.toString()); //user3.put("timestamplong", 3L); user3.put("timestampstring", currentDateTime.toString()); final GenericData genericData2 = new GenericData(); 
genericData2.addLogicalTypeConversion(new MyStringTimestampConversion()); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema1, genericData2); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); dataFileWriter.create(schema1, file1); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close(); } // Deserialize users from disk Boolean once = true; DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema1); GenericRecord user = null; try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) { while (dataFileReader.hasNext()) { user = dataFileReader.next(user); if(once) { System.out.println(user.getSchema()); once = false; } //System.out.println(LogicalTypes.fromSchema(user.getSchema())); System.out.println(user); } } // Deserialize users from disk System.out.println("//AFTER"); Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc")); final GenericData genericData = new GenericData(); genericData.addLogicalTypeConversion(new MyStringTimestampConversion()); datumReader = new MyReader<GenericRecord>(schema2, schema2, genericData); user = null; try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) { while (dataFileReader.hasNext()) { user = dataFileReader.next(user); System.out.println(user); } } } public static class MyReader<G extends IndexedRecord> extends GenericDatumReader { public MyReader() { super(); } public MyReader(Schema writer, Schema reader, GenericData data) { super(writer, reader, data); } @Override protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException { Object datum = this.readWithoutConversion(old, expected, in); LogicalType logicalType = expected.getLogicalType(); if (logicalType != null) { Conversion<?> conversion = 
this.getData().getConversionFor(logicalType); if (conversion != null) { return this.convert(datum, expected, logicalType, conversion); } } return datum; } } public static class MyStringTimestampConversion extends Conversion<String> { public MyStringTimestampConversion() { super(); } @Override public Class<String> getConvertedType() { return String.class; } @Override public String getLogicalTypeName() { // "timestamp-millis"; return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME; } @Override public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) { return (new DateTime(millisFromEpoch)).toString(); //return "123456L"; } @Override public Long toLong(String value, Schema schema, LogicalType type) { //https://stackoverflow.com/questions/22681348/joda-datetime-to-unix-datetime //DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSS'Z'");//https://stackoverflow.com/questions/8405087/what-is-this-date-format-2011-08-12t201746-384z DateTime dateTime = DateTime.parse(value); long epochMilli = dateTime.toDate().toInstant().toEpochMilli(); return epochMilli; } } }
LogicalType
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType { //The key to use as a reference to the type public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis"; public UtcDateTimeToTimestampMilisLogicalType() { super(CONVERT_LONG_TYPE_NAME); } @Override public void validate(Schema schema) { super.validate(schema); if (schema.getType() != Schema.Type.LONG) { throw new IllegalArgumentException( "Logical type 'utc-to-epoch-millis' must be backed by bytes"); } } }
Schema
{ "namespace": "example.avro.modified.string", "type": "record", "name": "UserDate", "fields": [ { "name": "timestamplong", "type": { "type": "long", "logicalType": "utc-to-epoch-millis" } }, { "name": "timestampstring", "type": "string" } ] }
Result
{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]} {"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"} {"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"} {"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"} //AFTER {"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"} {"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"} {"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"} Process finished with exit code 0