i.grpc.internal.AbstractClientStream – Received data on closed stream meaning

I have a Spring Boot application (v2.2.10.RELEASE) that subscribes to multiple Pub/Sub topics, pulls messages asynchronously, and forwards them elsewhere. I am not using Spring Cloud GCP, just the native Google client libraries.

This is my subscriber setup:

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
            (PubsubMessage message, AckReplyConsumer consumer) -> {
                messages.add(message);
                consumer.ack();
            };

    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
                                    .setParallelPullCount(2)
                                    .setFlowControlSettings(flowControlSettings)
                                    .setCredentialsProvider(credentialsProvider)
                                    .setExecutorProvider(executorProvider)
                                    //.setChannelProvider()
                                    .build();

With high traffic and large messages (2–4 KB), I see this INFO message:

[grpc-default-worker-ELG-1-1] INFO  i.grpc.internal.AbstractClientStream - Received data on closed stream

First of all, I don't fully understand what this means. All I noticed is that when it happens, the number of duplicate deliveries increases. So I assumed it means that Pub/Sub tried to reach the subscriber with some messages, but the subscriber was not ready for some reason, so Pub/Sub will try to deliver those messages again, hence more duplicates. Is that right?

Would this problem be solved by setting a TransportChannelProvider on the subscribers? My understanding of the poorly written documentation is that this will create a new channel for delivery when the channel currently in use is closed, and so get rid of the log message above.

If yes, how do I define the channel target string, and where can I find a NameResolver-compliant URI for the ManagedChannel? The snippet I mean is this:

    private TransportChannelProvider getChannelProvider() {
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext(true).build();
        return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
    }

I am pretty new to GCP, so sorry if my question is not coherent enough.

Answer

Using a custom TransportChannelProvider won’t solve this type of issue. This is more likely an issue deeper down in the stack, e.g., at the gRPC level. There have been some open issues for this type of error [1, 2].

As for why it causes duplicates: the messages may have been delivered on a stream that was already closed (which matches the log message) because they were held in a lower-level buffer in the gRPC layer. They would then be duplicates of messages that were subsequently delivered and processed via another stream. This could be a variant of the issue described in the documentation on large backlogs of small messages. A fix for that issue shipped in v1.109.0 of the Java client library, so if you are on an older version, it is worth updating.

If duplicates continue to be an issue, it would be best to reach out to support with the name of your subscription and the message IDs of some of the duplicate messages so that they can look at the delivery patterns for those messages and further diagnose if these redeliveries are unexpected.
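
To collect those message IDs, one option is to count deliveries per ID inside the receiver. The following is only a minimal sketch: `DuplicateTracker` is a hypothetical helper, not part of the Pub/Sub client library, and it assumes the IDs come from `message.getMessageId()`. It keeps every ID in memory, so it suits a short diagnostic run rather than permanent use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (an assumption for illustration, not a library class).
// Counts deliveries per message ID so that redelivered IDs can be reported.
class DuplicateTracker {
    // Message ID -> number of times it was delivered to this process.
    // ConcurrentHashMap because the receiver may run on several threads.
    private final Map<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();

    // Records one delivery; returns true if this ID was already seen.
    boolean record(String messageId) {
        int count = deliveries
                .computeIfAbsent(messageId, id -> new AtomicInteger())
                .incrementAndGet();
        return count > 1;
    }

    // Returns the IDs that were delivered more than once.
    List<String> duplicateIds() {
        List<String> ids = new ArrayList<>();
        deliveries.forEach((id, count) -> {
            if (count.get() > 1) {
                ids.add(id);
            }
        });
        return ids;
    }
}
```

In the receiver from the question, this would amount to calling `tracker.record(message.getMessageId())` before `consumer.ack()` and logging the ID whenever it returns true. Note that it only sees duplicates within a single process, so redeliveries to another instance of the application would need a shared store.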
