Apache Storm 2.1.0 local DRPC does not return any response even though the last bolt emits a tuple to the collector

I have a problem running a DRPC topology that contains a single bolt and querying it through a local cluster. Debugging with IntelliJ shows that the bolt is indeed executed, but afterwards the JCQueue is stuck in an infinite loop until a timeout is sent to the server.

Here is the code used to build the topology builder:

public static LinearDRPCTopologyBuilder createBuilder()
{
    var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
    var builder = new LinearDRPCTopologyBuilder("sales");
    builder.addBolt(bolt, 1).localOrShuffleGrouping();
    return builder;
}

The MRedisLookupBolt is a very simple implementation of IBasicBolt that runs an hget command through Jedis. Its execute method emits a Values instance holding the values for two fields, which are declared like this:

declarer.declare(new Fields("id", "Value"));

The topology is built and queried in a unit test like this:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);

try(LocalDRPC drpc = new LocalDRPC())
{
       LocalCluster cluster = new LocalCluster();
       var builder = BasicRedisRPCTopology.createBuilder();
       LocalCluster.LocalTopology topo = cluster.submitTopology(
              "Sales-fetch", conf, builder.createLocalTopology(drpc));
       var result = drpc.execute("sales", "XXXXX");
       System.out.println("################ Result: " + result);
}
catch (Exception e)
{
       e.printStackTrace();
}

From the logs, I am sure that the data is read correctly by the bolt and that everything is emitted:

[Screenshot: log output]

But in the end, my test method prints the stack trace below. No value is assigned to the result variable, and the process never reaches the final print statement:

[Screenshot: stack trace]

I am missing something here. As I understand it, the JCQueue that BoltExecutor uses to retrieve the id of the bolt to execute never terminates, although only one parameter is sent to the local DRPC and only one bolt is declared in the topology. I have already tried adding more bolts to the topology and changing the builder implementation used to create it, without success.

Answer

I found a solution suitable for my use case using Apache Storm 2.1.0.

It seems that calling the local cluster's submitTopology method as the documentation suggests does not terminate the executor correctly in version 2.1.0 when the topology is built with LinearDRPCTopologyBuilder.

A closer look at the source code showed how to apply the LinearDRPCTopologyBuilder logic directly with TopologyBuilder.

Here is the change applied to the createBuilder method:

    public static TopologyBuilder createBuilder(ILocalDRPC localDRPC)
    {
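        // Use the in-process DRPC when one is supplied; otherwise fall back to a
        // spout built from the function name only (created lazily via orElseGet).
        var spout = Optional.ofNullable(localDRPC)
                .map(drpc -> new DRPCSpout("sales", drpc))
                .orElseGet(() -> new DRPCSpout("sales"));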
        var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
        var builder = new TopologyBuilder();
        builder.setSpout("drpc", spout);
        builder.setBolt("redisLookup", bolt, 1)
               .shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults())
               .shuffleGrouping("redisLookup");
        return builder;
    }

And here is an example of execution:

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);

        // LocalDRPC and LocalCluster are both AutoCloseable, so let the try block close them.
        try (LocalDRPC drpc = new LocalDRPC();
             LocalCluster cluster = new LocalCluster())
        {
            var builder = BasicRedisRPCTopology.createBuilder(drpc);
            cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
            var result = drpc.execute("sales", "XXXXX");
            System.out.println("################ Result: " + result);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

Unfortunately, this solution does not let you use the tools built into LinearDRPCTopologyBuilder and means wiring the whole topology flow 'by hand'. It is also necessary to change the mapper behavior, as the fields are no longer exposed in the same order as before.
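
To illustrate the field-order point, here is a minimal plain-Java sketch with no Storm dependency. It rests on assumptions from my reading of the Storm source rather than anything stated above: that DRPCSpout emits its tuple as [args, return-info], and that ReturnResults reads the result at index 0 and the return info at index 1. The class and method names (DrpcFieldOrderSketch, spoutTuple, toReturnTuple) and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class DrpcFieldOrderSketch {

    // Assumed layout of the tuple the lookup bolt receives from the DRPC spout
    // when the topology is wired by hand: [args, return-info].
    static List<Object> spoutTuple(String args, String returnInfo) {
        return Arrays.<Object>asList(args, returnInfo);
    }

    // Hypothetical mapper step: put the looked-up value first and pass the
    // return info through unchanged in second position, which is the order
    // the final "return" bolt is assumed to expect.
    static List<Object> toReturnTuple(List<Object> input, String lookedUpValue) {
        Object returnInfo = input.get(1);
        return Arrays.<Object>asList(lookedUpValue, returnInfo);
    }

    public static void main(String[] args) {
        // "return-info-placeholder" stands in for the opaque string built by the spout.
        List<Object> in = spoutTuple("XXXXX", "return-info-placeholder");
        List<Object> out = toReturnTuple(in, "1234.5");
        System.out.println(out);
    }
}
```

With LinearDRPCTopologyBuilder the bolt instead works with the request id as its first field, which is why a mapper written for the original ("id", "Value") declaration has to be adapted.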
