Gathering the streams

Author: Viktor Klang (version 0.2)

Introduction

Java 8 introduced the java.util.stream API, which represents a lazily computed, potentially unbounded sequence of values (Streams was also the first designed-for-lambdas API in the JDK). Streams supports the ability to process the stream either sequentially or in parallel.

A Stream pipeline consists of a source (collection, array, generator, etc.), zero or more intermediate operations (Stream -> Stream transforms), and an eager terminal operation which produces a value or a side-effect.
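As a minimal illustration of these three parts, the following sketch uses an unbounded generator as its source, three intermediate operations, and a terminal collect. The class and method names are chosen only for this example. Because intermediate operations are lazy, the unbounded source is consumed only as far as limit requires.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PipelineAnatomy {
    /** Returns the squares of the first {@code count} even positive integers. */
    static List<Integer> firstEvenSquares(int count) {
        return Stream.iterate(1, i -> i + 1)       // source: unbounded generator 1, 2, 3, ...
                .filter(i -> i % 2 == 0)           // intermediate: keep even values
                .map(i -> i * i)                   // intermediate: 1:1 transform
                .limit(count)                      // intermediate: short-circuits the pipeline
                .collect(Collectors.toList());     // terminal: eagerly drives the evaluation
    }

    public static void main(String[] args) {
        System.out.println(firstEvenSquares(3));   // prints [4, 16, 36]
    }
}
```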

The Streams API comes with a reasonably rich but fixed set of built-in operations (mapping, filtering, reduction, sorting, etc.), as well as an extensible terminal operation (Stream::collect) that enables the stream contents to be flexibly summarized in a variety of forms. The resulting API is rich enough that users have had good experience with streams, but there are repeated requests for “please add operation X to streams”.
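To illustrate how Stream::collect acts as an extension point, here is a small user-defined Collector built with Collector.of. The averageLength name and the long[] accumulator layout are choices made for this sketch and are not part of the JDK.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

class CustomCollectorSketch {
    /** Hypothetical collector: average length of the strings in a stream (0.0 if empty). */
    static Collector<String, long[], Double> averageLength() {
        return Collector.of(
                () -> new long[2],                                    // supplier: [sum of lengths, count]
                (acc, s) -> { acc[0] += s.length(); acc[1]++; },      // accumulator: fold in one element
                (l, r) -> { l[0] += r[0]; l[1] += r[1]; return l; },  // combiner: merge partial results
                acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]   // finisher: produce the summary
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "bbb", "cc").collect(averageLength())); // prints 2.0
    }
}
```

The same four functions (supplier, accumulator, combiner, finisher) are the starting point for the Gatherer design discussed later in this document.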

In this document, we explore a corresponding extensible intermediate operation, called Stream::gather, which is able to address many of the requests we’ve gotten for additional operations.

“Missing” intermediate operations

Over the years, many new operations for streams have been proposed. Each of them may be useful in some situation, but many of them are too narrow to be included in the core Stream API. We’d like for people to be able to plug these operations in via an extension point, as with Stream::collect.

Here are some examples of proposed intermediate operations that are not easily expressible as intermediate operations on Stream today.

These are just a few of the stream operations people have wished for, and while many operations are too specialized to include in the core library, all are things we’d like people to be able to express with streams.

Extending streams

It was always desired for java.util.stream.Stream to have an extension point for intermediate operations; when the library was first built, this wish-list item was recorded: https://bugs.openjdk.org/browse/JDK-8132369. There were many reasons that this was not included in the initial version, the most important being that it was not at all obvious what the right API was.

The set of intermediate operations on Streams carries the exact same trade-off as CISC vs RISC, namely: either you have to provide a rather sizeable set of specialized operations, or you provide a small set of general operations.

For the CISC-strategy, the main drawback is that not all operations are equally useful, and having a large set of operations to choose from creates a barrier to effective usage by making the correct choice harder to identify.

For the RISC-strategy, the main drawback is that it puts more effort on the user to encode more specialized operations over a set of generic operations, making code harder to read, and harder to maintain, for the user.

However, one success story was Collector; this allowed us to extend terminal operations with a good balance of expressiveness, reuse, and API footprint. We would like to do the same for intermediate operations – and we can draw some inspiration from Collector to get there.

Requirements

Breaking down operations into intermediate and terminal is a useful distinction; in reality, however, there are many more characteristics we could use to describe operations, and relatively few operations use exactly the same subset of characteristics. The following is a curated set of characteristics of stream operations; if we want to enable generic, user-definable intermediate operations on streams, we probably have to provide for them all.

| Ordinal | Feature | What this means in practice | Example operation(s) |
| --- | --- | --- | --- |
| 1 | Intermediate operation | Add custom new operations to Stream pipelines | map, filter |
| 2 | Incremental operation | Evaluate the Stream pipeline depth-first | map, filter |
| 3 | Stateful operation | Remember information between processing elements | limit, foldLeft |
| 4 | N:M inputs-to-outputs | Encode a wide array of operations | limit, flatMap |
| 5 | Prefix consumption | An operation should consume only as much as needed | limit, takeWhile |
| 6 | Beginning-of-stream hook | Ability to have initialization logic | foldLeft, grouped |
| 7 | End-of-stream hook | Flush elements downstream | sorted, grouped |
| 8 | Opt-in parallelizability | Speed up CPU-bound processing for parallelizable operations | reduce, map |

Now that we have a better idea of what we need to achieve, let’s have a look at what our options are.

Comparing existing operations

Let’s look at the operations we already have, to see if they would fit the bill as a generic extension point for intermediate Stream-operations.

These are, in order, Stream.map(…), Stream.flatMap(…) (Stream.mapMulti(…) is analogous), and Collector with Stream.collect(…). The table below shows how they line up against our requirements.

| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector |
| --- | --- | --- | --- | --- |
| 1 | Intermediate operation | Yes | Yes | No, terminal |
| 2 | Incremental operation | Yes | Yes | No, terminal |
| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes |
| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 |
| 5 | Prefix consumption | No | No | No |
| 6 | Beginning-of-stream hook | No | No | Yes |
| 7 | End-of-stream hook | No | No | Yes |
| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on |
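To make rows 3 and 7 of this comparison concrete, the sketch below tries to build windows of a fixed size using flatMap and captured mutable state. It is deliberately incomplete: flatMap has no end-of-stream hook, so a trailing partial window is silently lost. The class and method names are hypothetical, and the approach is only safe for sequential streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapWindowSketch {
    /** Groups elements into windows via flatMap; drops a trailing partial window. */
    static List<List<Integer>> windowsViaFlatMap(List<Integer> input, int size) {
        List<Integer> buffer = new ArrayList<>();               // state via capture
        Function<Integer, Stream<List<Integer>>> step = element -> {
            buffer.add(element);
            if (buffer.size() < size) return Stream.empty();    // nothing to emit yet
            List<Integer> window = new ArrayList<>(buffer);
            buffer.clear();
            return Stream.of(window);                           // emit one full window
        };
        // Sequential only: the captured buffer is not thread-safe.
        // Whatever is left in 'buffer' at the end of input is never emitted.
        return input.stream().flatMap(step).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(windowsViaFlatMap(List.of(1, 2, 3, 4, 5), 2)); // prints [[1, 2], [3, 4]]
    }
}
```

The element 5 never reaches the result, because there is no point at which the buffered remainder can be flushed downstream.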

Looking at the options above, none of them are going to, as-is, fit the bill as an extension point for generic intermediate Stream operations, but one of them has more promise than the others: Collector. Obvious needed upgrades are supporting short-circuiting and incremental generation of output; let’s see what that looks like in practice.

Collector 2.0

Let’s start with a clone of Collector<T,A,R> called Gatherer<T,A,R>:

/** @param <T> the element type
 *  @param <A> the (mutable) intermediate accumulation type
 *  @param <R> the (probably immutable) final accumulation type
 */
interface Gatherer<T,A,R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
}

Now, we need to make it output 0…M elements instead of strictly 1. To do that, we need a way to programmatically output elements from the accumulator, which makes it incremental (with the added benefit that it can run depth-first). Adding an additional argument, a downstream handle, to accumulator and finisher allows us to push elements downstream both for each input and at the end of the stream. As no suitable pre-existing interface exists, we’ll define a new interface named Integrator and rename accumulator to the more fitting integrator. Our finisher will also not have a useful return value, so we’ll make it void:

/** @param <T> the element type
 *  @param <A> the (mutable) intermediate accumulation type
 *  @param <R> the (probably immutable) final accumulation type
 */
interface Gatherer<T,A,R> {
    interface Integrator<A,T,R> {
        void integrate(A state, T element, Consumer<? super R> downstream);
    }
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Consumer<? super R>> finisher();
}

Next, we need to tackle short-circuiting and add an upstream signal from each invocation of the integrator so we can detect when the integration does not want any more elements. We do this by returning a boolean from integrate:

/** @param <T> the element type
 *  @param <A> the (mutable) intermediate accumulation type
 *  @param <R> the (probably immutable) final accumulation type
 */
interface Gatherer<T,A,R> {
    interface Integrator<A,T,R> {
        boolean integrate(A state, T element, Consumer<? super R> downstream);
    }
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Consumer<? super R>> finisher();
}

Transitively supporting short-circuiting, i.e. that a downstream handle does not want to receive any more elements, requires us to make the same change as we did for integrate—return a boolean instead of void, and since the only built-in candidate would be Predicate, which is not supposed to have side-effects, let’s introduce our own type Sink:

/** @param <T> the element type
 *  @param <A> the (mutable) intermediate accumulation type
 *  @param <R> the (probably immutable) final accumulation type
 */
interface Gatherer<T,A,R> {
    interface Sink<R> {
        boolean flush(R element);
    }
    interface Integrator<A,T,R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();
}

Finally, we need to address parallelism. To opt-in to parallelism we can make the combiner optional – we call it only for parallel executions, and if it is not present, execution is constrained to sequential.

To demonstrate how all of the above fits together, we implement fixedWindow(N) where fixedWindow(2) over the values [1,2,3,4,5] yields [[1,2],[3,4],[5]].

    /**
     * Gathers elements into fixed-size windows. The last window can contain fewer elements.
     * @param windowSize the size of the windows
     * @return a new gatherer which gathers elements into fixed-size windows
     * @param <T> the type of elements this Gatherer gathers
     */
    public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        else {
            Supplier<ArrayList<T>> supplier =
                () ->  new ArrayList<T>(windowSize);

            Integrator<ArrayList<T>, T, List<T>> integrator =
                (openWindow, element, downstream) -> {
                    if (openWindow.add(element) && openWindow.size() >= windowSize) {
                        var closedWindow = List.copyOf(openWindow);
                        openWindow.clear();
                        return downstream.flush(closedWindow);
                    } else
                        return true;
                };

            // This combiner signals that we opt-out of parallelisation
            BinaryOperator<ArrayList<T>> combiner = 
                Gatherer.unsupportedCombiner();

            BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher =
                (openWindow, downstream) -> {
                    if(!openWindow.isEmpty()) {
                        downstream.flush(List.copyOf(openWindow));
                        openWindow.clear();
                    }
                };

            return Gatherer.of(supplier, integrator, combiner, finisher);
        }
    }

Or by implementing the Gatherer interface directly like so:

    /**
     * Gathers elements into fixed-size windows. The last window can contain fewer elements.
     * @param windowSize the size of the windows
     * @return a new gatherer which gathers elements into fixed-size windows
     * @param <T> the type of elements this Gatherer gathers
     */
    public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        
        class FixedWindowGatherer implements Gatherer<T,ArrayList<T>,List<T>> {
            @Override
            public Supplier<ArrayList<T>> supplier() {
                return () -> new ArrayList<>(windowSize);
            }

            @Override
            public Integrator<ArrayList<T>, T, List<T>> integrator() {
                return (openWindow, element, downstream) -> {
                    if (openWindow.add(element) && openWindow.size() >= windowSize) {
                        var closedWindow = List.copyOf(openWindow);
                        openWindow.clear();
                        return downstream.flush(closedWindow);
                    } else
                        return true;
                };
            }

            @Override
            public BinaryOperator<ArrayList<T>> combiner() {
                return Gatherer.unsupportedCombiner();
            }

            @Override
            public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {
                return (openWindow, downstream) -> {
                    if(!openWindow.isEmpty()) {
                        downstream.flush(List.copyOf(openWindow));
                        openWindow.clear();
                    }
                };
            }
        }
        return new FixedWindowGatherer();
    }

Compared to Collector, it would look like the following:

| Ordinal | Feature | Collector<T,A,R> | Gatherer<T,A,R> |
| --- | --- | --- | --- |
| 1 | Beginning-of-stream hook | A get() | A get() |
| 2 | Per element | void accept(A, T) | boolean integrate(A, T, Sink<R>) |
| 3 | End-of-stream hook | R apply(A) | void accept(A, Sink<R>) |
| 4 | Parallelizability | A apply(A, A) | A apply(A, A) |

To integrate it with java.util.stream.Stream, we add the following method to it: <R> Stream<R> gather(Gatherer<T,?,R> gatherer)

Now we can use our fixedWindow Gatherer:

jshell> Stream.iterate(0, i -> i + 1).limit(10).gather(fixedWindow(2)).toList();
$1 ==> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
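To make the evaluation order concrete, here is a minimal sketch of how a sequential gather could drive the supplier, integrator, and finisher. It is not the actual Stream implementation: gatherSequentially is a hypothetical helper that takes the three functions directly, materializes the output into a List instead of a lazy Stream, and ignores the combiner. It also assumes that the finisher is invoked even after the integrator has short-circuited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

class SequentialGatherSketch {
    // Local copies of the Sink and Integrator shapes defined in this document.
    interface Sink<R> { boolean flush(R element); }
    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }

    /** Hypothetical sequential driver: supplier once, integrator per element, finisher at the end. */
    static <T, A, R> List<R> gatherSequentially(Iterable<T> source,
                                                Supplier<A> supplier,
                                                Integrator<A, T, R> integrator,
                                                BiConsumer<A, Sink<? super R>> finisher) {
        List<R> output = new ArrayList<>();
        Sink<R> collectAll = element -> { output.add(element); return true; };
        A state = supplier.get();                                    // beginning-of-stream hook
        for (T element : source) {
            if (!integrator.integrate(state, element, collectAll))   // false means "no more input"
                break;                                               // prefix consumption
        }
        finisher.accept(state, collectAll);                          // end-of-stream hook
        return output;
    }

    /** The fixedWindow logic from this document, expressed against the driver above. */
    static <T> List<List<T>> fixedWindows(Iterable<T> source, int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        Supplier<ArrayList<T>> supplier = () -> new ArrayList<T>(windowSize);
        Integrator<ArrayList<T>, T, List<T>> integrator = (openWindow, element, downstream) -> {
            openWindow.add(element);
            if (openWindow.size() < windowSize) return true;         // window still open
            List<T> closedWindow = List.copyOf(openWindow);
            openWindow.clear();
            return downstream.flush(closedWindow);
        };
        BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher = (openWindow, downstream) -> {
            if (!openWindow.isEmpty()) downstream.flush(List.copyOf(openWindow));
        };
        return gatherSequentially(source, supplier, integrator, finisher);
    }

    public static void main(String[] args) {
        System.out.println(fixedWindows(List.of(1, 2, 3, 4, 5), 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```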

Gatherer now provides all features:

| Ordinal | Feature | Using Gatherer | Explanation |
| --- | --- | --- | --- |
| 1 | Intermediate operation | Yes | A Gatherer’s output is the following operation’s input |
| 2 | Incremental operation | Yes | A Gatherer can produce elements in response to consuming elements |
| 3 | Stateful operation | Yes | A Gatherer supplies its own state, which can be Void/null if stateless |
| 4 | N:M inputs-to-outputs | Yes | A Gatherer can consume and produce any number of elements |
| 5 | Prefix consumption | Yes | A Gatherer can signal that it is done by returning false from its integrate-method |
| 6 | Beginning-of-stream hook | Yes | A Gatherer can run logic when creating its initial state |
| 7 | End-of-stream hook | Yes | A Gatherer’s finisher is invoked at the end of input |
| 8 | Opt-in parallelizability | Yes | A Gatherer’s combiner is optional |

Summary

None of the current candidates for a user-extensible API for generic intermediate stream operations fit the set of requirements, but Collector can form an excellent basis for a new construct, called Gatherer, which after a limited set of modifications checks all the boxes:

| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector | Using Gatherer |
| --- | --- | --- | --- | --- | --- |
| 1 | Intermediate operation | Yes | Yes | No, terminal | Yes |
| 2 | Incremental operation | Yes | Yes | No, terminal | Yes |
| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes | Yes |
| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 | Yes |
| 5 | Prefix consumption | No | No | No | Yes |
| 6 | Beginning-of-stream hook | No | No | Yes | Yes |
| 7 | End-of-stream hook | No | No | Yes | Yes |
| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on | Yes |

As it turned out, we were a short hop away from a generic, stand-alone, reusable, parallelizable API for intermediate operations all along!

But wait … there’s more

While the aforementioned solution contains the bare minimum to meet our requirements, the following will provide some significant quality-of-life improvements.

Compositionality

The astute reader may have noticed that by adding the gather(Gatherer) method on Stream, one can in effect compose Gatherers together:

Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Stream<Long> stream = Stream.of(1).gather(someGatherer).gather(someOtherGatherer);

As it turns out, it is possible to define a default Gatherer andThen(Gatherer)-method on Gatherer which composes Gatherers:

Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Gatherer<Integer,?,Long> gatherer = someGatherer.andThen(someOtherGatherer);

This makes it possible both to decouple operations from their use-sites and to create more sophisticated Gatherers from simple ones.
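One way such a composition could work is sketched below. This is an assumption about how andThen might be implemented, not the design's actual Composite: MiniGatherer is a cut-down stand-in for the Gatherer interface (no combiner, sequential only), and Both, feed, and run are hypothetical helpers. The composed state holds both stages' states, and the first stage's downstream is a Sink that feeds the second stage's integrator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ComposeSketch {
    interface Sink<R> { boolean flush(R element); }
    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }
    interface MiniGatherer<T, A, R> {
        Supplier<A> supplier();
        Integrator<A, T, R> integrator();
        BiConsumer<A, Sink<? super R>> finisher();
    }
    static <T, A, R> MiniGatherer<T, A, R> of(Supplier<A> supplier,
                                              Integrator<A, T, R> integrator,
                                              BiConsumer<A, Sink<? super R>> finisher) {
        return new MiniGatherer<T, A, R>() {
            public Supplier<A> supplier() { return supplier; }
            public Integrator<A, T, R> integrator() { return integrator; }
            public BiConsumer<A, Sink<? super R>> finisher() { return finisher; }
        };
    }

    /** Combined state: both stages' states, plus whether the second stage still wants input. */
    static final class Both<A, AA> {
        final A first; final AA second; boolean secondOpen = true;
        Both(A first, AA second) { this.first = first; this.second = second; }
    }

    /** Sink handed to the first stage; it forwards each element into the second stage. */
    static <R, AA, RR> Sink<R> feed(MiniGatherer<R, AA, RR> second, Both<?, AA> both,
                                    Sink<? super RR> downstream) {
        return r -> both.secondOpen = both.secondOpen
                && second.integrator().integrate(both.second, r, downstream);
    }

    static <T, A, R, AA, RR> MiniGatherer<T, Both<A, AA>, RR> andThen(
            MiniGatherer<T, A, R> first, MiniGatherer<R, AA, RR> second) {
        Supplier<Both<A, AA>> supplier =
            () -> new Both<A, AA>(first.supplier().get(), second.supplier().get());
        Integrator<Both<A, AA>, T, RR> integrator = (both, element, downstream) ->
            first.integrator().integrate(both.first, element, feed(second, both, downstream))
                && both.secondOpen;                      // stop once either stage is done
        BiConsumer<Both<A, AA>, Sink<? super RR>> finisher = (both, downstream) -> {
            first.finisher().accept(both.first, feed(second, both, downstream));
            second.finisher().accept(both.second, downstream);
        };
        return of(supplier, integrator, finisher);
    }

    static <T, R> MiniGatherer<T, Void, R> map(Function<? super T, ? extends R> mapper) {
        return of(() -> null,
                  (nothing, element, downstream) -> downstream.flush(mapper.apply(element)),
                  (nothing, downstream) -> { });
    }
    static <T> MiniGatherer<T, long[], T> limit(long max) {
        return of(() -> new long[1],
                  (seen, element, downstream) ->
                      seen[0] < max && downstream.flush(element) && ++seen[0] < max,
                  (seen, downstream) -> { });
    }

    /** Hypothetical sequential driver that materializes the output into a List. */
    static <T, A, R> List<R> run(List<T> source, MiniGatherer<T, A, R> gatherer) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = element -> { out.add(element); return true; };
        A state = gatherer.supplier().get();
        for (T element : source) {
            if (!gatherer.integrator().integrate(state, element, sink)) break;
        }
        gatherer.finisher().accept(state, sink);
        return out;
    }

    static List<Integer> doubledThenFirstTwo(List<Integer> source) {
        MiniGatherer<Integer, Void, Integer> doubled = map(i -> i * 2);
        MiniGatherer<Integer, long[], Integer> firstTwo = limit(2);
        return run(source, andThen(doubled, firstTwo));
    }

    public static void main(String[] args) {
        System.out.println(doubledThenFirstTwo(List.of(1, 2, 3, 4))); // prints [2, 4]
    }
}
```

Note that the short-circuit signal from the second stage propagates upstream: once limit returns false, the composed integrator returns false as well, so the elements 3 and 4 are never consumed.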

But that’s not all—by introducing a default Collector collect(Collector)-method it is also possible to compose a Gatherer with a Collector, and the result is another Collector:

Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Collector<Integer,?,List<Long>> collector = someGatherer.andThen(someOtherGatherer).collect(Collectors.toList());

Taken together, it is now possible to build Stream processing in three ways: “source-to-destination” using Stream.gather(…).gather(…).gather(…); “intermediaries-first” using gatherer.andThen(…).andThen(…); and “destination-to-source” using gatherer.collect(otherGatherer.collect(…)).
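For a point of comparison, the “destination-to-source” style already exists for a couple of built-in cases in java.util.stream.Collectors, where Collectors.filtering and Collectors.mapping wrap a downstream Collector. The sketch below uses only those existing JDK methods, not the proposed Gatherer.collect; the class and method names are chosen for this example.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DestinationFirstSketch {
    /** Built from the destination (toList) outwards: map to text, then filter even inputs. */
    static Collector<Integer, ?, List<String>> evenSquaresAsText() {
        return Collectors.filtering(
                (Integer i) -> i % 2 == 0,                       // outermost: applied first
                Collectors.mapping(
                        (Integer i) -> "sq=" + (i * i),          // applied to what passes the filter
                        Collectors.toList()));                   // innermost: the destination
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4).collect(evenSquaresAsText())); // prints [sq=4, sq=16]
    }
}
```

A Gatherer composed with a Collector would generalize this pattern to arbitrary intermediate operations rather than the fixed set of adapters that Collectors offers.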

Familiarity

Since Gatherer is an evolution of Collector there’s a clear on-ramp for any and all Java developers who are already familiar with Collector.

Reusability

Gatherers, by virtue of being stand-alone, reusable intermediaries, paired with compositionality and opt-in parallelization, allow transformations to be created once and maintained in a single place, no matter whether they are going to be used as a Collector, a Gatherer, or a Stream.

Optimizability

Collector has the notion of Characteristics, a Set of flags which describe a Collector. The underlying machinery can use this information to optimize evaluation. The same approach can be used for Gatherers.
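As a reminder of how this looks for Collector today, the sketch below declares a characteristic when building a Collector and then inspects the resulting flag set. The toUnorderedSet name is hypothetical; Collector.of and Collector.Characteristics are existing JDK API.

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Stream;

class CharacteristicsSketch {
    /** Hypothetical collector that declares it does not depend on encounter order. */
    static <T> Collector<T, Set<T>, Set<T>> toUnorderedSet() {
        return Collector.of(
                HashSet::new,                                    // supplier
                Set::add,                                        // accumulator
                (left, right) -> { left.addAll(right); return left; }, // combiner
                Characteristics.UNORDERED);                      // declared flag
    }

    public static void main(String[] args) {
        Collector<String, Set<String>, Set<String>> collector = toUnorderedSet();
        // This Collector.of overload (no finisher) adds IDENTITY_FINISH automatically.
        System.out.println(collector.characteristics()
                .equals(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED))); // prints true
        System.out.println(Stream.of("a", "b", "a").collect(collector).size());                   // prints 2
    }
}
```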

Summary 2.0

Taking all of the above into consideration, we end up with a definition like the following:

/** @param <T> the element type
 *  @param <A> the (mutable) intermediate accumulation type
 *  @param <R> the (probably immutable) final accumulation type
 */
interface Gatherer<T,A,R> {

    interface Sink<R> {
        boolean flush(R element);
    }
    
    interface Integrator<A,T,R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }

    enum Characteristics {
        GREEDY,          // Never short-circuits
        SIZE_PRESERVING, // Emits exactly once per input element
        STATELESS;       // No need to initialize or combine state
    }

    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();
    Set<Characteristics> characteristics();

    default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<R,AA,RR> that) {
        // Gatherers is analogous to Collectors
        return Gatherers.Composite.of(this, that);
    }

    default <RR> Collector<T,?,RR> collect(Collector<R, ?, RR> collector) {
        // Gatherers is analogous to Collectors
        return Gatherers.GathererCollector.of(this, collector);
    }
}

Appetizers

The following examples showcase how Gatherer can be used to implement a diverse set of pre-existing, and future, intermediate operations for Streams.

How to implement map(mapper)

public final static <T,R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
    return Gatherer.of(
        () -> (Void)null,
        (nothing, element, downstream) ->
            downstream.flush(mapper.apply(element)),
        (l,r) -> l,
        (nothing, downstream) -> {}
    );
}
jshell> Stream.of(1,2,3,4).gather(map(i -> i + 1)).toList()
$1 ==> [2, 3, 4, 5]

How to implement flatMap(mapper)

public final static <T,R> Gatherer<T, ?, R> flatMap(Function<? super T, ? extends Stream<R>> mapper) {
    return Gatherer.of(
        () -> (Void)null,
        (nothing, element, downstream) -> {
            try(Stream<? extends R> s = mapper.apply(element)) {
                return s == null || s.sequential().allMatch(downstream::flush);
            }
        },
        (l,r) -> l,
        (nothing, downstream) -> {}
    );
}
jshell> Stream.of(1,2,3,4).gather(flatMap(i -> Stream.of(i + 1))).toList()
$1 ==> [2, 3, 4, 5]

How to implement takeWhile(predicate)

public final static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
    return Gatherer.of(
        () -> (Void)null,
        (nothing, element, downstream) ->
            predicate.test(element) && downstream.flush(element),
        (l, r) -> l,
        (nothing, downstream) -> {}
    );
}
jshell> Stream.of(1,2,3,4).gather(takeWhile(i -> i < 3)).toList()
$1 ==> [1, 2]

How to implement limit(N)

public static <M> Gatherer<M, ?, M> limit(long limit) {
    if (limit < 0)
        throw new IllegalArgumentException("'limit' has to be non-negative");

    class At { long n = 0; }
    return Gatherer.of(
            At::new,
            (at, element, downstream) ->
                at.n < limit && downstream.flush(element) && ++at.n < limit,
            Gatherer.unsupportedCombiner(),
            (at, downstream) -> {}
    );
}
jshell> Stream.of(1,2,3,4).gather(limit(2)).toList()
$1 ==> [1, 2]

How to implement prefix scanLeft

public static <T,R> Gatherer<T,?,R> scanLeft(R initial, BiFunction<R,T,R> scanner) {
    class State { R current = initial; }
    return Gatherer.of(
            State::new,
            (state, element, downstream) ->
                downstream.flush(state.current = scanner.apply(state.current, element)),
            Gatherer.unsupportedCombiner(),
            (state, downstream) -> {}
    );
}
jshell> Stream.of(1,2,3,4).gather(scanLeft("'", (acc, elem) -> acc + elem + "'")).toList()
$1 ==> ['1', '1'2', '1'2'3', '1'2'3'4']

How to implement fold(initial, folder)

public static <T> Gatherer<T,?,T> fold(T initial, BinaryOperator<T> folder) {
    class State { T current = initial; }
    return Gatherer.of(
        State::new,
        (state, element, downstream) -> {
            state.current = folder.apply(state.current, element);
            return true;
        },
        (l, r) -> {
            l.current = folder.apply(l.current, r.current);
            return l;
        },
        (state, downstream) -> downstream.flush(state.current)
    );
}
jshell> Stream.of(1,2,3,4).gather(fold(0, (acc, elem) -> acc + elem)).toList()
$1 ==> [10]

How to implement foldLeft(initial, folder)

public static <T,R> Gatherer<T,?,R> foldLeft(R initial, BiFunction<R,T,R> folder) {
    class State { R current = initial; }
    return Gatherer.of(
        State::new,
        (state, element, downstream) -> {
            state.current = folder.apply(state.current, element);
            return true;
        },
        Gatherer.unsupportedCombiner(),
        (state, downstream) -> downstream.flush(state.current)
    );
}
jshell> Stream.of(1,2,3,4).gather(foldLeft(0L, (acc, elem) -> acc + elem)).toList()
$1 ==> [10]

How to implement deduplicateAdjacent()

public static <T> Gatherer<T,?,T> deduplicateAdjacent() {
    class State { T prev; boolean hasPrev; }
    return Gatherer.of(
        State::new,
        (state, element, downstream) -> {
            if (!state.hasPrev) {
                state.hasPrev = true;
                state.prev = element;
                return downstream.flush(element);
            } else if (!Objects.equals(state.prev, element)) {
                state.prev = element;
                return downstream.flush(element);
            } else {
                return true; // skip duplicate
            }
        },
        Gatherer.unsupportedCombiner(),
        (state, downstream) -> {}
    );
}
jshell> Stream.of(1,2,2,3,2,4).gather(deduplicateAdjacent()).toList()
$1 ==> [1, 2, 3, 2, 4]