I am building a custom Akka component to learn the framework, and I need it to have a UniformFanOutShape. I tried to extend the shape, but I can't work out how to create a shape with 1 input and 3 outputs. The class requires a Seq object, which is a Scala type, and I am not sure whether the shape itself is correct. I have only just started learning Akka, so my whole approach may be wrong. The purpose of the exercise is to create a custom component; I know I could get the logic done through GraphDSL, but it needs to be a custom component. My question is: how do I create this shape correctly? (the documentation isn’t the best for custom components)
public final Inlet<DeviceInfo> in = Inlet.create("Map.in");
public final Outlet<DeviceInfo> temp_out = Outlet.create("Map.out");
public final Outlet<DeviceInfo> humidity_out = Outlet.create("Map.out");
public final Outlet<DeviceInfo> illumination_out = Outlet.create("Map.out");

// Does not work
private final UniformFanOutShape<DeviceInfo, DeviceInfo> shape = UniformFanOutShape.apply(in, Arrays.asList(temp_out, humidity_out));

@Override
public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {
    return shape;
}
Answer
[EDIT]
TL;DR
You basically need to work around calling a Scala varargs method from Java. You can do it like this, using scala.collection.mutable.ArrayBuffer:
ArrayBuffer<Outlet<DeviceInfo>> arrayBuffer = new ArrayBuffer<>();
arrayBuffer.append(temp_out);
arrayBuffer.append(humidity_out);
arrayBuffer.append(illumination_out);

final UniformFanOutShape<DeviceInfo, DeviceInfo> shape = UniformFanOutShape.apply(in, arrayBuffer.toSeq());
source: How to use Scala varargs from Java code
[/EDIT]
(the documentation isn’t the best for custom components)
I disagree. The documentation is pretty comprehensive (https://doc.akka.io/docs/akka/current/stream/stream-customize.html)
You shouldn’t extend UniformFanOutShape<DeviceInfo, DeviceInfo>, but rather GraphStage<UniformFanOutShape<DeviceInfo, DeviceInfo>>.
You then need to override two methods:
@Override
public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {
    // implement
}

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    // implement
}
The documentation shows you how to create shapes and how to implement GraphStageLogic, with detailed information about handling backpressure, etc.
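As a rough illustration of the demand bookkeeping that createLogic has to encode, here is a plain-Java model that deliberately uses no Akka types. It assumes a broadcast-style stage in which every element goes to every output, which may not be what your component needs. FanOutModel, isUpstreamPulled and received are hypothetical names, and onPull and onPush only mimic the handler callbacks of the same names described in the documentation. Treat it as a sketch of the idea, not as the way Akka implements it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Akka types) of the demand bookkeeping in a
 * broadcast-style fan-out stage with one input and several outputs.
 * All names here are hypothetical and exist only for illustration.
 */
final class FanOutModel<T> {
    private final boolean[] demand;          // demand[i] is true once outlet i has pulled
    private final List<List<T>> received;    // elements handed to each outlet so far
    private boolean upstreamPulled = false;  // true while the stage waits for an element

    FanOutModel(int outputPorts) {
        demand = new boolean[outputPorts];
        received = new ArrayList<>();
        for (int i = 0; i < outputPorts; i++) {
            received.add(new ArrayList<>());
        }
    }

    /** Models onPull() on outlet i: record demand, pull upstream once every outlet is ready. */
    void onPull(int i) {
        demand[i] = true;
        for (boolean d : demand) {
            if (!d) {
                return;                      // some outlet is not ready: keep backpressuring
            }
        }
        upstreamPulled = true;
    }

    /** Models onPush() on the inlet: hand the element to every outlet and reset demand. */
    void onPush(T element) {
        if (!upstreamPulled) {
            throw new IllegalStateException("element pushed without demand");
        }
        upstreamPulled = false;
        for (int i = 0; i < demand.length; i++) {
            received.get(i).add(element);
            demand[i] = false;
        }
    }

    boolean isUpstreamPulled() {
        return upstreamPulled;
    }

    List<T> received(int i) {
        return received.get(i);
    }

    public static void main(String[] args) {
        FanOutModel<String> model = new FanOutModel<>(3);
        model.onPull(0);
        model.onPull(1);
        System.out.println(model.isUpstreamPulled()); // false: outlet 2 has not pulled yet
        model.onPull(2);
        System.out.println(model.isUpstreamPulled()); // true: all three outlets have demand
        model.onPush("reading-1");
        System.out.println(model.received(2));        // [reading-1]
    }
}
```

In a real stage the same bookkeeping would live inside the GraphStageLogic returned by createLogic, with one output handler per outlet and one input handler for the inlet. If the three outlets are meant to receive different elements, the onPush part would route each element to a single outlet instead of all of them.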
The documentation shows examples for SourceShape, SinkShape and FlowShape. If you understand those examples but need some more guidance, you can look at the source code. In akka.stream.scaladsl.Graph.scala you can find the implementations of the built-in GraphStages that Akka Streams provides. For example, Broadcast is a UniformFanOutShape stage, and this is how the code for it starts:
final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]]
The implementation is under 100 lines of code and follows the same pattern as the documentation shows. The code is in Scala, because Akka is implemented in that language, with a thin Java layer meant to be used from Java code. For example, there is also akka.stream.javadsl.Graph.scala, but you will see that the code there delegates to the implementations in the scaladsl package. You will see that pattern all over the Akka code base. In short, you will need to understand a bit of Scala to follow the implementation, but in this case it is very similar to what a Java implementation could look like.