
Spark Dataset Foreach function does not iterate

Context

I want to iterate over a Spark Dataset and update a HashMap for each row.

Here is the code I have:

// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10

// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();

// Initialize the map
for(String col : my_dataset.columns())
{
    my_map.put(col, new Vector<String>());
}

// Iterate over the dataset and update the map
my_dataset.foreach( (ForeachFunction<Row>) row -> {
    for(String col : my_map.keySet())
    {
        my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
    }
});

Issue

My issue is that the foreach doesn’t iterate at all: the lambda is never executed, and I don’t know why.
I implemented it as indicated here: How to traverse/iterate a Dataset in Spark Java?

At the end, all the inner Vectors remain empty (as they were initialized) even though the Dataset is not (see the first comments in the code sample above).

I know that the foreach never iterates because I did two tests:

  • Add an AtomicInteger to count the iterations and increment it at the very beginning of the lambda with the incrementAndGet() method. => The counter value remains 0 at the end of the process.
  • Print a debug message at the very beginning of the lambda. => The message is never displayed.

I’m not used to Java (and even less to Java lambdas), so maybe I missed an important point, but I can’t find what.


Answer

I am probably a little old school, but I never liked lambdas too much, as they can get pretty complicated.

Here is a full example of a foreach():

package net.jgp.labs.spark.l240_foreach.l000;

import java.io.Serializable;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForEachBookApp implements Serializable {
  private static final long serialVersionUID = -4250231621481140775L;

  private final class BookPrinter implements ForeachFunction<Row> {
    private static final long serialVersionUID = -3680381094052442862L;

    @Override
    public void call(Row r) throws Exception {
      System.out.println(r.getString(2) + " can be bought at " + r.getString(4));
    }
  }

  public static void main(String[] args) {
    ForEachBookApp app = new ForEachBookApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("For Each Book")
        .master("local")
        .getOrCreate();

    String filename = "data/books.csv";
    Dataset<Row> df = spark.read()
        .format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);
    df.show();

    df.foreach(new BookPrinter());
  }
}

As you can see, this example reads a CSV file and prints a message from the data. It is fairly simple.

The call to foreach() receives a new instance of a class, where the work is done.

df.foreach(new BookPrinter());

The work is done in the call() method of the class:

  private final class BookPrinter implements ForeachFunction<Row> {

    @Override
    public void call(Row r) throws Exception {
...
    }
  }

As you are new to Java, make sure you have the right signature (for classes and methods) and the right imports.

You can also clone the example from https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000. This should help you with foreach().
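
One more thing that may explain the empty Vectors and the counter staying at 0, whichever style you use: as far as I understand it, Spark serializes the function passed to foreach() together with everything it captures, and the task then works on a copy of those objects. The sketch below is plain Java with no Spark in it, so treat it as an illustration of that mechanism only, not as what Spark does internally. RowCallback is a hypothetical stand-in for ForeachFunction<Row>, and roundTrip() is a helper I made up to mimic the shipping step.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;

// Plain-Java illustration, no Spark involved.
final class ClosureCopyDemo {

  // Hypothetical stand-in for ForeachFunction<Row>: a serializable callback.
  interface RowCallback extends Serializable {
    void call(String value);
  }

  // Serializes then deserializes the callback (assumption: this roughly
  // mimics what happens to a function before it runs as part of a task).
  static RowCallback roundTrip(RowCallback callback) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(callback);
      }
      try (ObjectInputStream in = new ObjectInputStream(
          new ByteArrayInputStream(buffer.toByteArray()))) {
        return (RowCallback) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  // Returns how many values the original map holds after the shipped copy
  // of the callback has been called once.
  static int driverSideEntries() {
    HashMap<String, ArrayList<String>> myMap = new HashMap<>();
    myMap.put("title", new ArrayList<>());

    RowCallback callback = value -> myMap.get("title").add(value);
    RowCallback shipped = roundTrip(callback); // carries its own copy of myMap
    shipped.call("some value");                // updates that copy only

    return myMap.get("title").size();
  }

  public static void main(String[] args) {
    System.out.println("Entries in the original map: " + driverSideEntries());
  }
}
```

The original map stays empty because the deserialized callback only ever sees its own copy. If the goal is to fill a map on the driver, one option is to bring the rows back first, for example with collectAsList() or toLocalIterator() on the Dataset, and then loop over them with a regular for loop. Whether 300 000 rows fit comfortably in driver memory is something you would need to check for your data.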

User contributions licensed under: CC BY-SA