
How to Peek the Lowest Elements from a List of Sorted Streams Continuously

I have started learning about Java streams, and I am wondering whether it is possible to peek at the first element of a stream without consuming it.

For example, I have multiple streams, each of which contains integers sorted in non-decreasing order, and I want to get a sorted list of all the integers, so I’m thinking about using a PriorityQueue<Stream> that is ordered in non-decreasing order as well.

However, for the PriorityQueue<Stream> to sort the streams, I need to pass in a comparator that compares streams by their first element, and I am not sure how to peek at the first element of each stream.

For example, I have the following streams.

[1, 2, 3, 5],
[0, 2, 4, 6]

I want to write a function getNextInteger(), that handles a list of sorted streams.

Every time I call the method, it returns the next smallest integer, so the result might be [0,1,2,2] if I call the method 4 times.

I want to use PriorityQueue to sort the streams by their first value, and retrieve the smallest one and requeue the stream if it is not empty.

Answer

A stream is a means of iterating over a source of data; it is meant to process the data, not to store it.

Therefore, the premise of your question doesn't hold: the short answer is no.

A stream is not a data structure; you can’t access its elements the way you access the elements of a List or a Queue.

Have a look at the documentation:

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source.

As I said, a stream is a means of iteration, but a stream pipeline also differs from an Iterator. An Iterator allows retrieving elements one by one. A stream pipeline, by contrast, is either executed as a whole, producing a result (a single value or a collection of values) after which the stream is closed, or it is not executed at all. Which of the two happens depends on whether the stream has a terminal operation.

For instance, this stream is valid and compiles fine, but it will never be executed:

Stream.of("a", "b", "c").map(String::toUpperCase);

That is because it lacks a terminal operation.

Every stream should have a source and a single terminal operation which triggers the execution of the pipeline and produces the result. Intermediate operations like map() and filter(), which are meant to transform the stream, are optional.
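To make the laziness described above observable, here is a minimal sketch that counts how many times the mapping function runs before and after a terminal operation. The class name LazyPipelineDemo, the method countMapperCalls() and the counter are made up for illustration; the side effect inside map() is there only to observe execution and is not something you would normally put in a pipeline.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineDemo {

    // Returns {mapper calls before the terminal operation, mapper calls after it}.
    static int[] countMapperCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Only describes the pipeline; no element is processed here.
        Stream<String> pipeline = Stream.of("a", "b", "c")
            .map(s -> {
                calls.incrementAndGet(); // side effect used only to observe execution
                return s.toUpperCase();
            });

        int before = calls.get();

        // The terminal operation triggers the execution of the whole pipeline.
        List<String> result = pipeline.collect(Collectors.toList());
        int after = calls.get();

        System.out.println(result); // [A, B, C]
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countMapperCalls();
        System.out.println(counts[0] + " -> " + counts[1]); // 0 -> 3
    }
}
```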

You can’t obtain the data from the stream without processing it. And once it’s processed, it can no longer be used.
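The "can no longer be used" part can be checked directly. The sketch below (StreamReuseDemo and secondTraversalFails() are hypothetical names) runs one terminal operation on a stream and then attempts a second one on the same stream, which is rejected with an IllegalStateException.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTraversalFails() {
        Stream<String> stream = Stream.of("a", "b", "c").map(String::toUpperCase);

        // First terminal operation: executes the pipeline and consumes the stream.
        List<String> upper = stream.collect(Collectors.toList());
        System.out.println(upper); // [A, B, C]

        try {
            // Trying to "peek" afterwards: the stream has already been operated upon.
            stream.findFirst();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails()); // true
    }
}
```

This is why calling findFirst() inside a comparator is not an option: the first comparison would consume the stream.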

As a possible remedy for this problem, you might consider wrapping the stream in an object that keeps the first element of the stream source alongside the stream itself.

public record StreamWrapper(int first, IntStream stream) {}

This approach is sufficient if you only need to compare streams by a single value. That value has to be extracted from the stream source (if the source allows it) at the moment the stream is created.
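As a rough sketch of how such a wrapper might be populated, assume the source of each stream is an in-memory, non-empty, sorted int array, so the first element can be read from the source without touching the stream. The factory method of(...) and the class StreamWrapperDemo are assumptions added for illustration; records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.IntStream;

class StreamWrapperDemo {

    record StreamWrapper(int first, IntStream stream) {
        // Hypothetical factory: assumes a non-empty array sorted in non-decreasing order.
        // The first element is read from the source, not from the stream.
        static StreamWrapper of(int... sortedValues) {
            return new StreamWrapper(sortedValues[0], IntStream.of(sortedValues));
        }
    }

    // Orders the wrappers by their first value and returns those values.
    static List<Integer> firstsInOrder() {
        List<StreamWrapper> wrappers = new ArrayList<>(List.of(
            StreamWrapper.of(1, 2, 3, 5),
            StreamWrapper.of(0, 2, 4, 6)));

        wrappers.sort(Comparator.comparingInt(StreamWrapper::first));

        List<Integer> firsts = new ArrayList<>();
        for (StreamWrapper wrapper : wrappers) {
            firsts.add(wrapper.first());
        }
        return firsts;
    }

    public static void main(String[] args) {
        System.out.println(firstsInOrder()); // [0, 1]
    }
}
```

Note that this only orders the streams once, by their initial element. It does not help with fetching the subsequent elements one by one, because the wrapped streams are still untouched pipelines.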


Update

I want to write a function getNextInteger(), that handles a list of sorted streams.

Every time I call the method, it returns the next smallest integer, so the result might be [0,1,2,2] if I call the method 4 times.

That task is not suitable for streams, unless you can turn a blind eye to the fact that the data in every stream is already sorted.

If we combine all the streams into one and sort it, the performance hit will not be as large as it might seem at first. To sort the data, the stream dumps all the elements into an array, which in this case consists of sorted subarrays. Because an array of a reference type is sorted using Timsort, the algorithm will detect these sorted runs. In other words, sorting an array composed of already-sorted subarrays isn’t the same as sorting all the data from scratch. Hence, we can consider it as a possible option:

List<Stream<Integer>> streams =
    List.of(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
            Stream.of(4, 9, 10), Stream.of(8));

streams.stream()
    .flatMap(Function.identity())
    .sorted()
    .forEach(num -> System.out.print(num + " "));

This produces the output:

1 2 3 4 5 6 7 8 9 10 

If printing (or storing into a collection) the overall data sorted in ascending order doesn’t seem satisfactory, and you insist on retrieving only a single value as a result of method invocation, I’ll reiterate that it’s impossible to fetch values one by one continuously from a stream.

For that you need an Iterator as the documentation suggests:

However, if the provided stream operations do not offer the desired functionality, the BaseStream.iterator() and BaseStream.spliterator() operations can be used to perform a controlled traversal.
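A small sketch of what that controlled traversal looks like: iterator() is itself a terminal operation, but the Iterator it returns hands out elements on demand, one call at a time. The class name StreamIteratorDemo and the method firstTwo() are illustrative only.

```java
import java.util.Iterator;
import java.util.stream.Stream;

class StreamIteratorDemo {

    // Pulls the first two elements of a sorted stream, one call at a time.
    static int[] firstTwo() {
        // iterator() consumes the stream, but the elements are fetched lazily.
        Iterator<Integer> iterator = Stream.of(0, 2, 4, 6).iterator();

        int first = iterator.next();  // 0
        // Other work could happen here; the remaining elements stay available.
        int second = iterator.next(); // 2

        return new int[] {first, second};
    }

    public static void main(String[] args) {
        int[] values = firstTwo();
        System.out.println(values[0] + " " + values[1]); // 0 2
    }
}
```

An Iterator still has no peek operation, so the element returned by next() has to be stored somewhere if it needs to be compared before it is handed out.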

You can implement a custom iterator that will utilize a PriorityQueue under the hood.

I assume that the stream elements are of a type that implements Comparable and that every stream is sorted (as in the example you’ve provided).

Iterator:

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class QueueBasedIterator<T extends Comparable<T>> implements Iterator<T> {
    // Current head of every non-exhausted iterator, smallest value first
    private final Queue<IteratorWrapper<T>> nextValues = new PriorityQueue<>();
    private final List<Iterator<T>> iterators;
    
    @SafeVarargs
    public QueueBasedIterator(Stream<T>... streams) {
        this.iterators = Stream.of(streams).map(Stream::iterator)
            .collect(Collectors.toList());
        
        // Seed the queue with the first element of each non-empty stream
        for (int i = 0; i < iterators.size(); i++) {
            Iterator<T> iterator = iterators.get(i);
            if (iterator.hasNext()) 
                nextValues.add(new IteratorWrapper<T>(i, iterator.next()));
        }
    }
    
    @Override
    public boolean hasNext() {
        return !nextValues.isEmpty();
    }
    
    @Override
    public T next() {
        if (nextValues.isEmpty()) {
            throw new NoSuchElementException();
        }
        
        // Take the smallest head and refill the queue from the same iterator
        IteratorWrapper<T> next = nextValues.remove();
        Iterator<T> iterator = iterators.get(next.getPosition());
        if (iterator.hasNext())
            nextValues.add(new IteratorWrapper<T>(next.getPosition(), iterator.next()));
        
        return next.getValue();
    }
}

IteratorWrapper:

class IteratorWrapper<T extends Comparable<T>> implements Comparable<IteratorWrapper<T>> {
    private final T value;      // element taken from the iterator
    private final int position; // index of the iterator it came from
    
    public IteratorWrapper(int position, T value) {
        this.value = value;
        this.position = position;
    }
    
    public T getValue() {
        return value;
    }
    
    public int getPosition() {
        return position;
    }
    
    @Override
    public int compareTo(IteratorWrapper<T> o) {
        return this.value.compareTo(o.value);
    }
}

main() – demo

public static void main(String[] args) {
    QueueBasedIterator<Integer> iterator =
        new QueueBasedIterator<>(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
                                 Stream.of(4, 9, 10), Stream.of(8));
    
    while (iterator.hasNext()) {
        System.out.print(iterator.next() + " ");
    }
}

Output

1 2 3 4 5 6 7 8 9 10