I am exploring testing with Beam and encountered a weird problem.
My driver program works as expected, but its test is failing with an error like this:
Expected: iterable with items [<Row: project_id:count count_in:2 count_out:0 type:null window_max_ts:86399999 >] in any order but: not matched: <Row: project_id:p1 count_in:2 count_out:0 type:count window_max_ts:86399999 >
And here is my PAssert code:
PAssert
    .that(output)
    .inWindow(window)
    .containsInAnyOrder(
        Row
            .withSchema(OUTPUT_SCHEMA)
            .withFieldValue("type", "count")
            .withFieldValue("count_in", 2L)
            .withFieldValue("count_out", 0L)
            .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
            .build()
    );
On the last step of my pipeline, I log the element in question.
[direct-runner-worker] DEBUG co.botanalytics.data.processing.beam.transforms.Log - Window: [maxTimestamp=1970-01-01T23:59:59.999Z], Pane: [PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}], Element: Row: project_id:p1 count_in:2 count_out:0 type:count window_max_ts:86399999
This is the expected result.
When I debugged the test, the problem boiled down to CoderUtils from the Beam Java SDK.
After CoderUtils encodes and decodes the expected Row, it produces a completely different Row. All of its fields are mixed up, and as a result, PAssert fails.
I am wondering if there are any solutions to this problem. Any suggestions are more than welcome.
Thanks in advance!
OUTPUT_SCHEMA definition:
private static final transient Schema SCHEMA = Schema
    .builder()
    .addStringField("project_id")
    .addNullableField("type", Schema.FieldType.STRING)
    .addInt64Field("count_in")
    .addInt64Field("count_out")
    .build();
Answer
The pipeline code can work as expected while the test still fails. I believe that is happening because of an error in the PAssert definition.
Add the project ID field to the expected row in the test:
.withFieldValue("project_id", "p1")
It may solve the problem of the crossed field values.
For the error
Expected: iterable with items [<Row: ... >] in any order but: not matched:
please provide the output variable as an array of Rows instead of a single Row. It is expecting an array but is receiving only a single Row.
Your final code will be something like this:
// just an example to convert to array, choose any suitable way for you
Foo[] array = new Foo[output.size()];
output.toArray(array);

PAssert
    .that(output)
    .inWindow(window)
    .containsInAnyOrder(
        Row
            .withSchema(OUTPUT_SCHEMA)
            .withFieldValue("project_id", "p1")
            .withFieldValue("type", "count")
            .withFieldValue("count_in", 2L)
            .withFieldValue("count_out", 0L)
            .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
            .build()
    );
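If the test still fails, it may help to reproduce the encode/decode step on its own, outside the pipeline. The following is only a minimal sketch: the class name RowRoundTripCheck and the schema SCHEMA_SKETCH are hypothetical stand-ins (the real OUTPUT_SCHEMA is not shown in full in the question), and the field order is simply copied from the logged Row. It builds the expected row with every field set, passes it through CoderUtils.clone with a RowCoder, and prints both rows so they can be compared.

```java
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.Row;

public class RowRoundTripCheck {

  // Hypothetical stand-in for OUTPUT_SCHEMA; the field order is an assumption
  // taken from the Row printed in the question's log line.
  static final Schema SCHEMA_SKETCH =
      Schema.builder()
          .addStringField("project_id")
          .addInt64Field("count_in")
          .addInt64Field("count_out")
          .addNullableField("type", Schema.FieldType.STRING)
          .addInt64Field("window_max_ts")
          .build();

  // Builds the expected row with every field set by name, including project_id.
  static Row expectedRow() {
    return Row.withSchema(SCHEMA_SKETCH)
        .withFieldValue("project_id", "p1")
        .withFieldValue("count_in", 2L)
        .withFieldValue("count_out", 0L)
        .withFieldValue("type", "count")
        .withFieldValue("window_max_ts", 86399999L)
        .build();
  }

  // Encodes the row to bytes and decodes it again using the row's own schema.
  static Row roundTrip(Row row) throws CoderException {
    return CoderUtils.clone(RowCoder.of(row.getSchema()), row);
  }

  public static void main(String[] args) throws CoderException {
    Row expected = expectedRow();
    Row decoded = roundTrip(expected);
    System.out.println("before: " + expected);
    System.out.println("after:  " + decoded);
    System.out.println("equal:  " + expected.equals(decoded));
  }
}
```

Running the same check with the project_id line removed from expectedRow() should show whether the missing field is what makes the decoded row differ from the one you built.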