I am using Apache Beam to process data and am trying to achieve the following:
- Read the data from a CSV file. (Completed)
- Group the records by customer ID. (Completed)
- Resample the data by month and calculate the sum of the amounts for that month.
Detailed Explanation:
I have a CSV file as shown below.
customerId | date | amount |
---|---|---|
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/14/2012 | 234 |
BS:89481 | 11/10/2012 | 189 |
BS:89480 | 11/02/2012 | 987 |
BS:89481 | 09/14/2012 | 784 |
BS:89480 | 11/14/2012 | 056 |
Intermediate stage: grouped by customerId and sorted by date
customerId | date | amount |
---|---|---|
BS:89481 | 09/14/2012 | 784 |
BS:89481 | 11/10/2012 | 189 |
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/02/2012 | 987 |
BS:89480 | 11/14/2012 | 234 |
BS:89480 | 11/14/2012 | 056 |
Expected output (after resampling): here we calculate the sum of all the amounts for a particular month for each individual customer. E.g. customer BS:89481 has two spends in the month of November, so we calculate the sum for that month (124 + 189 = 313).
customerId | date | amount |
---|---|---|
BS:89481 | 09/30/2012 | 784 |
BS:89481 | 11/30/2012 | 313 |
BS:89480 | 11/30/2012 | 1277 |
I was able to complete steps 1 and 2, but I am not sure how to implement step 3.
```java
Schema schema = new Schema.Parser().parse(schemaFile);

Pipeline pipeline = Pipeline.create();

// Reading schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

final PCollectionTuple tuples = pipeline
    // Reading csv input
    .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
    // Reading files that matches conditions //PRashanth needs to be looked at
    .apply("2", FileIO.readMatches())
    // Reading schema and validating with schema and converts to row and returns
    // valid and invalid list
    .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
        TupleTagList.of(invalidTag())));

// Fetching only valid rows
final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

// Step 2
// Convert row to KV for grouping
StringToKV stringtoKV = new StringToKV();
stringtoKV.setColumnName("customerId");
PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
    // setCoder(KvCoder.of(VoidCoder.of()), rows.getCoder()));

// Obtain a PCollection of KV<String, Iterable<Row>>
PCollection<KV<String, Iterable<Row>>> kvIterableForm = kvOrderRows.apply(GroupByKey.<String, Row>create());
```
Update:
Schema Transform:
{ "type" : "record", "name" : "Entry", "namespace" : "transform", "fields" : [ { "name" : "customerId", "type" : [ "string", "null" ] }, { "name" : "date", "type" : [ "long", "null" ] }, { "name" : "amount", "type" : [ "double", "null" ] }] }
CSV File
customerId | date | amount |
---|---|---|
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/14/2012 | 234 |
BS:89481 | 11/10/2012 | 189 |
BS:89480 | 11/02/2012 | 987 |
BS:89481 | 09/14/2012 | 784 |
BS:89480 | 11/14/2012 | 056 |
```java
class StringToKV1 extends DoFn<Row, KV<String, Row>> {
  private static final long serialVersionUID = -8093837716944809689L;

  String columnName = null;

  @ProcessElement
  public void processElement(ProcessContext context) {
    Row row = context.element();
    context.output(KV.of(row.getValue(columnName), row));
  }

  public void setColumnName(String columnName) {
    this.columnName = columnName;
  }
}
```
Code:
```java
public class GroupByTest {
  public static void main(String[] args) throws IOException {
    System.out.println("We are about to start!!");
    final File schemaFile = new File(
        "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
    File csvFile = new File(
        "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
    Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();

    // Reading schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline
        // Reading csv input
        .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
        // Reading files that matches conditions //PRashanth needs to be looked at
        .apply("2", FileIO.readMatches())
        // Reading schema and validating with schema and converts to row and returns
        // valid and invalid list
        .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
            TupleTagList.of(invalidTag())));

    // Fetching only valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

    // Transformation
    // Convert row to KV
    StringToKV1 stringtoKV1 = new StringToKV1();
    stringtoKV1.setColumnName("customerId");
    PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV1))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

    // Will throw error
    // rows.apply(Group.byFieldNames("customerId", "date").aggregateField("amount", Sum.ofIntegers(),
    //     "totalAmount"));

    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
        + Group.byFieldNames("customerId", "date")
            .aggregateField("amount", Sum.ofIntegers(), "totalAmount").getName());

    pipeline.run().waitUntilFinish();
    System.out.println("The end");
  }

  private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
    String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
    LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
    if (logicalType != null) {
      type = logicalType.getName();
    }
    switch (type) {
      case "string":
        return row.getString(columnName);
      case "int":
        return Objects.requireNonNull(row.getInt32(columnName)).toString();
      case "bigint":
        return Objects.requireNonNull(row.getInt64(columnName)).toString();
      case "double":
        return Objects.requireNonNull(row.getDouble(columnName)).toString();
      case "timestamp-millis":
        return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis())
            .toString();
      default:
        return row.getString(columnName);
    }
  }
}
```
Corrected Code:
```java
final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
    .aggregateField("amount", Sum.ofDoubles(), "sumAmount");

final PCollection<Row> aggregate = rows.apply(combine);
PCollection<String> pOutput = aggregate.apply(ParDo.of(new RowToString()));
```
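(`RowToString` is not shown in the question; presumably it is a simple `DoFn<Row, String>` along these lines:)

```java
// A minimal guess at RowToString: emit the default Row.toString() representation of each element.
class RowToString extends DoFn<Row, String> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext context) {
    context.output(context.element().toString());
  }
}
```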
The output that I am getting is:
The expected output:
customerId | date | amount |
---|---|---|
BS:89481 | 09/30/2012 | 784 |
BS:89481 | 11/30/2012 | 313 |
BS:89480 | 11/30/2012 | 1277 |
Answer
Since you already have a `PCollection` of `Row`s, you can use schema-aware PTransforms. For your case, you may want to use "Grouping aggregations", and it can be something like this:
```java
Group.byFieldNames("customerId", "date")
    .aggregateField("amount", Sum.ofIntegers(), "totalAmount")
```
It will group the rows by customer id and date, and then compute a total amount for them per day. If you want to compute it per month, you need to create a new column (or modify the current one) that contains only the month (for example, a month-end date), and then group by customer id and that column, as in the sketch below.
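A rough, untested sketch of that idea, building on the `rows` and `beamSchema` from the question, and assuming the `date` field holds epoch millis (a `long`) and that your Beam version supports `Row.fromRow(...).withFieldValue(...)`; adjust if your `FileReader` emits a different type:

```java
// Requires java.time.{Instant, LocalDate, ZoneOffset} and java.time.temporal.TemporalAdjusters.
PCollection<Row> monthlyRows = rows
    .apply("ToMonthEnd", ParDo.of(new DoFn<Row, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Row row = c.element();
        // Move the date to the last day of its month, e.g. 11/14/2012 -> 11/30/2012.
        LocalDate monthEnd = Instant.ofEpochMilli(row.getInt64("date"))
            .atZone(ZoneOffset.UTC)
            .toLocalDate()
            .with(TemporalAdjusters.lastDayOfMonth());
        long monthEndMillis = monthEnd.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        c.output(Row.fromRow(row).withFieldValue("date", monthEndMillis).build());
      }
    }))
    .setRowSchema(beamSchema);

// The date column now only carries the month, so grouping by (customerId, date)
// aggregates per customer per month.
PCollection<Row> monthlyTotals = monthlyRows.apply(
    Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "totalAmount"));
```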
Also, it can be useful to use `Select.flattenedSchema()` to flatten an output schema. The Beam Schema API allows you to work with schema-aware PCollections in a very simple and effective way.
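If I remember correctly, the `Row` produced by `Group.byFieldNames(...)` nests the key fields under "key" and the aggregates under "value", so, continuing with the hypothetical `monthlyTotals` from the sketch above, flattening could look like this:

```java
// Turn the nested key/value result into a flat Row with customerId, date and totalAmount fields.
PCollection<Row> flattened = monthlyTotals.apply(Select.flattenedSchema());
```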
Another option could be to implement your GroupBy/aggregate logic manually with `KV`s, but it's more complicated and error-prone since it will require much more boilerplate code.
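For completeness, a rough, untested sketch of that manual route, reusing the `kvIterableForm` collection from the question; the epoch-millis date and the `"yyyy-MM"` month key are my assumptions, purely for illustration:

```java
// Requires java.time.{Instant, ZoneOffset}, java.time.format.DateTimeFormatter,
// and java.util.{HashMap, Map}.
PCollection<KV<String, Double>> monthlySums = kvIterableForm
    .apply("SumPerMonth", ParDo.of(new DoFn<KV<String, Iterable<Row>>, KV<String, Double>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String customerId = c.element().getKey();
        Map<String, Double> perMonth = new HashMap<>();
        for (Row row : c.element().getValue()) {
          // Bucket each row into its month, e.g. "2012-11".
          String month = Instant.ofEpochMilli(row.getInt64("date"))
              .atZone(ZoneOffset.UTC)
              .format(DateTimeFormatter.ofPattern("yyyy-MM"));
          perMonth.merge(month, row.getDouble("amount"), Double::sum);
        }
        for (Map.Entry<String, Double> e : perMonth.entrySet()) {
          c.output(KV.of(customerId + "|" + e.getKey(), e.getValue()));
        }
      }
    }));
```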