I have a problem running a DRPC topology that contains a single bolt and querying it through a local cluster. Debugging with IntelliJ shows that the bolt is indeed executed, but once it has run, the JCQueue stays stuck in an infinite loop until a timeout is sent to the server.
Here is the code used to build the topology builder:
public static LinearDRPCTopologyBuilder createBuilder() {
    var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
    var builder = new LinearDRPCTopologyBuilder("sales");
    builder.addBolt(bolt, 1).localOrShuffleGrouping();
    return builder;
}
The MRedisLookupBolt is just a very simple implementation of IBasicBolt that runs an hget command through Jedis. Its execute method simply emits an instance of Values containing the values for two fields, which are declared like this:
declarer.declare(new Fields("id", "Value"));
The topology is built and queried in a unit test like this:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try (LocalDRPC drpc = new LocalDRPC()) {
    LocalCluster cluster = new LocalCluster();
    var builder = BasicRedisRPCTopology.createBuilder();
    LocalCluster.LocalTopology topo = cluster.submitTopology(
            "Sales-fetch", conf, builder.createLocalTopology(drpc));
    var result = drpc.execute("sales", "XXXXX");
    System.out.println("################ Result: " + result);
} catch (Exception e) {
    e.printStackTrace();
}
From the logs, I am sure that the data is read correctly by the bolt and that everything is emitted.
But in the end, my test method prints a stack trace. Of course, no value is assigned to the result variable and the process never reaches the final print statement.
There is something that I am missing here. As far as I understand, the JCQueue used by BoltExecutor to retrieve the id of the bolt to execute never finishes, although only one parameter is sent to the local DRPC and only one bolt is declared in the topology. I have already tried adding more bolts to the topology and changing the builder implementation used to create it, without success.
Answer
I found a solution suitable for my use case using Apache Storm 2.1.0.
It seems that, with version 2.1.0, invoking the submitTopology method of the local cluster as proposed by the documentation does not end the executor correctly when LinearDRPCTopologyBuilder is used to build the topology.
By looking more closely at the source code, it was possible to understand how to apply the LinearDRPCTopologyBuilder logic to the TopologyBuilder directly.
Here is the change applied to the createBuilder method:
public static TopologyBuilder createBuilder(ILocalDRPC localDRPC) {
    var spout = Optional.ofNullable(localDRPC)
            .map(drpc -> new DRPCSpout("sales", drpc))
            .orElse(new DRPCSpout("sales"));
    var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
    var builder = new TopologyBuilder();
    builder.setSpout("drpc", spout);
    builder.setBolt("redisLookup", bolt, 1)
            .shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults())
            .shuffleGrouping("redisLookup");
    return builder;
}
And here is an example of execution:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try (LocalDRPC drpc = new LocalDRPC()) {
    LocalCluster cluster = new LocalCluster();
    var builder = BasicRedisRPCTopology.createBuilder(drpc);
    cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
    var result = drpc.execute("sales", "XXXXX");
    System.out.println("################ Result: " + result);
} catch (Exception e) {
    e.printStackTrace();
}
Unfortunately, this solution does not allow using all the embedded tools of the LinearDRPCTopologyBuilder and means building the whole topology flow ‘by hand’. It is also necessary to change the mapper behavior, as the fields are not exposed in the same order as before.
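To illustrate the field-order point, here is a minimal plain-Java sketch that does not depend on Storm. It assumes, from my reading of the Storm 2.x sources, that DRPCSpout emits tuples as [args, return-info] and that ReturnResults reads the result at index 0 and the return info at index 1 (whereas with LinearDRPCTopologyBuilder the bolt works with the request id in first position). The class name, the method name and the in-memory map standing in for Redis are hypothetical and only there for illustration.

```java
import java.util.List;
import java.util.Map;

// Plain-Java model of the tuple layout; no Storm classes are used.
// Assumption: the DRPC spout emits [args, return-info] and the
// ReturnResults bolt expects [result, return-info].
class DrpcTupleLayoutSketch {

    // Hypothetical stand-in for the Redis hash queried with hget.
    static final Map<String, String> TURNOVER = Map.of("XXXXX", "1250.00");

    // Maps an incoming [args, return-info] tuple to [result, return-info].
    static List<Object> toReturnTuple(List<Object> input) {
        String args = (String) input.get(0);   // the DRPC request argument
        Object returnInfo = input.get(1);      // forwarded unchanged
        String result = TURNOVER.getOrDefault(args, "");
        return List.of(result, returnInfo);
    }

    public static void main(String[] args) {
        // Simulated tuple as it would leave the spout (made-up return info).
        List<Object> fromSpout = List.of("XXXXX", "{\"id\":\"1\"}");
        System.out.println(toReturnTuple(fromSpout));
    }
}
```

In the real bolt, this would correspond to emitting the looked-up value first and the untouched return info second, and declaring the output fields in that same order.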