
Apache Beam Resampling of columns based on date

I am using Apache Beam to process data and am trying to achieve the following:

  1. Read the data from a CSV file. (Completed)
  2. Group the records by customer ID. (Completed)
  3. Resample the data by month and calculate the sum for each month.

Detailed Explanation:

I have a CSV file as shown below.

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056

Intermediate stage: Grouping it by customerId and sorting it by date

customerId date amount
BS:89481 09/14/2012 784
BS:89481 11/10/2012 189
BS:89481 11/14/2012 124
BS:89480 11/02/2012 987
BS:89480 11/14/2012 234
BS:89480 11/14/2012 056

Expected output (on resampling): here we calculate the sum of all amounts in a given month for each individual customer. E.g. customer BS:89481 has two spends in the month of November, so we calculate the sum for that month (124 + 189).

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277

I was able to complete steps 1 and 2, but I am not sure how to implement step 3.

    Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();

    // Convert the Avro schema to a Beam schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline

            // Match the CSV input file
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

            // Read the matched files
            .apply("2", FileIO.readMatches())

            // Validate each record against the schema, convert it to a Row and
            // emit it to either the valid or the invalid output
            .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                    TupleTagList.of(invalidTag())));

    // Fetch only the valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

    // Step 2: convert each Row to a KV keyed by customerId for grouping
    StringToKV stringtoKV = new StringToKV();
    stringtoKV.setColumnName("customerId");
    PCollection<KV<String, Row>> kvOrderRows = rows
            .apply(ParDo.of(stringtoKV))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

    // Group the rows per customerId
    PCollection<KV<String, Iterable<Row>>> kvIterableForm = kvOrderRows.apply(GroupByKey.<String, Row>create());

Update:

Schema Transform:

    {
      "type" : "record",
      "name" : "Entry",
      "namespace" : "transform",
      "fields" : [  {
        "name" : "customerId",
        "type" : [ "string", "null" ]
      }, {
        "name" : "date",
        "type" : [ "long", "null" ]
      }, {
        "name" : "amount",
        "type" : [ "double", "null" ]
      }]
    }

CSV File

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056

StringToKV1:

    class StringToKV1 extends DoFn<Row, KV<String, Row>> {

        private static final long serialVersionUID = -8093837716944809689L;
        String columnName = null;

        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            context.output(KV.of(row.getValue(columnName), row));
        }

        public void setColumnName(String columnName) {
            this.columnName = columnName;
        }
    }

Code:

    public class GroupByTest {
        public static void main(String[] args) throws IOException {
            System.out.println("We are about to start!!");

            final File schemaFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

            File csvFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
            Schema schema = new Schema.Parser().parse(schemaFile);

            Pipeline pipeline = Pipeline.create();

            // Convert the Avro schema to a Beam schema
            org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

            final PCollectionTuple tuples = pipeline

                    // Match the CSV input file
                    .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                    // Read the matched files
                    .apply("2", FileIO.readMatches())

                    // Validate each record against the schema, convert it to a Row and
                    // emit it to either the valid or the invalid output
                    .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                            TupleTagList.of(invalidTag())));

            // Fetch only the valid rows
            final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

            // Transformation: convert each Row to a KV keyed by customerId
            StringToKV1 stringtoKV1 = new StringToKV1();
            stringtoKV1.setColumnName("customerId");
            PCollection<KV<String, Row>> kvOrderRows = rows
                    .apply(ParDo.of(stringtoKV1))
                    .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

            // Will throw an error: "amount" is a double field, but
            // Sum.ofIntegers() expects an integer field
            // rows.apply(Group.byFieldNames("customerId", "date")
            //         .aggregateField("amount", Sum.ofIntegers(), "totalAmount"));
            System.out.println("Transform name: " + Group.byFieldNames("customerId", "date")
                    .aggregateField("amount", Sum.ofIntegers(), "totalAmount").getName());

            pipeline.run().waitUntilFinish();
            System.out.println("The end");
        }

        private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
            String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
            LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
            if (logicalType != null) {
                type = logicalType.getName();
            }

            switch (type) {
            case "string":
                return row.getString(columnName);
            case "int":
                return Objects.requireNonNull(row.getInt32(columnName)).toString();
            case "long":
                return Objects.requireNonNull(row.getInt64(columnName)).toString();
            case "double":
                return Objects.requireNonNull(row.getDouble(columnName)).toString();
            case "timestamp-millis":
                return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime(columnName)).getMillis()).toString();
            default:
                return row.getString(columnName);
            }
        }
    }

Corrected code (the schema declares amount as a double, so Sum.ofDoubles() is needed instead of Sum.ofIntegers()):

        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofDoubles(), "sumAmount");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(ParDo.of(new RowToString()));
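
where RowToString renders each aggregated Row as a string; a minimal sketch of such a DoFn:

    // Minimal sketch of a Row-to-String DoFn; the actual RowToString
    // may format rows differently.
    class RowToString extends DoFn<Row, String> {
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(ProcessContext context) {
            // Row.toString() prints the schema field values
            context.output(context.element().toString());
        }
    }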

The output that I am getting is:

[screenshot of the actual output]

The expected output:

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277


Answer

Since you already have a PCollection of Rows, you can use schema-aware PTransforms. For your case, you may want to use “Grouping aggregations”, which can look like this:

Group.byFieldNames("customerId", "date")
     .aggregateField("amount", Sum.ofIntegers(), "totalAmount")

It will group the rows by customer id and date, and then compute the total amount for each customer per day. If you want to compute it per month, you need to create a new column (or modify the current one) that contains only the month (for example, the date truncated to its month) and group by id and this column, as sketched below.
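
For example, building on your pipeline, something along these lines could work. This is an illustrative sketch (TruncateDateToMonth is not from your code); it assumes the date field holds epoch millis, as in your Avro schema, and uses Joda-Time, which Beam already depends on:

    // Illustrative sketch: move each row's date to the last day of its month,
    // so grouping by ("customerId", "date") aggregates per customer per month.
    class TruncateDateToMonth extends DoFn<Row, Row> {
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            // Assumes "date" is epoch millis and non-null; add null handling as needed
            long millis = row.getInt64("date");
            long monthEnd = new org.joda.time.DateTime(millis, org.joda.time.DateTimeZone.UTC)
                    .dayOfMonth().withMaximumValue().withTimeAtStartOfDay().getMillis();
            // Rebuild the row with the same schema, keeping the field order
            context.output(Row.withSchema(row.getSchema())
                    .addValues(row.getValue("customerId"), monthEnd, row.getValue("amount"))
                    .build());
        }
    }

    PCollection<Row> monthlyTotals = rows
            .apply(ParDo.of(new TruncateDateToMonth())).setRowSchema(beamSchema)
            .apply(Group.<Row>byFieldNames("customerId", "date")
                    .aggregateField("amount", Sum.ofDoubles(), "sumAmount"));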

Also, it can be useful to apply Select.flattenedSchema to flatten the output schema. The Beam Schema API allows you to work with schema-aware PCollections in a very simple and effective way.
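
For instance, applied to the monthlyTotals from the sketch above (Group nests its output under "key" and "value" sub-rows):

    // Flatten the nested key/value schema produced by Group into a flat
    // Row with customerId, date and sumAmount at the top level.
    PCollection<Row> flattened = monthlyTotals.apply(Select.flattenedSchema());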

Another option could be to implement your GroupBy/Aggregate logic manually with KVs, but it's more complicated and error-prone since it requires much more boilerplate code.
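
For completeness, a rough sketch of that manual route (monthEndOf is a hypothetical helper that maps an epoch-millis date to a month-end string like "11/30/2012"):

    // Rough sketch of the manual KV alternative: key each row by
    // customer + month, then sum the amounts per key.
    PCollection<KV<String, Double>> totals = rows
            .apply(MapElements
                    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.doubles()))
                    .via((Row row) -> KV.of(
                            // monthEndOf is a hypothetical helper, not a Beam API
                            row.getString("customerId") + "|" + monthEndOf(row.getInt64("date")),
                            row.getDouble("amount"))))
            .apply(Sum.doublesPerKey());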
