Context
I want to iterate over a Spark Dataset and update a HashMap for each row.
Here is the code I have:
// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10

// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();

// Initialize the map
for(String col : my_dataset.columns()) {
    my_map.put(col, new Vector<String>());
}

// Iterate over the dataset and update the map
my_dataset.foreach((ForeachFunction<Row>) row -> {
    for(String col : my_map.keySet()) {
        my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
    }
});
Issue
My issue is that the foreach doesn’t iterate at all: the lambda is never executed, and I don’t know why.
I implemented it as indicated here: How to traverse/iterate a Dataset in Spark Java?
At the end, all the inner Vectors remain empty (as they were initialized) even though the Dataset is not (see the first comments in the code sample above).
I know that the foreach never iterates because I did two tests:
- Add an AtomicInteger to count the iterations, and increment it right at the beginning of the lambda with incrementAndGet(). => The counter value remains 0 at the end of the process. (A minimal sketch of this test is shown after this list.)
- Print a debug message right at the beginning of the lambda. => The message is never displayed.
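For reference, here is roughly what the counter test looked like (a minimal sketch reconstructed from the description above; it assumes the same my_dataset variable as in the code sample):

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

// Counter test: increment an AtomicInteger as the very first statement of the lambda
AtomicInteger counter = new AtomicInteger(0);

my_dataset.foreach((ForeachFunction<Row>) row -> {
    counter.incrementAndGet(); // first statement in the lambda
    // ... the per-row work from the code above ...
});

// Checked on the driver after the action completes
System.out.println("Iterations counted: " + counter.get()); // stays at 0, as described above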
I’m not very familiar with Java (even less with Java lambdas), so maybe I missed an important point, but I can’t figure out what.
Answer
I am probably a little old school, but I have never liked lambdas much, as they can get pretty complicated.
Here is a full example of a foreach():
package net.jgp.labs.spark.l240_foreach.l000;

import java.io.Serializable;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForEachBookApp implements Serializable {
  private static final long serialVersionUID = -4250231621481140775L;

  private final class BookPrinter implements ForeachFunction<Row> {
    private static final long serialVersionUID = -3680381094052442862L;

    @Override
    public void call(Row r) throws Exception {
      System.out.println(r.getString(2) + " can be bought at " + r.getString(4));
    }
  }

  public static void main(String[] args) {
    ForEachBookApp app = new ForEachBookApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("For Each Book")
        .master("local")
        .getOrCreate();

    String filename = "data/books.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);
    df.show();

    df.foreach(new BookPrinter());
  }
}
As you can see, this example reads a CSV file and prints a message from the data. It is fairly simple.
The foreach() is passed a new instance of a class, where the work is done:
df.foreach(new BookPrinter());
The work is done in the call() method of the class:
private final class BookPrinter implements ForeachFunction<Row> {

  @Override
  public void call(Row r) throws Exception {
    ...
  }
}
As you are new to Java, make sure you have the right signature (for classes and methods) and the right imports.
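For quick reference, the essential imports and the method signature look like this (a sketch; the class name is just a placeholder):

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

// ForeachFunction already extends Serializable, so Spark can ship this class to the executors
public class MyRowHandler implements ForeachFunction<Row> {

  @Override
  public void call(Row row) throws Exception {
    // per-row work goes here
  }
}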
You can also clone the example from https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000. This should help you with foreach().