Module java.base

Interface Gatherer<T,A,R>

Type Parameters:
T - the type of input elements to the gatherer operation
A - the potentially mutable state type of the gatherer operation (often hidden as an implementation detail)
R - the type of output elements from the gatherer operation

public interface Gatherer<T,A,R>
An intermediate operation that processes input elements, optionally mutating intermediate state, optionally transforming the input elements into a different type of output elements, and optionally applying final actions at end-of-upstream. Gatherer operations can be performed either sequentially or, if a combiner function is supplied, in parallel.

Examples of gathering operations include, but are not limited to: grouping elements into batches (also known as windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions; and incremental reordering functions. The class Gatherers provides implementations of common gathering operations.
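
As a quick illustration of the ready-made implementations mentioned above, the following sketch applies two windowing gatherers from Gatherers. It assumes JDK 24 or later (in JDK 22 and 23 these are preview APIs that require --enable-preview); the class and method names are illustrative only.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Sketch assuming JDK 24+; BuiltInGatherersDemo is an illustrative name.
final class BuiltInGatherersDemo {

    // Batches of a fixed size; the last batch may be shorter.
    static List<List<Integer>> fixedWindows() {
        Gatherer<Integer, ?, List<Integer>> pairs = Gatherers.windowFixed(2);
        return Stream.of(1, 2, 3, 4, 5).gather(pairs).toList();
    }

    // Overlapping windows that advance one element at a time.
    static List<List<Integer>> slidingWindows() {
        Gatherer<Integer, ?, List<Integer>> pairs = Gatherers.windowSliding(2);
        return Stream.of(1, 2, 3, 4).gather(pairs).toList();
    }

    public static void main(String[] args) {
        System.out.println(fixedWindows());   // [[1, 2], [3, 4], [5]]
        System.out.println(slidingWindows()); // [[1, 2], [2, 3], [3, 4]]
    }
}
```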

API Note:

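A Gatherer is specified by four functions that work together to process input elements, optionally using intermediate state, and optionally performing a final action at the end of input. They are:
  • creating a new, potentially mutable, state (initializer())
  • integrating a new input element (integrator())
  • combining two states into one (combiner())
  • performing an optional final action (finisher())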

Each invocation of initializer(), integrator(), combiner(), and finisher() must return an equivalent result.

Implementations of Gatherer must not capture, retain, or expose to other threads the references to the state instance or to the downstream Gatherer.Downstream for longer than the duration of the invocation of the method to which they are passed.

Performing a gathering operation with a Gatherer should produce a result equivalent to:


     Gatherer.Downstream<? super R> downstream = ...;
     A state = gatherer.initializer().get();
     for (T t : data) {
         if (!gatherer.integrator().integrate(state, t, downstream)) {
             break; // a false result means no more input is wanted
         }
     }
     gatherer.finisher().accept(state, downstream);
 
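
The equivalence above can be turned into a small runnable driver. This is a sketch only: evaluate is a hypothetical helper (not part of the JDK) that runs a gatherer sequentially over a List and collects whatever it pushes, assuming JDK 24 or later. It stops integrating as soon as the integrator returns false, and it still invokes the finisher afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

// Sketch assuming JDK 24+. `evaluate` is a hypothetical helper, not a JDK method;
// it drives a gatherer sequentially over a list and collects everything it emits.
final class SequentialEvaluation {

    static <T, A, R> List<R> evaluate(Gatherer<T, A, R> gatherer, List<? extends T> data) {
        List<R> out = new ArrayList<>();
        // Downstream is a functional interface; this one accepts every element.
        Gatherer.Downstream<R> downstream = element -> {
            out.add(element);
            return true;
        };
        A state = gatherer.initializer().get();
        for (T t : data) {
            // false means the integrator wants no more input: stop integrating.
            if (!gatherer.integrator().integrate(state, t, downstream)) {
                break;
            }
        }
        // The finisher still runs once, even after a short-circuit.
        gatherer.finisher().accept(state, downstream);
        return out;
    }

    // Stateless gatherer that forwards numbers until the first negative one.
    static List<Integer> untilNegativeDemo() {
        Gatherer<Integer, Void, Integer> untilNegative =
                Gatherer.ofSequential((unused, n, downstream) -> n >= 0 && downstream.push(n));
        return evaluate(untilNegative, List.of(1, 2, -1, 3));
    }

    // Gatherers.windowFixed relies on its finisher to emit the final partial window.
    static List<List<Integer>> windowDemo() {
        Gatherer<Integer, ?, List<Integer>> pairs = Gatherers.windowFixed(2);
        return evaluate(pairs, List.of(1, 2, 3));
    }

    public static void main(String[] args) {
        System.out.println(untilNegativeDemo()); // [1, 2]
        System.out.println(windowDemo());        // [[1, 2], [3]]
    }
}
```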

However, the library is free to partition the input, perform the integrations on the partitions, and then use the combiner function to combine the partial results to achieve a gathering operation. (Whether this performs better or worse depends on the specific gathering operation and on the relative cost of the integrator and combiner functions.)
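
To make the partition-and-combine strategy concrete, here is a sketch of a parallelizable gatherer built with the four-argument Gatherer.of factory. counting is a hypothetical gatherer written only for illustration (Stream.count already covers this use case), and the example assumes JDK 24 or later. Each partition counts into its own long[1] state, the combiner adds partial counts, and the finisher emits the single total.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

// Sketch assuming JDK 24+; `counting` is a hypothetical gatherer for illustration.
final class ParallelCounting {

    // Each partition counts into its own long[1] state, partial counts are
    // merged by the combiner, and the finisher emits the total once.
    static <T> Gatherer<T, ?, Long> counting() {
        return Gatherer.<T, long[], Long>of(
                () -> new long[1],                                // initializer
                Gatherer.Integrator.ofGreedy((count, element, downstream) -> {
                    count[0]++;                                   // count; emit nothing yet
                    return true;
                }),
                (left, right) -> {                                // combiner
                    left[0] += right[0];
                    return left;
                },
                (count, downstream) -> downstream.push(count[0]) // finisher
        );
    }

    static List<Long> countInParallel(int n) {
        Gatherer<Integer, ?, Long> counter = counting();
        return IntStream.range(0, n).boxed().parallel().gather(counter).toList();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(1_000)); // [1000]
    }
}
```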

In addition to the predefined implementations in Gatherers, the static factory methods of(...) and ofSequential(...) can be used to construct gatherers. For example, you could create a gatherer that implements the equivalent of Stream.map(java.util.function.Function) with:


     public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
         return Gatherer.of(
             (unused, element, downstream) -> // integrator
                 downstream.push(mapper.apply(element))
         );
     }
 
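
A self-contained, runnable version of the map gatherer follows, assuming JDK 24 or later; the class name MapGatherer and the doubled helper are illustrative.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Runnable version of the `map` gatherer (sketch assuming JDK 24+).
final class MapGatherer {

    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(
                // Stateless integrator: transform each element and forward it.
                // Returning push's result lets the pipeline stop early if downstream does.
                (unused, element, downstream) -> downstream.push(mapper.apply(element))
        );
    }

    static List<Integer> doubled() {
        Gatherer<Integer, ?, Integer> doubler = map(i -> i * 2);
        return Stream.of(1, 2, 3).gather(doubler).toList();
    }

    public static void main(String[] args) {
        System.out.println(doubled()); // [2, 4, 6]
    }
}
```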

Gatherers are designed to be composed; two or more Gatherers can be composed into a single Gatherer using the andThen(Gatherer) method.


     // using the implementation of `map` as seen above
     Gatherer<Integer, ?, Integer> increment = map(i -> i + 1);

     Gatherer<Object, ?, String> toString = map(i -> i.toString());

     Gatherer<Integer, ?, String> incrementThenToString = increment.andThen(toString);
 
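
The composition can be checked against applying the two gatherers one after another with separate gather calls. This sketch assumes JDK 24 or later and repeats the map helper so it compiles on its own; ComposeGatherers and its members are illustrative names.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Sketch assuming JDK 24+; repeats the `map` gatherer so this class compiles alone.
final class ComposeGatherers {

    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    static final Gatherer<Integer, ?, Integer> INCREMENT = map(i -> i + 1);
    static final Gatherer<Object, ?, String> TO_STRING = map(i -> i.toString());

    // One composed gatherer applied once...
    static List<String> composed() {
        Gatherer<Integer, ?, String> incrementThenToString = INCREMENT.andThen(TO_STRING);
        return Stream.of(1, 2, 3).gather(incrementThenToString).toList();
    }

    // ...compared with the two gatherers applied in sequence.
    static List<String> chained() {
        return Stream.of(1, 2, 3).gather(INCREMENT).gather(TO_STRING).toList();
    }

    public static void main(String[] args) {
        System.out.println(composed()); // [2, 3, 4]
        System.out.println(chained());  // [2, 3, 4]
    }
}
```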

As an example, a sequential prefix scan could be implemented as a Gatherer as follows:

     public static <T, R> Gatherer<T, ?, R> scan(
         Supplier<R> initial,
         BiFunction<? super R, ? super T, ? extends R> scanner) {

         class State {
             R current;
             State() {
                 current = initial.get();
             }
         }

         return Gatherer.<T, State, R>ofSequential(
              State::new,
              Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                  state.current = scanner.apply(state.current, element);
                  return downstream.push(state.current);
              })
         );
     }
 
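
Applied to a stream, the scan gatherer emits every intermediate accumulation. The sketch below, assuming JDK 24 or later, reproduces scan verbatim and compares it with Gatherers.scan, the built-in gatherer with the same parameters; the wrapper class and helper names are illustrative.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Sketch assuming JDK 24+; ScanGatherer and its helpers are illustrative names.
final class ScanGatherer {

    static <T, R> Gatherer<T, ?, R> scan(
            Supplier<R> initial,
            BiFunction<? super R, ? super T, ? extends R> scanner) {

        class State {
            R current;
            State() {
                current = initial.get();
            }
        }

        return Gatherer.<T, State, R>ofSequential(
                State::new,
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    state.current = scanner.apply(state.current, element);
                    return downstream.push(state.current); // emit each running value
                })
        );
    }

    static List<Integer> runningSums() {
        Gatherer<Integer, ?, Integer> sums = scan(() -> 0, Integer::sum);
        return Stream.of(1, 2, 3, 4).gather(sums).toList();
    }

    static List<Integer> builtInRunningSums() {
        Gatherer<Integer, ?, Integer> sums = Gatherers.scan(() -> 0, Integer::sum);
        return Stream.of(1, 2, 3, 4).gather(sums).toList();
    }

    public static void main(String[] args) {
        System.out.println(runningSums());        // [1, 3, 6, 10]
        System.out.println(builtInRunningSums()); // [1, 3, 6, 10]
    }
}
```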
Implementation Requirements:
Libraries that implement transformations based on Gatherer, such as Stream.gather(Gatherer), must adhere to the following constraints:
  • Gatherers whose initializer is defaultInitializer() are considered to be stateless, and invoking their initializer is optional.
  • Gatherers whose integrator is an instance of Gatherer.Integrator.Greedy can be assumed not to short-circuit, and the return value of invoking Gatherer.Integrator.integrate(Object, Object, Downstream) does not need to be inspected.
  • The first argument passed to the integration function, both arguments passed to the combiner function, and the argument passed to the finisher function must be the result of a previous invocation of the initializer or combiner functions.
  • The implementation should not do anything with the state objects returned by the initializer or combiner functions other than pass them again to the integrator, combiner, or finisher functions.
  • Once a state object is passed to the combiner or finisher function, it is never passed to the integrator function again.
  • When the integrator function returns false, it shall be interpreted just as if there were no more elements to pass to it.
  • For parallel evaluation, the gathering implementation must ensure that the input is properly partitioned, that partitions are processed in isolation, and that combining happens only after integration is complete for both partitions.
  • Gatherers whose combiner is defaultCombiner() may only be evaluated sequentially. All other combiners allow the operation to be parallelized by initializing each partition separately, invoking the integrator until it returns false, joining the partitions' states using the combiner, and then invoking the finisher on the joined state. Outputs and state later in the input sequence will be discarded if processing an earlier segment short-circuits.
  • Gatherers whose finisher is defaultFinisher() are considered to not have an end-of-stream hook and invoking their finisher is optional.
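
To illustrate the short-circuiting rule (an integrator returning false is treated as the end of input), here is a minimal sketch assuming JDK 24 or later. takeWhileGatherer is a hypothetical name; Stream.takeWhile already exists, so this only demonstrates the mechanism. Because the integrator is a plain lambda rather than a Gatherer.Integrator.Greedy, the library has to inspect its return value.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Sketch assuming JDK 24+; `takeWhileGatherer` is hypothetical (Stream.takeWhile already exists).
final class ShortCircuiting {

    static <T> Gatherer<T, ?, T> takeWhileGatherer(Predicate<? super T> predicate) {
        // A plain lambda, not Integrator.Greedy, so a false return must be honored.
        return Gatherer.<T, T>ofSequential(
                (unused, element, downstream) -> predicate.test(element) && downstream.push(element));
    }

    static List<Integer> takeBelowFive() {
        Gatherer<Integer, ?, Integer> belowFive = takeWhileGatherer(i -> i < 5);
        // 10 fails the predicate, so integration stops; 4 and 5 are never emitted.
        return Stream.of(1, 2, 3, 10, 4, 5).gather(belowFive).toList();
    }

    public static void main(String[] args) {
        System.out.println(takeBelowFive()); // [1, 2, 3]
    }
}
```

Unlike a filter, the gatherer does not resume after the first failing element, which is why 4 is absent from the result.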
Since:
22
Method Details

    • initializer

      default Supplier<A> initializer()
      A function that produces an instance of the intermediate state used for this gathering operation.

      By default, this method returns defaultInitializer().

      Returns:
      a function that produces an instance of the intermediate state used for this gathering operation
    • integrator

      Gatherer.Integrator<A,T,R> integrator()
      A function which integrates provided elements, potentially using the provided intermediate state, optionally producing output to the provided Gatherer.Downstream.
      Returns:
      a function which integrates provided elements, potentially using the provided state, optionally producing output to the provided Downstream
    • combiner

      default BinaryOperator<A> combiner()
      A function which accepts two intermediate states and combines them into one.

      By default, this method returns defaultCombiner().

      Returns:
      a function which accepts two intermediate states and combines them into one
    • finisher

      default BiConsumer<A,Gatherer.Downstream<? super R>> finisher()
      A function which accepts the final intermediate state and a Gatherer.Downstream handle, allowing it to perform a final action at the end of the input elements.

      By default, this method returns defaultFinisher().

      Returns:
      a function which transforms the intermediate result to the final result(s) which are then passed on to the supplied downstream consumer
    • andThen

      default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<? super R,AA,? extends RR> that)
      Returns a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer.
      Type Parameters:
      AA - The type of the state of that Gatherer
      RR - The type of output of that Gatherer
      Parameters:
      that - the other gatherer
      Returns:
      a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer
      Throws:
      NullPointerException - if the argument is null
    • defaultInitializer

      static <A> Supplier<A> defaultInitializer()
      Returns an initializer which is the default initializer of a Gatherer. The returned initializer identifies that the owning Gatherer is stateless.
      Type Parameters:
      A - the type of the state of the returned initializer
      Returns:
      the instance of the default initializer
    • defaultCombiner

      static <A> BinaryOperator<A> defaultCombiner()
      Returns a combiner which is the default combiner of a Gatherer. The returned combiner identifies that the owning Gatherer must only be evaluated sequentially.
      Type Parameters:
      A - the type of the state of the returned combiner
      Returns:
      the instance of the default combiner
    • defaultFinisher

      static <A, R> BiConsumer<A,Gatherer.Downstream<? super R>> defaultFinisher()
      Returns a finisher which is the default finisher of a Gatherer. The returned finisher identifies that the owning Gatherer performs no additional actions at the end of input.
      Type Parameters:
      A - the type of the state of the returned finisher
      R - the type of the Downstream of the returned finisher
      Returns:
      the instance of the default finisher
    • ofSequential

      static <T, R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator)
      Returns a stateless, sequential gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if the argument is null
    • ofSequential

      static <T, R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void,Gatherer.Downstream<? super R>> finisher)
      Returns a stateless, sequential gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if any argument is null
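
For illustration, a stateless sequential gatherer with a finisher can append a trailing element once the input is exhausted. appending is a hypothetical helper, and the sketch assumes JDK 24 or later.

```java
import java.util.List;
import java.util.stream.Gatherer;

// Sketch assuming JDK 24+; `appending` is a hypothetical gatherer for illustration.
final class AppendAtEnd {

    // Stateless and sequential: forwards every element, then emits `trailer` at end of input.
    static <T> Gatherer<T, ?, T> appending(T trailer) {
        return Gatherer.<T, T>ofSequential(
                (unused, element, downstream) -> downstream.push(element), // integrator
                (unused, downstream) -> downstream.push(trailer)           // finisher
        );
    }

    static List<String> withTrailer(List<String> input) {
        Gatherer<String, ?, String> g = appending("END");
        return input.stream().gather(g).toList();
    }

    public static void main(String[] args) {
        System.out.println(withTrailer(List.of("a", "b"))); // [a, b, END]
    }
}
```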
    • ofSequential

      static <T, A, R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator)
      Returns a sequential gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the supplier function for the new gatherer
      integrator - the integrator function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if any argument is null
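
A sketch of a stateful sequential gatherer built from an initializer and an integrator: it drops elements equal to the one immediately before them, one of the gathering operations listed in the class description. distinctConsecutive is a hypothetical name, and the example assumes JDK 24 or later.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;

// Sketch assuming JDK 24+; `distinctConsecutive` is a hypothetical helper.
final class DedupeConsecutive {

    static <T> Gatherer<T, ?, T> distinctConsecutive() {
        // Mutable per-evaluation state: the last element seen, if any.
        class State {
            boolean hasPrevious;
            T previous;
        }
        return Gatherer.<T, State, T>ofSequential(
                State::new,
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    if (state.hasPrevious && Objects.equals(state.previous, element)) {
                        return true; // same as the previous element: skip it
                    }
                    state.hasPrevious = true;
                    state.previous = element;
                    return downstream.push(element);
                })
        );
    }

    static List<Integer> dedupe(List<Integer> input) {
        Gatherer<Integer, ?, Integer> g = distinctConsecutive();
        return input.stream().gather(g).toList();
    }

    public static void main(String[] args) {
        System.out.println(dedupe(List.of(1, 1, 2, 2, 2, 3, 1))); // [1, 2, 3, 1]
    }
}
```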
    • ofSequential

      static <T, A, R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BiConsumer<A,Gatherer.Downstream<? super R>> finisher)
      Returns a sequential gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the supplier function for the new gatherer
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if any argument is null
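
With a finisher added, a sequential gatherer can flush leftover state at the end of input. The sketch below, assuming JDK 24 or later, batches elements into lists of a given size and uses the finisher to emit the last, possibly shorter batch; batches is a hypothetical helper whose behavior mirrors Gatherers.windowFixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Sketch assuming JDK 24+; `batches` is a hypothetical helper, similar in spirit to Gatherers.windowFixed.
final class Batching {

    static <T> Gatherer<T, ?, List<T>> batches(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be at least 1");
        }
        return Gatherer.<T, List<T>, List<T>>ofSequential(
                ArrayList::new,                                // initializer: empty current batch
                Gatherer.Integrator.ofGreedy((batch, element, downstream) -> {
                    batch.add(element);
                    if (batch.size() < size) {
                        return true;                           // batch not full yet
                    }
                    List<T> full = new ArrayList<>(batch);     // emit a copy, then reuse the state
                    batch.clear();
                    return downstream.push(full);
                }),
                (batch, downstream) -> {                       // finisher: flush the remainder
                    if (!batch.isEmpty()) {
                        List<T> rest = new ArrayList<>(batch);
                        downstream.push(rest);
                    }
                }
        );
    }

    static List<List<Integer>> batchesOf(int size) {
        Gatherer<Integer, ?, List<Integer>> g = batches(size);
        return Stream.of(1, 2, 3, 4, 5).gather(g).toList();
    }

    public static void main(String[] args) {
        System.out.println(batchesOf(2)); // [[1, 2], [3, 4], [5]]
    }
}
```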
    • of

      static <T, R> Gatherer<T,Void,R> of(Gatherer.Integrator<Void,T,R> integrator)
      Returns a stateless, parallelizable gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if the argument is null
    • of

      static <T, R> Gatherer<T,Void,R> of(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void,Gatherer.Downstream<? super R>> finisher)
      Returns a stateless, parallelizable gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if any argument is null
    • of

      static <T, A, R> Gatherer<T,A,R> of(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BinaryOperator<A> combiner, BiConsumer<A,Gatherer.Downstream<? super R>> finisher)
      Returns a parallelizable gatherer from the supplied logic.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the supplier function for the new gatherer
      integrator - the integrator function for the new gatherer
      combiner - the combiner function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      a new gatherer comprised of the supplied logic
      Throws:
      NullPointerException - if any argument is null