I need to append a new row to the following CSV.
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Expected output:
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
01 | 30/07/2021 | 999 |
Java code:
public static void main(String[] args) throws IOException {
    final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");
    File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");
    Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();

    // Convert the Avro schema to a Beam schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline
        // Match the CSV input file
        .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
        // Read the matched files
        .apply("2", FileIO.readMatches())
        // Validate each record against the schema, convert it to a Row,
        // and emit valid and invalid records to separate output tags
        .apply("3", ParDo.of(new FileReader(beamSchema))
            .withOutputTags(FileReader.validTag(), TupleTagList.of(invalidTag())));

    // Keep only the valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag())
        .setCoder(RowCoder.of(beamSchema));

    RowAddition rowAddition = new RowAddition();
    final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition))
        .setCoder(RowCoder.of(beamSchema));
How do I combine these two PCollection objects?
    PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
    pOutput.apply(TextIO.write()
        .to("src/main/resources/addRow/rowOutput")
        .withNumShards(1)
        .withSuffix(".csv"));

    pipeline.run().waitUntilFinish();
    System.out.println("The end");
    }
}
Logic for adding rows:
class RowAddition extends DoFn<Row, Row> {
    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema = null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser()
                .parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Row row = context.element();
        Row newRow = row.withSchema(beamSchema).addValues("01", "30/7/2021", 999.0).build();
        context.output(newRow);
    }
}
I have been referring to this link.
Answer
You’re looking for the Flatten transform. It takes any number of existing PCollections and produces a new PCollection containing the union of their elements. For completely new elements, you could use Create, or another PTransform that computes the new elements from the old ones.
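For the pipeline in the question, one way to do this might look like the sketch below. It assumes the `pipeline`, `beamSchema`, and `rows` objects already built in your `main` method; the helper name `appendExtraRow` and the row values passed to `addValues` are only illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

class AppendRowExample {

    // Appends one hand-built row to the rows read from the CSV.
    // `pipeline`, `beamSchema` and `rows` are assumed to be the objects already
    // created in the question's main method; the row values are illustrative.
    static PCollection<Row> appendExtraRow(Pipeline pipeline,
                                           Schema beamSchema,
                                           PCollection<Row> rows) {
        // Build the single extra row with Create instead of inside a DoFn,
        // so it is produced exactly once rather than once per input element.
        Row extraRow = Row.withSchema(beamSchema)
            .addValues("01", "30/07/2021", 999.0)
            .build();

        PCollection<Row> extraRows = pipeline.apply("CreateExtraRow",
            Create.of(extraRow).withCoder(RowCoder.of(beamSchema)));

        // Flatten unions any number of PCollections with the same type/schema.
        return PCollectionList.of(rows).and(extraRows)
            .apply("AppendRow", Flatten.pCollections());
    }
}
```

The returned PCollection would then take the place of `newlyAddedRows` before the `RowToString`/`TextIO.write()` steps, so the output CSV contains the original six rows plus the appended one.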