
How to append new rows or perform a union on two PCollections

I need to append a new row to the following CSV.

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600

Expected output:

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600
01 30/07/2021 999

Java code:

    public static void main(String[] args) throws IOException {
        final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");

        File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");

        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Open each matched file for reading
                .apply("2", FileIO.readMatches())

                // Parse each file against the schema, convert its records to Rows,
                // and split them into valid and invalid outputs
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Keep only the valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        RowAddition rowAddition = new RowAddition();
        final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));

How can I combine these two PCollections, rows and newlyAddedRows, so that the output file contains both?

        PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
        pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }
}

Logic for adding rows

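import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
// In newer Beam releases this class lives in org.apache.beam.sdk.extensions.avro.schemas.utils
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

class RowAddition extends DoFn<Row, Row> {

    private static final long serialVersionUID = -8093837716944809689L;

    // Parsed once per DoFn instance in @Setup instead of once per element
    private transient org.apache.beam.sdk.schemas.Schema beamSchema;

    @Setup
    public void setup() throws IOException {
        beamSchema = AvroUtils.toBeamSchema(
                new Schema.Parser().parse(new File("src/main/resources/addRow/schema_transform.avsc")));
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Row.withSchema is a static factory, so the incoming element is not used.
        // This method runs once per input element, so it emits one new row for every input row.
        Row newRow = Row.withSchema(beamSchema).addValues("01", "30/07/2021", 999.0).build();
        context.output(newRow);
    }
}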
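Because RowAddition is applied with ParDo, processElement runs once per input row, so the six CSV rows yield six identical new rows rather than a single one. If the new row should be derived from the existing data, one option is to reduce the input first. The sketch below assumes a hypothetical schema (String ID, String date, double balance; the real one comes from schema_transform.avsc) and hypothetical names NewRowPerIdExample, newRowPerId and row. It uses Distinct so that exactly one new row is produced per distinct ID. It needs beam-sdks-java-core and a runner such as the Direct Runner on the classpath. The result still has to be merged with the original rows before writing.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class NewRowPerIdExample {

    // Hypothetical schema mirroring the CSV header; the real one comes from schema_transform.avsc
    static final Schema SCHEMA = Schema.builder()
            .addStringField("ID")
            .addStringField("date")
            .addDoubleField("balance")
            .build();

    static Row row(String id, String date, double balance) {
        return Row.withSchema(SCHEMA).addValues(id, date, balance).build();
    }

    // Derives the new rows from the existing ones: one row per distinct ID,
    // no matter how many input rows share that ID
    static PCollection<Row> newRowPerId(PCollection<Row> rows, String date, double balance) {
        return rows
                .apply("ExtractId", MapElements.into(TypeDescriptors.strings())
                        .via((Row r) -> r.getString("ID")))
                .apply("DistinctIds", Distinct.<String>create())
                .apply("BuildNewRow", MapElements.into(TypeDescriptor.of(Row.class))
                        .via((String id) -> row(id, date, balance)))
                .setRowSchema(SCHEMA);
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<Row> rows = p.apply("ExistingRows", Create.of(
                        row("01", "31/05/2021", 500.0),
                        row("01", "30/06/2021", 600.0))
                .withRowSchema(SCHEMA));

        // Both input rows share ID "01", so a single new row is produced
        newRowPerId(rows, "30/07/2021", 999.0)
                .apply("Print", MapElements.into(TypeDescriptors.strings())
                        .via((Row r) -> {
                            String line = r.getString("ID") + "," + r.getString("date") + "," + r.getDouble("balance");
                            System.out.println(line);
                            return line;
                        }));

        p.run().waitUntilFinish();
    }
}
```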

I have been referring to this section of the Beam documentation:

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#:~:text=Merging%20PCollections,-Often%2C%20after%20you&text=You%20can%20do%20so%20by,join%20between%20two%20PCollection%20s.


Answer

You’re looking for the Flatten transform. It takes any number of PCollections with the same element type and produces a new PCollection containing the union of all their elements. To introduce completely new elements, you can use Create, or another PTransform that computes the new elements from the existing ones.
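A minimal sketch of that approach, assuming a hypothetical schema (String ID, String date, double balance; the real one comes from schema_transform.avsc) and hypothetical names AppendRowExample, appendRow and row. The extra row is turned into a one-element PCollection with Create, and the two collections are merged with PCollectionList and Flatten.pCollections(). It needs beam-sdks-java-core and a runner such as the Direct Runner on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class AppendRowExample {

    // Hypothetical schema mirroring the CSV header; the real one comes from schema_transform.avsc
    static final Schema SCHEMA = Schema.builder()
            .addStringField("ID")
            .addStringField("date")
            .addDoubleField("balance")
            .build();

    static Row row(String id, String date, double balance) {
        return Row.withSchema(SCHEMA).addValues(id, date, balance).build();
    }

    // Creates a one-element PCollection for the extra row and unions it with the existing rows
    static PCollection<Row> appendRow(Pipeline p, PCollection<Row> existing, Row extra) {
        PCollection<Row> extraRows =
                p.apply("CreateExtraRow", Create.of(extra).withRowSchema(SCHEMA));
        return PCollectionList.of(existing)
                .and(extraRows)
                .apply("UnionRows", Flatten.<Row>pCollections());
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<Row> existing = p.apply("ExistingRows", Create.of(
                        row("01", "31/01/2021", 100.0),
                        row("01", "28/02/2021", 200.0),
                        row("01", "31/03/2021", 200.0),
                        row("01", "30/04/2021", 200.0),
                        row("01", "31/05/2021", 500.0),
                        row("01", "30/06/2021", 600.0))
                .withRowSchema(SCHEMA));

        PCollection<Row> all = appendRow(p, existing, row("01", "30/07/2021", 999.0));

        // Print each row as a CSV line; element order is not guaranteed
        all.apply("Print", MapElements.into(TypeDescriptors.strings())
                .via((Row r) -> {
                    String line = r.getString("ID") + "," + r.getString("date") + "," + r.getDouble("balance");
                    System.out.println(line);
                    return line;
                }));

        p.run().waitUntilFinish();
    }
}
```

In the question’s pipeline, the same idea would mean flattening rows with a single-row collection built by Create (instead of newlyAddedRows, which holds one copy per input row) and passing the flattened result to RowToString. Flatten requires every input to have the same element type; here both inputs are PCollection<Row> with the same schema.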
