Apache Beam framework – sort in descending order

How do you sort in descending order using the Apache Beam framework?

I managed to create a word count pipeline that sorts the output alphabetically by word, but I could not figure out how to reverse the sort order.

Here is the code:

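import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class SortedWordCount {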

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        BufferedExternalSorter.Options options1 = BufferedExternalSorter.options();

        p.apply(TextIO.read().from("d:/dev/playground/apache/beam/word-count-beam/src/test/resources/bible/whole_bible.txt"))
                .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }
                }))
                .apply(Count.perElement())
                .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c){
                        KV<String, Long> element = c.element();
                        if(element.getKey().length() > 2) {
                            c.output(element);
                        }
                    }
                }))
                .apply("CreateKey", MapElements.via(new SimpleFunction<KV<String, Long>, KV<String, KV<String, Long>>>() {
                    @Override
                    public KV<String, KV<String, Long>> apply(KV<String, Long> input) {
                        return KV.of("sort", KV.of(input.getKey().toLowerCase(), input.getValue()));
                    }
                }))
                .apply(GroupByKey.create())
                .apply(SortValues.create(options1))
                .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, String>() {
                    @Override
                    public String apply(KV<String, Iterable<KV<String, Long>>> input) {
                        return StreamSupport.stream(input.getValue().spliterator(), false)
                                .map(value -> String.format("%20s: %s", value.getKey(), value.getValue()))
                                .collect(Collectors.joining(String.format("%n")));
                    }
                }))
                .apply(TextIO.write().to("bible"));
        // Run the pipeline.
        p.run().waitUntilFinish();
    }
}

This code produces a list of words sorted alphabetically, each with its count:

           aaron: 350
       aaronites: 2
         abaddon: 1
         abagtha: 1
           abana: 1
          abarim: 4
           abase: 4
          abased: 4
         abasing: 1
          abated: 6
            abba: 3
            abda: 2
          abdeel: 1
            abdi: 3
          abdiel: 1
           abdon: 8
        abednego: 15
            abel: 16
 abelbethmaachah: 2
        abelmaim: 1

Edit 1:

After some debugging, I found that the sorting is done by this class:

org.apache.beam.sdk.extensions.sorter.InMemorySorter

Its sort() method compares the encoded keys with a static final Comparator:

private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();

public Iterable<KV<byte[], byte[]>> sort() {
  checkState(!sortCalled, "sort() can only be called once.");

  sortCalled = true;

  Comparator<KV<byte[], byte[]>> kvComparator =
    new Comparator<KV<byte[], byte[]>>() {

      @Override
      public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
        return COMPARATOR.compare(o1.getKey(), o2.getKey());
      }
    };
  Collections.sort(records, kvComparator);
  return Collections.unmodifiableList(records);
}

There is no way to inject a different comparator into this class.
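
To make the ordering concrete, here is a minimal plain-Java sketch of what an unsigned lexicographic byte comparison does to UTF-8 encoded words, and what a reversed comparator would give if one could be supplied. It does not use Beam or Guava: Arrays.compareUnsigned (Java 9+) is only a stand-in for UnsignedBytes.lexicographicalComparator(), and the class and method names (ByteOrderDemo, sortWords) are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ByteOrderDemo {

    // Stand-in (assumption) for Guava's UnsignedBytes.lexicographicalComparator().
    static final Comparator<byte[]> ASCENDING = Arrays::compareUnsigned;

    // Encodes each word as UTF-8 bytes, sorts the byte arrays with the given
    // comparator, and decodes them back to strings.
    static List<String> sortWords(List<String> words, Comparator<byte[]> order) {
        List<byte[]> encoded = new ArrayList<>();
        for (String word : words) {
            encoded.add(word.getBytes(StandardCharsets.UTF_8));
        }
        encoded.sort(order);

        List<String> result = new ArrayList<>();
        for (byte[] bytes : encoded) {
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("abel", "aaron", "abana");
        // Ascending byte order, as the hard-coded comparator produces.
        System.out.println(sortWords(words, ASCENDING));
        // Descending order, which would need a reversed comparator.
        System.out.println(sortWords(words, ASCENDING.reversed()));
    }
}
```

Because the comparator in InMemorySorter is a private static final field, the reversed variant above cannot be plugged in; the order has to be changed after the sort instead.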

Answer

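You can copy the sorted Iterable<KV<String, Long>> that SortValues produces into a List<KV<String, Long>> and reverse that list with Collections.reverse(), for example inside the FormatResults step before the values are formatted. Note that the copy holds all values for the key in memory at once.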
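
A minimal sketch of that idea in plain Java, without the Beam dependency so it can be run on its own. The helper name reversedCopy and the class ReverseSorted are hypothetical; in the pipeline from the question, the argument would presumably be input.getValue() inside the FormatResults function, with KV<String, Long> as the element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReverseSorted {

    // Copies an already-sorted Iterable into a new List and reverses it.
    // The source Iterable is left untouched, which matters if it is read-only.
    static <T> List<T> reversedCopy(Iterable<T> sorted) {
        List<T> copy = new ArrayList<>();
        for (T item : sorted) {
            copy.add(item);
        }
        Collections.reverse(copy);
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for the ascending output of the sort step.
        List<String> ascending = Arrays.asList("aaron", "abana", "abel");
        System.out.println(reversedCopy(ascending));
    }
}
```

In the question's code, the stream in FormatResults could then be built from the reversed list instead of from input.getValue().spliterator(). This keeps the sort step unchanged and only flips the order at formatting time.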
