In an Apache Beam step I have a `PCollection` of `KV<String, Iterable<KV<Long, GenericRecord>>>`.
I want to write all the records in the Iterable to the same Parquet file. My code snippet is given below:

```java
p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
 .apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) // PCollection<KV<String, KV<Long, GenericRecord>>>
 .apply(GroupByKey.create()) // PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>
```
Now I want to write all the records in the Iterable to the same Parquet file, deriving the file name from the key of the KV.
Answer
I found the solution to the problem. At the step

```java
.apply(GroupByKey.create()) // PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>
```
I apply another transform that returns only the Iterable as the output PCollection:

```java
.apply(ParDo.of(new GetIterable())) // PCollection<Iterable<KV<String, GenericRecord>>>
```

where the key of each inner KV is the name of the file I have to write to (a sketch of GetIterable is below).
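For reference, a minimal sketch of what GetIterable could look like, assuming the inner Long key (e.g. a Kafka offset or timestamp) is no longer needed and can be dropped:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Re-keys every record in the grouped Iterable by the file name, so the
// records can later be flattened and routed by FileIO.writeDynamic().
class GetIterable
    extends DoFn<KV<String, Iterable<KV<Long, GenericRecord>>>,
        Iterable<KV<String, GenericRecord>>> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    String fileName = c.element().getKey();
    List<KV<String, GenericRecord>> records = new ArrayList<>();
    for (KV<Long, GenericRecord> record : c.element().getValue()) {
      // Drop the Long key and re-key each record by the destination file name.
      records.add(KV.of(fileName, record.getValue()));
    }
    c.output(records);
  }
}
```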
The remaining snippet is:

```java
.apply(Flatten.iterables())
.apply(
    FileIO.<String, KV<String, GenericRecord>>writeDynamic()
        .by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
        .via(
            Contextful.fn(
                (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue),
            ParquetIO.sink(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY))
        .withTempDirectory("/tmp/temp-beam")
        .to(options.getGCSBucketUrl())
        .withNumShards(1)
        .withDestinationCoder(StringUtf8Coder.of()))
```
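A note on what this does: `.by()` routes every element to a destination (here the file-name key), `.via()` converts each element to the GenericRecord that `ParquetIO.sink(schema)` writes, and `.withNumShards(1)` forces a single shard, so all records sharing a key land in one Parquet file per window/pane. Depending on your Beam version, you may also need a `.withNaming(...)` call so that `writeDynamic` knows how to name the output file for each destination.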