How do you sort in descending order using the Apache Beam framework?
I managed to create a word count pipeline which sorts alphabetically the output by word, but did not figure out how to invert the sorting order.
Here is the code:
JavaScript
x
public class SortedWordCount {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
BufferedExternalSorter.Options options1 = BufferedExternalSorter.options();
p.apply(TextIO.read().from("d:/dev/playground/apache/beam/word-count-beam/src/test/resources/bible/whole_bible.txt"))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.perElement())
.apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
@ProcessElement
public void processElement(ProcessContext c){
KV<String, Long> element = c.element();
if(element.getKey().length() > 2) {
c.output(element);
}
}
}))
.apply("CreateKey", MapElements.via(new SimpleFunction<KV<String, Long>, KV<String, KV<String, Long>>>() {
public KV<String, KV<String, Long>> apply(KV<String, Long> input) {
return KV.of("sort", KV.of(input.getKey().toLowerCase(), input.getValue()));
}
}))
.apply(GroupByKey.create())
.apply(SortValues.create(options1))
.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, String>() {
@Override
public String apply(KV<String, Iterable<KV<String, Long>>> input) {
return StreamSupport.stream(input.getValue().spliterator(), false)
.map(value -> String.format("%20s: %s", value.getKey(), value.getValue()))
.collect(Collectors.joining(String.format("%n")));
}
}))
.apply(TextIO.write().to("bible"));
// Run the pipeline.
p.run().waitUntilFinish();
}
}
This code produces a list of words sorted alphabetically with its respective counts:
JavaScript
aaron: 350
aaronites: 2
abaddon: 1
abagtha: 1
abana: 1
abarim: 4
abase: 4
abased: 4
abasing: 1
abated: 6
abba: 3
abda: 2
abdeel: 1
abdi: 3
abdiel: 1
abdon: 8
abednego: 15
abel: 16
abelbethmaachah: 2
abelmaim: 1
Edit 1:
After some debugging I know that the code uses the class:
JavaScript
org.apache.beam.sdk.extensions.sorter.InMemorySorter
This class uses a static final Comparator during the execution of the sort method:
JavaScript
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
public Iterable<KV<byte[], byte[]>> sort() {
checkState(!sortCalled, "sort() can only be called once.");
sortCalled = true;
Comparator<KV<byte[], byte[]>> kvComparator =
new Comparator<KV<byte[], byte[]>>() {
@Override
public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
return COMPARATOR.compare(o1.getKey(), o2.getKey());
}
};
Collections.sort(records, kvComparator);
return Collections.unmodifiableList(records);
}
There is no way to inject a comparator in this class.
Advertisement
Answer
You can extract the Iterable<KV<String, Long>>
into a List<KV<String, Long>>
and reverse the list using Collections.reverse()
.