
RabbitMQ Streams

Using this blog post as a reference (https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application), I have created a stream in RabbitMQ and added 1 million messages to it:

    try (Environment environment = Environment.builder()
            .uri("rabbitmq-stream://guest:guest@localhost:5552")
            .build()) {

        environment.streamCreator().stream("tenth-application-stream").create();
        Producer producer = environment.producerBuilder()
                .stream("tenth-application-stream")
                .build();

        int messageCount = 1_000_000;

        CountDownLatch confirmLatch = new CountDownLatch(messageCount);
        IntStream.range(0, messageCount).forEach(i -> {
            Message message = producer.messageBuilder()
                    .properties().creationTime(System.currentTimeMillis()).messageId(i)
                    .messageBuilder()
                    .properties().groupId(String.valueOf(i))
                    .messageBuilder()
                    .addData(String.valueOf(i).getBytes(StandardCharsets.UTF_8))
                    .build();
            producer.send(message, confirmationStatus -> confirmLatch.countDown());
        });
        try {
            boolean done = confirmLatch.await(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } catch (StreamException e) {
        System.out.println(e);
    }

I’m trying to read back all of the messages in the stream by doing the following:

    Consumer consumer = environment.consumerBuilder().stream("tenth-application-stream")
            .offset(OffsetSpecification.first()).messageHandler((context, message) -> {
                System.out.println(message.getBody().toString());

            }).build();

However, this only prints up to message 499. How do I see all of the messages in the stream?

Answer

Since your client read messages up to 499, we know that at least some messages were published by the producer. Have you verified that the stream does in fact hold more messages than that? You can check this in the RabbitMQ management UI. A nice feature of stream queues is that messages can remain in the stream after consumers have read them.

If the rest of the messages were published, then it is likely that the consumer closed its connection before all messages were consumed. The example from the blog (repo here) uses the Utils.waitAtMost method to delay closing the connection:

Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);

This waits until either 60 seconds have elapsed or 1M messages have been consumed. Alternatively, you could use a CountDownLatch, as your producer code already does, to keep the connection open until the goal is reached.
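To make the "wait until the count is reached or the timeout expires" idea concrete, here is a minimal sketch of such a helper. I have not copied the blog's Utils.waitAtMost; the method below is a hypothetical stand-in with the same intent, and the background thread only simulates message delivery (in a real consumer the increment would sit inside the messageHandler lambda).

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

class WaitAtMostSketch {

    // Hypothetical stand-in for the blog's Utils.waitAtMost (not its real code):
    // polls the condition until it holds or the timeout expires.
    // Returns true if the condition was met, false on timeout or interruption.
    public static boolean waitAtMost(Duration timeout, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition became true
            }
            try {
                Thread.sleep(10); // short pause between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicLong messageConsumed = new AtomicLong();
        int expected = 1_000;

        // Simulated delivery thread (assumption: stands in for the stream client).
        Thread delivery = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                messageConsumed.incrementAndGet();
            }
        });
        delivery.start();

        // Block the main thread so the consumer is not closed too early.
        boolean done = waitAtMost(Duration.ofSeconds(5),
                () -> messageConsumed.get() >= expected);
        System.out.println("all consumed: " + done + ", count: " + messageConsumed.get());
    }
}
```

The key point is that the main thread blocks on this call before the consumer or environment is closed, so the handler keeps receiving messages in the meantime.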

Neither of these options seems ideal for a production use case, but both work well for a proof of concept.
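As a proof-of-concept sketch of the CountDownLatch variant: the latch is created with the expected message count, the handler counts it down once per message, and the main thread awaits it with a timeout. The consumeAll method and the executor are my own illustration, not part of the RabbitMQ client; the executor only simulates the thread that delivers messages. With the real client, the countDown() call would go inside your messageHandler lambda and the await would come after build().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

class LatchKeepAliveSketch {

    // Hypothetical helper: delivers `count` simulated messages on a background
    // thread, then waits up to `timeout` for the handler to process all of them.
    // Returns true if every message was handled before the timeout.
    public static boolean consumeAll(int count, long timeout, TimeUnit unit, IntConsumer handler) {
        CountDownLatch consumeLatch = new CountDownLatch(count);
        ExecutorService delivery = Executors.newSingleThreadExecutor();
        try {
            delivery.submit(() -> {
                for (int i = 0; i < count; i++) {
                    handler.accept(i);        // what the message handler would do
                    consumeLatch.countDown(); // one count per handled message
                }
            });
            // Keep the caller (and therefore the connection) alive until done.
            return consumeLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            delivery.shutdownNow(); // analogous to closing the consumer afterwards
        }
    }

    public static void main(String[] args) {
        boolean done = consumeAll(1_000, 10, TimeUnit.SECONDS, i -> { });
        System.out.println("all consumed: " + done);
    }
}
```

Unlike polling, await returns as soon as the last message is counted, and its boolean result tells you whether the goal was reached or the timeout hit first.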
