I have a requirement to trigger a Cloud Dataflow pipeline from Cloud Functions, and the Cloud Function must be written in Java. The trigger for the Cloud Function is Google Cloud Storage's finalize/create event, i.e., when a file is uploaded to a GCS bucket, the Cloud Function must trigger the Dataflow pipeline.
When I create a Dataflow (batch) pipeline and execute it, it creates a Dataflow pipeline template and a Dataflow job.
But when I create the Cloud Function in Java and upload a file, the function's status just says "ok", and it does not trigger the Dataflow pipeline.
Cloud function
package com.example;

import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<GCSEvent> {

    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Event: " + context.eventId());
        logger.info("Event Type: " + context.eventType());

        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("Google Dataflow function Demo")
                .build();

        String projectId = "my-project-id";

        RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
        runtimeEnvironment.setBypassTempDirValidation(false);
        runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");

        CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
        createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
        createJobFromTemplateRequest.setLocation("us-central1");
        createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
        createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
        createJobFromTemplateRequest.setParameters(new HashMap<String, String>());
        createJobFromTemplateRequest.getParameters().put("inputFile", "gs://cloud-dataflow-bucket-input/*.txt");

        dataflowService.projects().templates().create(projectId, createJobFromTemplateRequest);

        throw new UnsupportedOperationException("Not supported yet.");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}
pom.xml(cloud function)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cloudfunctions</groupId> <artifactId>http-function</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.target>11</maven.compiler.target> <maven.compiler.source>11</maven.compiler.source> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials --> <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-credentials</artifactId> <version>0.21.1</version> </dependency> <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-dataflow</artifactId> <version>v1b3-rev207-1.20.0</version> </dependency> <dependency> <groupId>com.google.cloud.functions</groupId> <artifactId>functions-framework-api</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-oauth2-http</artifactId> <version>0.21.1</version> </dependency> </dependencies> <!-- Required for Java 11 functions in the inline editor --> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <excludes> <exclude>.google/</exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
cloud function logs
I went through the blogs below (added for reference), which trigger Dataflow from Cloud Storage via a Cloud Function. However, their code is written in Node.js or Python, whereas my Cloud Function must be written in Java.
Triggering a Dataflow pipeline via Cloud Functions in Node.js
https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions
Triggering a Dataflow pipeline via Cloud Functions using Python
https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e
Any help on this is very much appreciated.
Answer
RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");

LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());

Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);

writer.write("4");

Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates()
        .launch(projectId, launchTemplateParameters);
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();
The above code launches a template and executes the Dataflow pipeline:
- using application default credentials (which can be changed to user or service account credentials);
- in the default region (which can be changed);
- creating a job for every HTTP trigger (the trigger can be changed).
The complete code can be found below:
https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java
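For reference, here is a minimal, self-contained sketch of a GCS-triggered background function that launches the public Word_Count template using the same client calls as the snippet above. The project ID, temp location, job name, and output path are placeholders/assumptions to adapt; the important difference from the question's code is that the launch request is actually executed (in the question, the templates().create(...) request object is built but never run, and the function then throws UnsupportedOperationException).

package com.example;

import com.example.LaunchDataflowFunction.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class LaunchDataflowFunction implements BackgroundFunction<GCSEvent> {

    private static final Logger logger = Logger.getLogger(LaunchDataflowFunction.class.getName());

    // Placeholders: replace with your own project, temp bucket, and template path.
    private static final String PROJECT_ID = "my-project-id";
    private static final String TEMP_LOCATION = "gs://my-dataflow-job-bucket/tmp";
    private static final String TEMPLATE_PATH = "gs://dataflow-templates-us-central1/latest/Word_Count";

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Triggered by object " + event.name + " in bucket " + event.bucket);

        // Build an authenticated Dataflow client using application default credentials.
        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);
        Dataflow dataflow = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("GCS-triggered Dataflow launcher")
                .build();

        // Runtime settings for the launched job.
        RuntimeEnvironment environment = new RuntimeEnvironment();
        environment.setTempLocation(TEMP_LOCATION);

        // Word_Count template parameters: read the uploaded file, write counts back to the bucket.
        Map<String, String> parameters = new HashMap<>();
        parameters.put("inputFile", "gs://" + event.bucket + "/" + event.name);
        parameters.put("output", "gs://" + event.bucket + "/output/counts");

        LaunchTemplateParameters launchParameters = new LaunchTemplateParameters();
        launchParameters.setJobName("gcs-triggered-job-" + new Date().getTime());
        launchParameters.setEnvironment(environment);
        launchParameters.setParameters(parameters);

        // Build the launch request and execute it; without execute() nothing is submitted.
        dataflow.projects().templates()
                .launch(PROJECT_ID, launchParameters)
                .setGcsPath(TEMPLATE_PATH)
                .execute();

        logger.info("Dataflow template launch request submitted.");
    }

    // Minimal payload for the google.storage.object.finalize event.
    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}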