How can I send a Pub/Sub message manually (that is to say, without using PubsubIO) in Dataflow?
Importing google-cloud-dataflow-java-sdk-all 2.5.0 via Maven already pulls in a version of com.google.pubsub.v1, but I was unable to find an easy way to send messages to a Pub/Sub topic with it (this version does not, for instance, let you work with Publisher instances, which is the approach described in the official documentation).
Advertisement
Answer
Here’s a way I found browsing https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/blob/master/dataflow/src/main/java/com/google/cloud/dataflow/examples/StockInjector.java:
Java
x
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
/**
 * Static helper that publishes string messages to a Cloud Pub/Sub topic through the
 * {@code com.google.api.services.pubsub} REST client, without going through PubsubIO.
 *
 * <p>A single {@link Pubsub} client is created when the class is initialized. If that
 * creation fails the error is logged, the client stays {@code null}, and every call to
 * {@link #publishMessage(String, String)} logs an error and returns without publishing.
 */
public class PubsubManager {

    // NOTE: logger must be declared before pubsub, because createPubsubClient() logs
    // through it while the static fields are being initialized in textual order.
    private static final Logger logger = LoggerFactory.getLogger(PubsubManager.class);
    private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();

    /** Upper bound on how many characters of a message are echoed to the log. */
    private static final int MAX_LOG_MESSAGE_LENGTH = 200;

    /** Shared client; {@code null} when {@link #createPubsubClient()} failed. */
    private static final Pubsub pubsub = createPubsubClient();

    /**
     * Request initializer that installs the credential as the request interceptor and
     * adds retry handling: unsuccessful responses are first offered to the credential
     * and then to an exponential back-off handler, and I/O exceptions are retried with
     * exponential back-off as well.
     */
    public static class RetryHttpInitializerWrapper implements HttpRequestInitializer {

        // Intercepts the request for filling in the "Authorization"
        // header field, as well as recovering from certain unsuccessful
        // error codes wherein the Credential must refresh its token for a
        // retry.
        private final GoogleCredential wrappedCredential;

        // A sleeper; you can replace it with a mock in your test.
        private final Sleeper sleeper;

        /**
         * Creates a wrapper around the given credential using {@code Sleeper.DEFAULT}.
         *
         * @param wrappedCredential credential to install on each request; must not be null
         */
        private RetryHttpInitializerWrapper(GoogleCredential wrappedCredential) {
            this(wrappedCredential, Sleeper.DEFAULT);
        }

        /**
         * Creates a wrapper with an explicit sleeper. Use only for testing.
         *
         * @param wrappedCredential credential to install on each request; must not be null
         * @param sleeper sleeper handed to both back-off handlers
         */
        RetryHttpInitializerWrapper(GoogleCredential wrappedCredential, Sleeper sleeper) {
            this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
            this.sleeper = sleeper;
        }

        /**
         * Configures the interceptor, the unsuccessful-response handler and the
         * I/O-exception handler on the given request.
         *
         * @param request the request about to be executed
         */
        @Override
        public void initialize(HttpRequest request) {
            final HttpUnsuccessfulResponseHandler backoffHandler =
                    new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff())
                            .setSleeper(sleeper);
            request.setInterceptor(wrappedCredential);
            request.setUnsuccessfulResponseHandler(
                    new HttpUnsuccessfulResponseHandler() {
                        @Override
                        public boolean handleResponse(
                                HttpRequest request,
                                HttpResponse response,
                                boolean supportsRetry) throws IOException {
                            if (wrappedCredential.handleResponse(request, response, supportsRetry)) {
                                // If credential decides it can handle it, the
                                // return code or message indicated something
                                // specific to authentication, and no backoff is
                                // desired.
                                return true;
                            }
                            if (backoffHandler.handleResponse(request, response, supportsRetry)) {
                                // Otherwise, we defer to the judgement of our
                                // internal backoff handler.
                                logger.info("Retrying {}", request.getUrl());
                                return true;
                            }
                            return false;
                        }
                    });
            // The I/O-exception handler gets its own ExponentialBackOff instance,
            // separate from the one used for unsuccessful responses.
            request.setIOExceptionHandler(
                    new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
                            .setSleeper(sleeper));
        }
    }

    /**
     * Creates a Cloud Pub/Sub client using a trusted transport and the application
     * default credential, with {@link RetryHttpInitializerWrapper} as request initializer.
     *
     * @return the client, or {@code null} if the transport or the credential could not
     *     be obtained (the failure is logged with its stack trace)
     */
    private static Pubsub createPubsubClient() {
        try {
            HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
            GoogleCredential credential = GoogleCredential.getApplicationDefault();
            HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential);
            return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
        } catch (IOException | GeneralSecurityException e) {
            // Trailing throwable argument keeps the stack trace; the rendered message
            // text is the same as the previous string concatenation.
            logger.error("Could not create Pubsub client: {}", e.toString(), e);
        }
        return null;
    }

    /**
     * Publishes the given message to a Cloud Pub/Sub topic.
     *
     * <p>The message is encoded as UTF-8. Failures are logged rather than thrown: an
     * {@link IOException} from the publish call is logged, and if the shared client
     * could not be created the message is dropped with an error log entry.
     *
     * @param message text to publish; must not be null
     * @param outputTopic topic argument passed unchanged to the client's publish call
     */
    public static void publishMessage(String message, String outputTopic) {
        int previewLength = Math.min(message.length(), MAX_LOG_MESSAGE_LENGTH);
        logger.info("Received ....{}", message.substring(0, previewLength));

        if (pubsub == null) {
            // Client creation failed during class initialization; without this guard
            // the publish call below would throw a NullPointerException.
            logger.error("Pubsub client is not available; message was not published");
            return;
        }

        // Publish message to Pubsub.
        PubsubMessage pubsubMessage = new PubsubMessage();
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        pubsubMessage.encodeData(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        PublishRequest publishRequest = new PublishRequest();
        publishRequest.setMessages(Collections.singletonList(pubsubMessage));
        try {
            pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
        } catch (java.io.IOException e) {
            logger.error("Stuff happened in pubsub: {}", e.toString(), e);
        }
    }
}