Skip to content
Advertisement

Sending PubSub message manually in Dataflow

How can I send a PubSub message manually (that is to say, without using a PubsubIO) in Dataflow ?

Importing (via Maven) google-cloud-dataflow-java-sdk-all 2.5.0 already imports a version of com.google.pubsub.v1 for which I was unable to find an easy way to send messages to a Pubsub topic (this version doesn’t, for instance, allow to manipulate Publisher instances, which is the way described in the official documentation).

Advertisement

Answer

Here’s a way I found browsing https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/blob/master/dataflow/src/main/java/com/google/cloud/dataflow/examples/StockInjector.java:

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
public class PubsubManager {
    private static final Logger logger = LoggerFactory.getLogger(PubsubManager.class);
    private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
    private static final Pubsub pubsub = createPubsubClient();

    public static class RetryHttpInitializerWrapper implements HttpRequestInitializer {

        // Intercepts the request for filling in the "Authorization"
        // header field, as well as recovering from certain unsuccessful
        // error codes wherein the Credential must refresh its token for a
        // retry.
        private final GoogleCredential wrappedCredential;

        // A sleeper; you can replace it with a mock in your test.
        private final Sleeper sleeper;

        private RetryHttpInitializerWrapper(GoogleCredential wrappedCredential) {
            this(wrappedCredential, Sleeper.DEFAULT);
        }

        // Use only for testing.
        RetryHttpInitializerWrapper(
                GoogleCredential wrappedCredential, Sleeper sleeper) {
            this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
            this.sleeper = sleeper;
        }

        @Override
        public void initialize(HttpRequest request) {
            final HttpUnsuccessfulResponseHandler backoffHandler =
                    new HttpBackOffUnsuccessfulResponseHandler(
                            new ExponentialBackOff())
                            .setSleeper(sleeper);
            request.setInterceptor(wrappedCredential);
            request.setUnsuccessfulResponseHandler(
                    new HttpUnsuccessfulResponseHandler() {
                        @Override
                        public boolean handleResponse(HttpRequest request,
                                                      HttpResponse response,
                                                      boolean supportsRetry)
                                throws IOException {
                            if (wrappedCredential.handleResponse(request,
                                    response,
                                    supportsRetry)) {
                                // If credential decides it can handle it, the
                                // return code or message indicated something
                                // specific to authentication, and no backoff is
                                // desired.
                                return true;
                            } else if (backoffHandler.handleResponse(request,
                                    response,
                                    supportsRetry)) {
                                // Otherwise, we defer to the judgement of our
                                // internal backoff handler.
                                logger.info("Retrying " + request.getUrl());
                                return true;
                            } else {
                                return false;
                            }
                        }
                    });
            request.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(
                    new ExponentialBackOff()).setSleeper(sleeper));
        }
    }

    /**
     * Creates a Cloud Pub/Sub client.
     */
    private static Pubsub createPubsubClient() {
        try {
            HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
            GoogleCredential credential = GoogleCredential.getApplicationDefault();
            HttpRequestInitializer initializer =
                    new RetryHttpInitializerWrapper(credential);
            return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
        } catch (IOException | GeneralSecurityException e) {
            logger.error("Could not create Pubsub client: " + e);
        }
        return null;
    }

    /**
     * Publishes the given message to a Cloud Pub/Sub topic.
     */
    public static void publishMessage(String message, String outputTopic) {
        int maxLogMessageLength = 200;
        if (message.length() < maxLogMessageLength) {
            maxLogMessageLength = message.length();
        }
        logger.info("Received ...." + message.substring(0, maxLogMessageLength));

        // Publish message to Pubsub.
        PubsubMessage pubsubMessage = new PubsubMessage();
        pubsubMessage.encodeData(message.getBytes());

        PublishRequest publishRequest = new PublishRequest();
        publishRequest.setMessages(Collections.singletonList(pubsubMessage));
        try {
            pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
        } catch (java.io.IOException e) {
            logger.error("Stuff happened in pubsub: " + e);
        }
    }
}
User contributions licensed under: CC BY-SA
8 People found this is helpful
Advertisement