Context
I am writing a Netty 4 application in which processing each message might take some time. To illustrate, I have created an EchoHandler that responds with the original message, though sometimes after a short delay:
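    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    import java.util.concurrent.TimeUnit;

    public class EchoHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            var message = (String) msg;
            if (message.equals("delay")) {
                // Respond 400 ms later, on the channel's own executor
                ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
            } else {
                // Respond immediately
                ctx.writeAndFlush(message);
            }
        }
    }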
Problem
Using this code, the order of the responses is not guaranteed to be the same as the order of the incoming requests. In fact, if the first message consists of the string “delay” and the second of some other string, the order of the responses will be reversed! I have written a test to illustrate this:
    import io.netty.channel.embedded.EmbeddedChannel;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;

    public class Tests {
        @Test
        public void test() throws InterruptedException {
            var channel = new EmbeddedChannel(new EchoHandler());
            channel.writeInbound("delay", "second message");

            // Let some time pass and process any scheduled tasks
            Thread.sleep(500);
            channel.runPendingTasks();

            // The response to the last message comes first!
            var firstMessage = (String) channel.readOutbound();
            Assertions.assertEquals("second message", firstMessage);

            // The response to the first message comes second!
            var secondMessage = (String) channel.readOutbound();
            Assertions.assertEquals("delay", secondMessage);
        }
    }
Question
I am looking for a built-in way in Netty to guarantee the order of the outgoing responses matches the order of the incoming messages. Using an imaginary extension of the library, I would rewrite the handler as follows:
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        var message = (String) msg;
        ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
        if (message.equals("delay")) {
            ctx.executor().schedule(() -> {
                ctx.writeAndFlush(message);
                ctx.resumeMessageProcessing(); // After flushing we can resume message processing
            }, 400, TimeUnit.MILLISECONDS);
        } else {
            ctx.writeAndFlush(message);
            ctx.resumeMessageProcessing(); // After flushing we can resume message processing
        }
    }
Does Netty provide something like this out of the box? It sounds like a common use case, and I’d much rather rely on battle-tested code than write my own queuing implementation.
Answer
Though not built in, a seemingly idiomatic solution to this problem is a custom MessageToMessageCodec. Based on the example, which uses String as the message type, I wrote the following codec, which handles the queuing for you:
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    public class QueuingCodec extends MessageToMessageCodec<String, String> {
        private final Queue<String> messageQueue = new ArrayDeque<>();
        private boolean processingMessage = false;

        @Override
        protected void encode(ChannelHandlerContext ctx, String s, List<Object> list) throws Exception {
            // Pass the message on to output
            list.add(s);
            // Allow processing more messages
            processingMessage = false;
            // Send the next message in the queue if available
            if (!messageQueue.isEmpty()) {
                processingMessage = true;
                var message = messageQueue.poll();
                // Pass the message on to input
                ctx.executor().execute(() -> ctx.fireChannelRead(message));
            }
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, String s, List<Object> list) throws Exception {
            if (processingMessage) {
                // Store message for later processing
                messageQueue.add(s);
            } else {
                // Pass data on to input
                list.add(s);
                // Prevent processing of new messages
                processingMessage = true;
            }
        }
    }
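To see the gating logic of the codec in isolation, here is a minimal Netty-free sketch. It is only a model, not part of Netty or of the codec above: OrderedGate, OrderedGateDemo, and run are hypothetical names, a single-threaded ScheduledExecutorService stands in for the channel's event loop, and the delay is shortened to 50 ms. The inbound side mirrors decode (forward if idle, otherwise queue) and the release callback mirrors encode (a response went out, so hand over the next queued message).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Netty-free model of the idea: at most one message is in flight at a time. */
class OrderedGateDemo {

    /** Hypothetical stand-in for the codec's decode/encode pair. */
    static final class OrderedGate {
        private final Queue<String> pending = new ArrayDeque<>();
        private final BiConsumer<String, Runnable> handler;
        private boolean busy = false;

        OrderedGate(BiConsumer<String, Runnable> handler) {
            this.handler = handler;
        }

        /** Inbound side (like decode): forward if idle, otherwise park the message. */
        void onInbound(String msg) {
            if (busy) {
                pending.add(msg);
            } else {
                busy = true;
                handler.accept(msg, this::onResponseWritten);
            }
        }

        /** Outbound side (like encode): a response went out, release the next message. */
        private void onResponseWritten() {
            busy = false;
            String next = pending.poll();
            if (next != null) {
                busy = true;
                handler.accept(next, this::onResponseWritten);
            }
        }
    }

    /** Feeds the messages through a gated echo handler and returns the responses in output order. */
    static List<String> run(List<String> inbound) {
        // Single thread plays the role of the event loop; all gate state lives on it.
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        List<String> outbound = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(inbound.size());

        // Echo handler: answers "delay" after 50 ms, everything else immediately.
        BiConsumer<String, Runnable> echo = (msg, release) -> {
            Runnable respond = () -> {
                outbound.add(msg);
                done.countDown();
                release.run();
            };
            if (msg.equals("delay")) {
                loop.schedule(respond, 50, TimeUnit.MILLISECONDS);
            } else {
                respond.run();
            }
        };

        OrderedGate gate = new OrderedGate(echo);
        try {
            for (String msg : inbound) {
                loop.execute(() -> gate.onInbound(msg));
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for responses");
            }
            return List.copyOf(outbound);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("delay", "second message")));
        // prints "[delay, second message]"
    }
}
```

In an actual pipeline, the codec would presumably have to be added before the EchoHandler, for example `new EmbeddedChannel(new QueuingCodec(), new EchoHandler())`, so that inbound messages reach decode first and the handler's writes pass back through encode. Note also that this model releases the next message by a direct call, whereas the codec above defers it with ctx.executor().execute(...).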