Skip to content
Advertisement

How to create a custom Uniform Fan Out Shape in Akka through the Java library

So I am building an Akka custom component to learn the framework. I need to create a UnifromFanOut shape custom component. So I tried to extend the shape but the issue is how do I create the shape with 1 input and 3 outputs. The class requires a seq object which is a part of Scala but I am a bit confused on whether the whole shape itself is correct. I just started learning Akka so I can understand if my approach is wrong. Also I the purpose of the exercise was to create a custom component and I know I can get the logic done through GraphDSL but it needs to be a custom component. My question is how do I create this shape correctly. (the documentation isn’t the best for custom components)

    public final Inlet<DeviceInfo> in = Inlet.create("Map.in");
    public final Outlet<DeviceInfo> temp_out = Outlet.create("Map.out");
    public final Outlet<DeviceInfo> humidity_out = Outlet.create("Map.out");
    public final Outlet<DeviceInfo> illumination_out = Outlet.create("Map.out");
    //Does not work
    private final UniformFanOutShape<DeviceInfo, DeviceInfo> shape = UniformFanOutShape.apply(in, Arrays.asList(temp_out, humidity_out));

    @Override
    public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {     
        return shape;
    }

Advertisement

Answer

[EDIT] TL;DR You basically need to work around calling Scala vararg method from Java. You can do it like this using scala.collection.mutable.ArrayBuffer

    ArrayBuffer<Outlet<DeviceInfo>> arrayBuffer = new ArrayBuffer<>();
    arrayBuffer.append(temp_out);
    arrayBuffer.append(humidity_out);
    arrayBuffer.append(illumination_out);

    final UniformFanOutShape<DeviceInfo, DeviceInfo> shape =
      UniformFanOutShape.apply(in, arrayBuffer.toSeq());

source: How to use Scala varargs from Java code

[/EDIT]

(the documentation isn’t the best for custom components)

I disagree. The documentation is pretty comprehensive (https://doc.akka.io/docs/akka/current/stream/stream-customize.html)

You shouldn’t extend UniformFanOutShape<DeviceInfo, DeviceInfo>, but rather GraphStage<UniformFanOutShape<DeviceInfo, DeviceInfo>>.

You then need to override two methods:

 public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {
    //implement
 }

 @Override
 public GraphStageLogic createLogic(Attributes inheritedAttributes) {
   //implement
 }

the documentation shows you how to create shapes and how to implement GraphStageLogic with detailed information about handling backpressure, etc.

The documentation shows examples for SourceShape, SinkShape and FlowShape. If you understand those examples, but need some more guidance you can look at the source code. In akka.stream.scaladsl.Graph.scala you can find implementation of all built in GraphStages Akka Streams provide. For example Broadcast is an example of a UniformFanOutShape stage and this is how the code for it starts:

final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]]

The implementation is under 100 lines of code and follows the same pattern like the documentation shows. The code is Scala, because Akka is implemented in that language with a thin Java layer meant to be used from Java code. So for example there’s also akka.stream.javadsl.Graph.scala, but you will see that the code there is delegating to implementations in scaladsl package. You will see that pattern all over Akka code. So in short you will need to understand a bit of scala to understand the implementation but in this case here it’s very similar to what Java implementation could look like.

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement