Make order of responses match order of requests in Netty

Context

I am writing a Netty application (Netty 4) in which the processing of each message might take some time. As an example of what I mean I have created an EchoHandler that responds with the original message, though sometimes after a short delay:

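import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.TimeUnit;

public class EchoHandler extends ChannelInboundHandlerAdapter {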
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    var message = (String) msg;

    if (message.equals("delay")) {
      ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
    } else {
      ctx.writeAndFlush(message);
    }
  }
}

Problem

With this code, the order of the responses is not guaranteed to match the order of the incoming requests. In fact, if the first message is the string “delay” and the second is any other string, the responses come back in reverse order! I have written a test to illustrate this:

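import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class Tests {
  @Test
  public void test() throws InterruptedException {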
    var channel = new EmbeddedChannel(new EchoHandler());
    channel.writeInbound("delay", "second message");

    // Let some time pass and process any scheduled tasks
    Thread.sleep(500);
    channel.runPendingTasks();

    // The response to the last message comes first!
    var firstMessage = (String) channel.readOutbound();
    Assertions.assertEquals("second message", firstMessage);

    // The response to the first message comes second!
    var secondMessage = (String) channel.readOutbound();
    Assertions.assertEquals("delay", secondMessage);
  }
}

Question

I am looking for a built-in way in Netty to guarantee that the order of the outgoing responses matches the order of the incoming messages. Using an imaginary extension of the library, I would rewrite the handler as follows:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  var message = (String) msg;

  ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
  
  if (message.equals("delay")) {
    ctx.executor().schedule(() -> {
      ctx.writeAndFlush(message);
      ctx.resumeMessageProcessing(); // After flushing we can resume message processing
    }, 400, TimeUnit.MILLISECONDS);
  } else {
    ctx.writeAndFlush(message);
    ctx.resumeMessageProcessing(); // After flushing we can resume message processing
  }
}

Does Netty provide something like this out of the box? It sounds like a common use case, and I’d much rather rely on battle-tested code than write my own queuing implementation.

Answer

Netty does not provide this out of the box, but a seemingly idiomatic solution is a custom MessageToMessageCodec. Based on the example, which uses String as the message type, I wrote the following codec, which handles the queuing for you:

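import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class QueuingCodec extends MessageToMessageCodec<String, String> {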

  private final Queue<String> messageQueue = new ArrayDeque<>();
  private boolean processingMessage = false;

  @Override
  protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    // Pass the message on to output
    list.add(s);

    // Allow processing more messages
    processingMessage = false;

    // Send the next message in the queue if available
    if (!messageQueue.isEmpty()) {
      processingMessage = true;
      var message = messageQueue.poll();

      // Pass the message on to input
      ctx.executor().execute(() -> {
        ctx.fireChannelRead(message);
      });
    }
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    if (processingMessage) {
      // Store message for later processing
      messageQueue.add(s);
    } else {
      // Pass data on to input
      list.add(s);

      // Prevent processing of new messages
      processingMessage = true;
    }
  }
}
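
To see why this restores the ordering, it can help to look at the gate logic on its own. The following is a minimal, Netty-free sketch of the same state machine, not the codec itself: the names OrderingGate, OrderingGateDemo, onRequest and onResponse are hypothetical, a plain queue of Runnables stands in for the event loop's delayed tasks, and the next request is released by a direct call rather than via ctx.executor().execute(...).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, Netty-free model of the gate inside QueuingCodec.
final class OrderingGate {
  private final Queue<String> pending = new ArrayDeque<>();
  private final Consumer<String> handler; // stands in for the next inbound handler
  private boolean busy = false;

  OrderingGate(Consumer<String> handler) {
    this.handler = handler;
  }

  // Mirrors decode(): forward the request if idle, otherwise queue it.
  void onRequest(String request) {
    if (busy) {
      pending.add(request);
    } else {
      busy = true;
      handler.accept(request);
    }
  }

  // Mirrors encode(): a response went out, so release the next queued request.
  void onResponse() {
    busy = false;
    String next = pending.poll();
    if (next != null) {
      busy = true;
      handler.accept(next);
    }
  }
}

final class OrderingGateDemo {
  // Replays the scenario from the question: "delay" arrives first but is answered late.
  static List<String> run() {
    List<String> responses = new ArrayList<>();
    Queue<Runnable> scheduled = new ArrayDeque<>(); // assumed stand-in for delayed tasks
    OrderingGate[] gate = new OrderingGate[1];      // holder so the lambda can call back

    gate[0] = new OrderingGate(request -> {
      Runnable respond = () -> {
        responses.add(request);
        gate[0].onResponse();
      };
      if (request.equals("delay")) {
        scheduled.add(respond); // answered later
      } else {
        respond.run();          // answered immediately
      }
    });

    gate[0].onRequest("delay");
    gate[0].onRequest("second message");
    while (!scheduled.isEmpty()) {
      scheduled.poll().run();   // "time passes" and the delayed response is written
    }
    return responses;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [delay, second message]
  }
}
```

With the real codec, it has to sit before the handler in the pipeline, e.g. new EmbeddedChannel(new QueuingCodec(), new EchoHandler()), so that the handler's writes pass back through encode(). Note that this approach assumes every inbound message produces exactly one outbound response; a handler that never writes anything would leave the gate closed and all later messages stuck in the queue.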
User contributions licensed under: CC BY-SA