How do I use MapElements and KV together in Apache Beam?


I wanted to do something like:

PCollection<String> a = whatever;
PCollection<KV<String, User>> b = a.apply(
        MapElements.into(TypeDescriptor.of(KV<String, User>.class))
        .via(s -> KV.of(s, new User(s))));

Where User is a custom datatype with an Avro coder and a constructor that takes a String.

However, I get the following error:

Cannot select from parameterized type

I tried to change it to TypeDescriptor.of(KV.class) instead, but then I get:

Incompatible types; Required PCollection<KV<String, User>> but ‘apply’ was inferred to OutputT: no instance(s) of type variable(s) exists so that PCollection<KV> conforms to PCollection<KV<String, User>>

So how am I supposed to use KV with MapElements?

I know that what I want is doable using ParDo, where I could spell out the types explicitly by declaring new DoFn<String, KV<String, User>>, but ParDo does not support lambda functions. As we are using Java 8, this seems less elegant.


Java erases generic type parameters at compile time, so there is no KV<String, User>.class literal (hence "Cannot select from parameterized type"). Only KV.class exists, and at runtime KV.class doesn't carry enough information to infer a coder because the type variables have been erased.

To get around this limitation, you need to use a mechanism which preserves type information after compilation. For example you could use:

TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(User.class))

which is the same as providing your own anonymous class:

new TypeDescriptor<KV<String, User>>() {}

Providing anonymous classes with type variables bound is one of the ways to get around type erasure in Java currently.
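
To see why the anonymous class helps, here is a minimal sketch in plain Java with no Beam dependency. TypeTokenDemo and its nested TypeToken class are hypothetical names made up for this illustration; they are not Beam's TypeDescriptor, although I'd assume TypeDescriptor relies on a similar mechanism. The idea is that a subclass records its generic superclass, including the bound type arguments, in class metadata that reflection can still read at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

class TypeTokenDemo {

    // Hypothetical stand-in for a type descriptor (not a Beam class).
    // It must be subclassed so that the type argument is recorded in the
    // subclass's generic superclass metadata.
    abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // For an anonymous subclass such as new TypeToken<Foo<Bar>>() {},
            // this returns the parameterized type TypeToken<Foo<Bar>>.
            Type superclass = getClass().getGenericSuperclass();
            if (!(superclass instanceof ParameterizedType)) {
                throw new IllegalStateException("TypeToken needs a type argument");
            }
            this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }

        Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // A class literal only exists for the raw type;
        // Map<String, Integer>.class does not compile.
        Class<?> raw = Map.class;
        System.out.println("Class literal:   " + raw.getTypeName());

        // The anonymous subclass keeps the full parameterized type.
        TypeToken<Map<String, Integer>> token =
                new TypeToken<Map<String, Integer>>() {};
        System.out.println("Anonymous class: " + token.getType().getTypeName());
    }
}
```

The first line printed shows only the raw Map type, while the second includes both type arguments. That extra information is what a coder lookup needs and what KV.class alone cannot provide.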

Source: stackoverflow