Spark – Transforming Complex Data Types


The goal I want to achieve is to

  • read a CSV file (OK)
  • encode it to a Dataset<Person>, where each Person holds a nested Address[] array (throws an exception)

The Person CSV file

In a file called person.csv, there is the following data describing some persons:


The first line is the schema and address is a nested structure.

Data classes

The data classes are:

public class Address implements Serializable {
    public String street;
    public String city;
}

public class Person implements Serializable {
    public String name;
    public Integer age;
    public Address[] address;
}
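
The code further down calls Address::fromArgs and compares Address instances, but neither the factory nor equals() is shown in the post. A minimal sketch of what such a class might look like follows; the two-part validation rule, the constructors, and the toString() format are assumptions made for illustration.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: the post references Address.fromArgs but never shows its body.
class Address implements Serializable {
    public String street;
    public String city;

    // No-arg constructor, which bean-style tooling generally expects.
    public Address() { }

    public Address(String street, String city) {
        this.street = street;
        this.city = city;
    }

    // Assumption: a valid entry has exactly two parts, street then city.
    public static Optional<Address> fromArgs(String[] parts) {
        if (parts == null || parts.length != 2) {
            return Optional.empty();
        }
        return Optional.of(new Address(parts[0], parts[1]));
    }

    // Value equality, so that two addresses with the same fields compare equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Address)) return false;
        Address that = (Address) other;
        return Objects.equals(street, that.street) && Objects.equals(city, that.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(street, city);
    }

    @Override
    public String toString() {
        return "Address(street=" + street + ", city=" + city + ")";
    }
}
```

Returning an Optional keeps malformed entries from throwing inside a stream pipeline; the caller decides whether to drop them or map them to null.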

Reading untyped Data

I first tried to read the data from the CSV into a Dataset<Row>, which works as expected:

    Dataset<Row> ds = spark.read() //
                           .format("csv") //
                           .option("header", "true") // first line has headers
                           .load("src/test/resources/outer/person.csv");

    System.out.println("=============== Print schema =============");
    ds.printSchema();

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- address: string (nullable = true)

    System.out.println("================ Print data ==============");
    ds.show();

+-----+---+--------------------+
| name|age|             address|
+-----+---+--------------------+
|name1| 10|streetA~cityA||st...|
|name2| 20|streetA~cityA||st...|
+-----+---+--------------------+

    System.out.println("================ Print name ==============");
    ds.select("name").show();

+-----+
| name|
+-----+
|name1|
|name2|
+-----+

    assertThat(ds.isEmpty(), is(false)); //OK
    assertThat(ds.count(), is(2L)); //OK
    final List<String> names = ds.select("name").as(Encoders.STRING()).collectAsList();
    assertThat(names, hasItems("name1", "name2")); //OK

Encoding through a UserDefinedFunction

My UDF, which takes a String and returns an Address[]:

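private static void registerAsAddress(SparkSession spark) {
    spark.udf().register("asAddress", new UDF1<String, Address[]>() {

                             @Override
                             public Address[] call(String rowValue) {
                                 // split the entries on the literal "||", then each entry on "~"
                                 return Arrays.stream(rowValue.split(Pattern.quote("||"), -1)) //
                                              .map(object -> object.split("~")) //
                                              .map(Address::fromArgs) //
                                              .map(a -> a.orElse(null)) //
                                              .toArray(Address[]::new);
                             }
                         },  //
                         // declared return type: array of struct(street, city)
                         DataTypes.createArrayType(DataTypes.createStructType(
                                 new StructField[]{new StructField("street", DataTypes.StringType, true, Metadata.empty()), //
                                                   new StructField("city", DataTypes.StringType, true, Metadata.empty()) //
                                 })));
}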
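
The two split calls in the UDF carry the whole parsing logic, so it may help to see them in isolation. The sketch below uses only the JDK; the class name AddressSplitDemo and the helper splitEntries are made up for illustration. Pattern.quote is needed because an unquoted "||" is a regex alternation of empty patterns, which would match between every character. The -1 limit keeps trailing empty strings instead of silently dropping them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class: isolates the splitting done inside the UDF.
class AddressSplitDemo {

    // Splits "streetA~cityA||streetB~cityB" into [[streetA, cityA], [streetB, cityB]].
    static List<String[]> splitEntries(String rowValue) {
        List<String[]> entries = new ArrayList<>();
        // Pattern.quote turns "||" into a literal; -1 keeps trailing empty entries.
        for (String entry : rowValue.split(Pattern.quote("||"), -1)) {
            entries.add(entry.split("~"));
        }
        return entries;
    }

    public static void main(String[] args) {
        for (String[] parts : splitEntries("streetA~cityA||streetB~cityB")) {
            System.out.println(Arrays.toString(parts));
        }
        // prints:
        // [streetA, cityA]
        // [streetB, cityB]
    }
}
```

A value that ends in "||" yields a final entry with a single empty part, which is why a factory such as Address::fromArgs has to cope with entries that do not contain exactly two parts.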

The caller:

    void asAddressTest() throws URISyntaxException {

        // given, when
        registerAsAddress(spark); // the UDF must be registered before callUDF can resolve it
        Dataset<Row> ds = spark.read() //
                               .format("csv") //
                               .option("header", "true") // first line has headers
                               .load(csvPath); // csvPath is a placeholder name: the original path is not shown

        // create a typed dataset
        Encoder<Person> personEncoder = Encoders.bean(Person.class);
        Dataset<Person> typed = ds.withColumn("address2", //
                                                callUDF("asAddress", ds.col("address")))
                .drop("address").withColumnRenamed("address2", "address")
                .as(personEncoder);
        System.out.println("Typed Address");
        typed.show();
    }

Which leads to this exception:

Caused by: java.lang.IllegalArgumentException: The value (Address(street=streetA, city=cityA)) of the type ( cannot be converted to struct

Why can it not convert from Address to struct?



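After trying many different approaches and spending some hours researching online, I have come to the following conclusions:

UserDefinedFunction works, but it comes from the old world. Where an object only needs to be transformed from one type to another, it can be replaced by a simple map() function. The exception above appears to stem from the UDF's declared return type: for a struct, Spark expects Row values rather than arbitrary objects such as Address. The simplest way is the following: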

    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();
    Encoder<FileFormat> fileFormatEncoder = Encoders.bean(FileFormat.class);
    Dataset<FileFormat> rawFile = spark.read() //
                                       .format("csv") //
                                       .option("inferSchema", "true") //
                                       .option("header", "true") // first line has headers
                                       .load("src/test/resources/encoding-tests/persons.csv") //
                                       .as(fileFormatEncoder);

    System.out.println("=============== Print schema =============");
    rawFile.printSchema();
    System.out.println("================ Print data ==============");
    rawFile.show();
    System.out.println("================ Print name ==============");
    rawFile.select("name").show();

    // when
    final SerializableFunction<String, List<Address>> asAddress = (String text) -> Arrays
            .stream(text.split(Pattern.quote("||"), -1)) //
            .map(object -> object.split("~")) //
            .map(Address::fromArgs) //
            .map(a -> a.orElse(null)).collect(Collectors.toList());

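    // convert each flat CSV row into a Person with parsed addresses
    final MapFunction<FileFormat, Person> personMapper =
            (MapFunction<FileFormat, Person>) row -> new Person(row.name, row.age, asAddress.apply(row.address));
    final Encoder<Person> personEncoder = Encoders.bean(Person.class);
    Dataset<Person> persons = rawFile.map(personMapper, personEncoder);
    persons.show();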

    // then
    assertThat(persons.isEmpty(), is(false));
    assertThat(persons.count(), is(2L));
    final List<String> names = persons.select("name").as(Encoders.STRING()).collectAsList();
    assertThat(names, hasItems("name1", "name2"));
    final List<Integer> ages = persons.select("age").as(Encoders.INT()).collectAsList();
    assertThat(ages, hasItems(10, 20));
    final Encoder<Address> addressEncoder = Encoders.bean(Address.class);
    final MapFunction<Person, Address> firstAddressMapper = (MapFunction<Person, Address>) person -> person.addresses.get(0);
    final List<Address> addresses = persons.map(firstAddressMapper, addressEncoder).collectAsList();
    assertThat(addresses, hasItems(new Address("streetA", "cityA"), new Address("streetC", "cityC")));
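
The per-row conversion that map() performs is ordinary Java, so it can be checked without a Spark session. The sketch below is only an approximation of the code above: java.util.function.Function stands in for Spark's MapFunction, the FileFormat, Person and Address classes are simplified guesses at the post's beans, and malformed entries are filtered out rather than mapped to null as in the original.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical demo class: row-to-Person conversion with plain JDK types.
class RowToPersonDemo {

    // Assumed shape of one flat CSV row (the post's FileFormat bean is not shown).
    static class FileFormat {
        String name; Integer age; String address;
        FileFormat(String name, Integer age, String address) {
            this.name = name; this.age = age; this.address = address;
        }
    }

    static class Address {
        String street; String city;
        Address(String street, String city) { this.street = street; this.city = city; }
        @Override public boolean equals(Object o) {
            return o instanceof Address
                    && Objects.equals(street, ((Address) o).street)
                    && Objects.equals(city, ((Address) o).city);
        }
        @Override public int hashCode() { return Objects.hash(street, city); }
    }

    static class Person {
        String name; Integer age; List<Address> addresses;
        Person(String name, Integer age, List<Address> addresses) {
            this.name = name; this.age = age; this.addresses = addresses;
        }
    }

    // Parses "streetA~cityA||streetB~cityB" into a list of addresses.
    static final Function<String, List<Address>> AS_ADDRESS = text -> Arrays
            .stream(text.split(Pattern.quote("||"), -1))
            .map(entry -> entry.split("~"))
            .filter(parts -> parts.length == 2) // deviation from the post: skip malformed entries
            .map(parts -> new Address(parts[0], parts[1]))
            .collect(Collectors.toList());

    // Stand-in for the post's personMapper.
    static final Function<FileFormat, Person> TO_PERSON =
            row -> new Person(row.name, row.age, AS_ADDRESS.apply(row.address));

    public static void main(String[] args) {
        Person p = TO_PERSON.apply(new FileFormat("name1", 10, "streetA~cityA||streetB~cityB"));
        System.out.println(p.name + " has " + p.addresses.size() + " addresses");
        // prints: name1 has 2 addresses
    }
}
```

Keeping the conversion in a standalone function like this means the parsing rules can be unit-tested quickly, and the Spark test only has to confirm that the encoder and map() are wired together correctly.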