This is an informal overview of the major proposed library enhancements to take advantage of new language features, primarily lambda expressions and extension methods, specified by JSR 335 and implemented in the OpenJDK Lambda Project. This version describes "Iteration 3" of the API design; for comparison, see Iteration 1. We are in the process of implementing this in the lambda repository.
Had lambda expressions (closures) been part of the Java language from the beginning, our Collections APIs would certainly look different than they do today. As the Java language acquires lambda expressions as part of JSR 335, this has the unfortunate side effect of making our Collections interfaces look even more out of date! While it might be tempting to start from scratch and build a replacement Collections framework ("Collections II"), replacing the Collections framework would be a major task, as the Collections interfaces permeate the JDK libraries. Instead, we will pursue an evolutionary strategy of adding extension methods to existing interfaces (such as Collection, List, or Iterable), and adding new interfaces (such as Stream) that are retrofitted onto existing classes, enabling many of the desired idioms without making people trade in their trusty ArrayLists and HashMaps. (This is not to say that Java will never have a new Collections framework; clearly there are limitations with the existing Collections framework beyond simply not being designed for lambdas. Creating a new-and-improved collections framework is a fine candidate for consideration in a future version of the JDK.)
Parallelism is an important driver for this work. Therefore, it is important to encourage idioms that are both sequential- and parallel-friendly. We achieve this by focusing less on in-place mutation and more on computations that produce new values. It is also important to strike a balance between making parallelism easier and not going so far as to make it invisible; our goal is explicit but unobtrusive parallelism for both old and new collections.
For a description of the language features being specified by JSR 335, see the State of the Lambda.
The Collections framework relies on the concept of external iteration, where a Collection provides a way for its client to enumerate its elements (Collection extends Iterable), and clients use this to step sequentially through the elements of a collection. For example, if we wanted to set the color of every shape in a collection of shapes to red, we would write:
for (Shape s : shapes) {
s.setColor(RED);
}
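This example illustrates external iteration; the for-each loop calls the iterator() method of shapes and steps through the collection one element at a time. External iteration is straightforward enough, but it has several problems:
- Java's for-each loop is inherently sequential, and must process the elements in the order specified by the collection.
- Because the client controls the iteration, the library is deprived of any opportunity to manage the control flow itself.
Sometimes the tight specification of the for-each loop (sequential, in-order) is desirable, but often it is simply an impediment to performance.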
The alternative to external iteration is internal iteration, where instead of controlling the iteration, the client delegates that to the library and passes in snippets of code to execute at various points in the computation.
The internal-iteration equivalent of the previous example is:
shapes.forEach(s -> { s.setColor(RED); });
This approach moves the control flow management from the client code to the library code, not only allowing the libraries to abstract over common control flow operations, but also enabling them to potentially use laziness, parallelism, and out-of-order execution to improve performance. (Whether the implementation of forEach actually does any of these things is a matter for the library implementor to decide, but with internal iteration they are at least possible, whereas with external iteration, they are not.)
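To make the contrast concrete, here is a minimal runnable sketch of both styles. It uses the `Iterable.forEach` method that eventually shipped in Java SE 8, which takes a `java.util.function.Consumer` (the role played by this draft's `Block`). The `Shape` class and `Color` enum are hypothetical stand-ins for the types the examples assume.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical color type standing in for the RED/BLUE constants used in the text.
enum Color { RED, GREEN, BLUE }

// Hypothetical minimal Shape with a mutable color.
class Shape {
    private Color color;

    Shape(Color color) { this.color = color; }

    Color getColor() { return color; }
    void setColor(Color color) { this.color = color; }
}

class IterationStyles {
    // External iteration: the client drives the loop through the collection's iterator.
    static void paintExternal(List<Shape> shapes, Color c) {
        for (Shape s : shapes) {
            s.setColor(c);
        }
    }

    // Internal iteration: the client hands the per-element action to the library.
    static void paintInternal(List<Shape> shapes, Color c) {
        shapes.forEach(s -> s.setColor(c));
    }
}
```

Both methods have the same effect; the difference is only in who controls the traversal.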
Internal iteration lends itself to a programming style where operations can be "pipelined" together. For example, if we wanted to color only the blue shapes red, we could say:
shapes.stream()
.filter(s -> s.getColor() == BLUE)
.forEach(s -> { s.setColor(RED); });
The filter operation produces a stream of values matching the provided condition, and the result of the filter operation is piped into forEach.
If we wanted to collect the blue shapes into a new List, we could say:
List<Shape> blue = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.into(new ArrayList<>());
If each shape were contained in a Box, and we wanted to know which boxes contained at least one blue shape, we could say:
Set<Box> hasBlueShape = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getContainingBox())
.into(new HashSet<>());
If we wanted to add up the total weight of the blue shapes, we could express that as:
int sum = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getWeight())
.sum();
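For comparison, these four queries can be written against the `java.util.stream` API that shipped in Java SE 8, where the draft's `into(...)` became `collect(...)` with a `Collector`, and the primitive map-then-sum step became `mapToInt(...).sum()`. The sketch below is illustrative only; the `Shape` class, and its `String`-valued containing box standing in for `Box`, are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

enum Color { RED, BLUE }

// Hypothetical shape: a color, a weight, and the name of the box containing it.
class Shape {
    private Color color;
    private final int weight;
    private final String containingBox;

    Shape(Color color, int weight, String containingBox) {
        this.color = color;
        this.weight = weight;
        this.containingBox = containingBox;
    }

    Color getColor() { return color; }
    void setColor(Color color) { this.color = color; }
    int getWeight() { return weight; }
    String getContainingBox() { return containingBox; }
}

class ShapeQueries {
    // Recolor only the blue shapes: filter, then a side-effecting forEach.
    static void paintBlueRed(List<Shape> shapes) {
        shapes.stream()
              .filter(s -> s.getColor() == Color.BLUE)
              .forEach(s -> s.setColor(Color.RED));
    }

    // Collect the blue shapes into a new List.
    static List<Shape> blueShapes(List<Shape> shapes) {
        return shapes.stream()
                     .filter(s -> s.getColor() == Color.BLUE)
                     .collect(Collectors.toList());
    }

    // Which boxes contain at least one blue shape?
    static Set<String> boxesWithBlue(List<Shape> shapes) {
        return shapes.stream()
                     .filter(s -> s.getColor() == Color.BLUE)
                     .map(Shape::getContainingBox)
                     .collect(Collectors.toSet());
    }

    // Total weight of the blue shapes, summed as primitive ints.
    static int blueWeight(List<Shape> shapes) {
        return shapes.stream()
                     .filter(s -> s.getColor() == Color.BLUE)
                     .mapToInt(Shape::getWeight)
                     .sum();
    }
}
```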
So far, we haven't written down the signatures of these operations or how they relate to existing Collections classes. These examples simply illustrate the types of problems that are easily solved with internal iteration, and the functionality we want to expose on collections.
Operations like filtering or mapping can be performed "eagerly" (where the filtering is complete by the time the filter method returns) or "lazily" (where the filtering is only done when you start iterating the elements of the result of the filter method). Stream operations that produce new streams, such as filtering or mapping, lend themselves to lazy implementation, which can often result in significant performance improvements. We can think of these operations as "naturally lazy", whether or not they are implemented as such. On the other hand, operations like accumulation, or those that produce side effects such as dumping the results into a collection or doing something for each element (such as printing them out), are "naturally eager."
Based on examination of many existing loops, a significant proportion can be restated as bulk operations drawing from a data source (array or collection), doing a series of lazy operations (filtering, mapping, etc.), and then doing a single eager operation -- such as filter-map-accumulate, filter-map-sort-foreach, etc. Accordingly, most of the naturally lazy operations tend to be used to compute temporary intermediate results, and we can exploit this property to produce more efficient libraries. (For example, a library that does filtering or mapping lazily can fuse pipelines like filter-map-accumulate into a single pass on the data, rather than three distinct passes; a library that does them eagerly cannot. Similarly, if we are looking for the first element that matches a certain characteristic, a lazy approach lets us get to the answer having examined fewer elements.)
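The fusion described above can be observed directly. The sketch below uses the shipped Java SE 8 `java.util.stream` API; the log list is purely an illustrative device that records each time the filter predicate or the mapping function runs. Nothing is recorded until the terminal `collect` begins, and the entries interleave element by element, showing filter and map being applied in one pass over a sequential stream rather than in two.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyFusion {
    // Builds a filter-map pipeline, runs it, and returns the order in which
    // the lambdas were invoked.
    static List<String> trace(List<Integer> source) {
        List<String> log = new ArrayList<>();

        Stream<Integer> pipeline = source.stream()
            .filter(x -> { log.add("filter " + x); return x % 2 != 0; })
            .map(x -> { log.add("map " + x); return x * 10; });

        // Only intermediate operations so far: no element has been touched yet.
        log.add("before terminal op");

        List<Integer> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }
}
```

For the input `[1, 2, 3]`, the recorded order is "before terminal op", "filter 1", "map 1", "filter 2", "filter 3", "map 3", and finally "result [10, 30]".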
This observation informs a critical design choice: what should the return type of filter and map be? One candidate would be for List.filter to return a new List, which would push us towards an all-eager approach. This is straightforward, but may well end up doing far more work than we really need. Another approach would be to create a whole new set of abstractions for explicit laziness -- LazyList, LazySet, etc. (But note that lazy collections would probably still have operations that trigger eager computation, such as size.) And this approach risks devolving into a combinatorial explosion of types like MutableSynchronizedLazySortedSet, etc.
Our preferred approach is to treat the naturally-lazy operations as returning a stream rather than a new collection (which might just get thrown away by the next pipeline stage anyway). Applying this to the examples above, filter draws from a source and produces a stream of values matching the provided Predicate. In most cases where potentially-lazy operations are being applied to aggregates, this turns out to be exactly what we want -- a stream of values that can be passed to the next stage in the pipeline.
The stream approach has the advantage that, when used in a source-lazy-lazy-eager pipeline, the laziness is mostly invisible, because the pipeline is "sealed" at both ends by instantiated data structures. This yields both good usability and good performance without dramatically increasing the conceptual surface area of the library.
Stream methods that return a new Stream are termed intermediate operations; those that do not are termed terminal operations.
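Streams differ from Collections in several ways:
- A stream has no storage of its own; it carries values drawn from a source, such as a collection or an array, through a pipeline of operations.
- Stream operations produce results rather than modifying the underlying source.
- Intermediate operations such as filtering and mapping are naturally lazy, so work can be deferred until a terminal operation needs it.
- A stream need not be finite.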
The following shows a basic set of stream operations.
public interface Stream<T> {
Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Mapper<? extends R, ? super T> mapper);
<R> Stream<R> flatMap(FlatMapper<? extends R, ? super T> mapper);
Stream<T> uniqueElements();
Stream<T> sorted(Comparator<? super T> comparator);
Stream<T> cumulate(BinaryOperator<T> operator);
void forEach(Block<? super T> block);
Stream<T> tee(Block<? super T> block);
Stream<T> limit(int n);
Stream<T> skip(int n);
<A extends Destination<? super T>> A into(A target);
Object[] toArray();
<U> Map<U, Collection<T>> groupBy(Mapper<? extends U, ? super T> classifier);
<U, W> Map<U, W> reduceBy(Mapper<? extends U, ? super T> classifier,
Factory<W> baseFactory,
Combiner<W, W, T> reducer);
T reduce(T base, BinaryOperator<T> op);
Optional<T> reduce(BinaryOperator<T> op);
<U> U fold(Factory<U> baseFactory,
Combiner<U, U, T> reducer,
BinaryOperator<U> combiner);
boolean anyMatch(Predicate<? super T> predicate);
boolean allMatch(Predicate<? super T> predicate);
boolean noneMatch(Predicate<? super T> predicate);
Optional<T> findFirst();
Optional<T> findAny();
Stream<T> sequential();
Stream<T> unordered();
}
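Several of these operations survived into the Java SE 8 API under different names. For example, `uniqueElements` became `distinct`, `tee` became `peek`, `into` became `collect`, `groupBy` became the `Collectors.groupingBy` collector, and `fold` roughly corresponds to the three-argument `reduce`. The sketch below exercises a handful of intermediate and terminal operations using those shipped names. It illustrates the shape of the API and is not a specification of this draft's `Stream`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class StreamOpsTour {
    static final List<String> WORDS =
        Arrays.asList("delta", "alpha", "charlie", "bravo", "alpha", "echo");

    // Intermediate ops: distinct (draft: uniqueElements), sorted, skip, limit.
    static List<String> middleOfSortedDistinct() {
        return WORDS.stream()
                    .distinct()
                    .sorted()
                    .skip(1)
                    .limit(3)
                    .collect(Collectors.toList());   // draft: into(new ArrayList<>())
    }

    // Reduction with a base value (draft: reduce(T base, BinaryOperator<T> op)).
    static int totalLength() {
        return WORDS.stream().map(String::length).reduce(0, Integer::sum);
    }

    // Short-circuiting terminal ops returning a boolean or an Optional.
    static boolean anyStartsWithC() {
        return WORDS.stream().anyMatch(w -> w.startsWith("c"));
    }

    static Optional<String> firstLongWord() {
        return WORDS.stream().filter(w -> w.length() > 5).findFirst();
    }

    // Grouping (draft: groupBy) expressed as a Collector.
    static Map<Integer, List<String>> byLength() {
        return WORDS.stream().collect(Collectors.groupingBy(String::length));
    }
}
```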
Rather than retrofitting all the Stream methods onto Collection, as was done in an earlier iteration of the API, we add a single stream() method to Collection, which yields a stream backed by the collection.
Stream operations can execute either serially or in parallel; whether a stream is serial or parallel is a property of the stream source. We have also added a parallel() method to Collection that returns a parallel Stream.
Stream functionality is only tangentially tied to Collections; aggregates other than Collection can be used as sources for streams as well.
All of the stream operations can be implemented in terms of iteration, so the minimum needed to create a Stream is an Iterator for the elements. If additional information is available (such as the size, or metadata about the stream contents such as sortedness), the library can provide optimized implementations. (To execute in parallel, a stream source needs to provide a Spliterator, which manages decomposition in addition to iteration.)
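In the API that shipped in Java SE 8, the "minimum" case of building a stream from nothing more than an `Iterator` looks roughly like the sketch below. `Spliterators.spliteratorUnknownSize` wraps the iterator, and `StreamSupport.stream` turns the result into a sequential `Stream`. The `countdown` iterator is a hypothetical example source.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorSource {
    // A hypothetical source that only knows how to iterate: n, n-1, ..., 1.
    static Iterator<Integer> countdown(int n) {
        return new Iterator<Integer>() {
            private int next = n;
            @Override public boolean hasNext() { return next > 0; }
            @Override public Integer next() {
                if (next <= 0) throw new NoSuchElementException();
                return next--;
            }
        };
    }

    // Wrap an Iterator as a sequential Stream; the size is reported as unknown.
    static <T> Stream<T> streamOf(Iterator<T> it) {
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        return StreamSupport.stream(sp, false);
    }

    static List<Integer> evensFromCountdown(int n) {
        return streamOf(countdown(n)).filter(x -> x % 2 == 0).collect(Collectors.toList());
    }
}
```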
Methods like anyMatch, while eager, can use short-circuiting to stop processing once they can determine the final result -- anyMatch need only evaluate the predicate on enough elements to find a single element for which the predicate is true.
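Short-circuiting can be checked by counting predicate calls. This sketch uses the Java SE 8 `anyMatch` on a sequential stream; the `AtomicInteger` counter exists only to make the number of evaluations visible and is not part of any recommended idiom.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuit {
    // Returns how many times the predicate ran before anyMatch produced its answer.
    static int predicateCalls(List<Integer> data, int target) {
        AtomicInteger calls = new AtomicInteger();
        data.stream().anyMatch(x -> {
            calls.incrementAndGet();
            return x == target;
        });
        return calls.get();
    }
}
```

With the data `1..8`, looking for `3` evaluates the predicate three times, while looking for an absent value has to examine all eight elements.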
In a pipeline like:
int sum = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getWeight())
.sum();
the filter and map operations are lazy. This means that we don't start drawing elements from the source until we start the sum step, minimizing the bookkeeping costs required to manage intermediate elements. Additionally, given a pipeline like:
Optional<Shape> firstBlue = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.findFirst();
because the filter step is lazy, the findFirst step will only draw from upstream until it gets an element, which means we need only evaluate the predicate on elements until we find one for which the predicate is true, rather than on all of them. The findFirst() method returns an Optional, since it might be the case that there were no elements matching the desired criteria. Optional provides a means to describe a value that might or might not be present.
Note that the user didn't have to ask for laziness, or even think about it very much; the right thing happened, with the library arranging for as little computation as it could.
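A small sketch of the same behavior with Java SE 8's `findFirst` and `Optional`; there, the draft's `getOrThrow` corresponds to `orElseThrow`, and `orElse` supplies a default. Plain color names stand in for shapes, and the static counter is a hypothetical device for showing how many predicate evaluations took place.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class FirstMatch {
    // Counts predicate evaluations so the laziness of filter is visible.
    static final AtomicInteger evaluations = new AtomicInteger();

    static Optional<String> firstBlue(List<String> colors) {
        evaluations.set(0);
        return colors.stream()
                     .filter(c -> { evaluations.incrementAndGet(); return c.equals("BLUE"); })
                     .findFirst();
    }
}
```

Calling `firstBlue(Arrays.asList("RED", "BLUE", "GREEN", "BLUE"))` stops after two evaluations; for a list with no blue entry, the result is an empty `Optional`, and `orElse("none")` yields the default.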
Lambda expressions in Java are converted into instances of one-method interfaces (functional interfaces). The package java.util.function contains a "starter set" of functional interfaces.
Additionally, we plan to provide specialized primitive versions of these core interfaces. Rather than provide the full complement of primitive specializations, we intend to provide versions for int, long, and double; the other primitive types can be accommodated through conversions. Similarly, we plan to provide specialized versions for different arities; for example, BiFunction<T,U,V> represents a function from (T,U) to V.
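In Java SE 8, the starter set ended up with names such as `Predicate`, `Consumer` (this draft's `Block`), `Function` (this draft's `Mapper`), `Supplier` (this draft's `Factory`), and `BinaryOperator`, plus primitive specializations like `IntPredicate` and two-argument forms like `BiFunction`. The sketch shows lambdas and method references targeting a few of them; the constant and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;

class FunctionalShapes {
    static final Predicate<String> IS_EMPTY = String::isEmpty;        // T -> boolean
    static final Function<String, Integer> LENGTH = String::length;   // T -> R
    static final Supplier<List<String>> NEW_LIST = ArrayList::new;    // () -> T
    static final BinaryOperator<Integer> ADD = Integer::sum;          // (T, T) -> T
    static final IntPredicate IS_EVEN = i -> i % 2 == 0;              // int -> boolean, no boxing

    // (T, U) -> R: repeat a string n times.
    static final BiFunction<String, Integer, String> REPEAT = (s, n) -> {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) sb.append(s);
        return sb.toString();
    };

    // A Consumer performs an action on its argument (draft name: Block).
    static List<String> collectNonEmpty(List<String> input) {
        List<String> out = NEW_LIST.get();
        Consumer<String> addIfNonEmpty = s -> { if (!IS_EMPTY.test(s)) out.add(s); };
        input.forEach(addIfNonEmpty);
        return out;
    }
}
```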
Because the stream source might be a mutable collection, there is the possibility for interference if the source is modified while it is being traversed. The stream operations are intended to be used while the underlying source is held constant for the duration of the operation. (This condition is generally easy to maintain; if the collection is confined to the current thread, simply ensure that the lambda expressions passed to filter, map, etc., do not mutate the underlying collection. This condition is not substantially different from the restrictions on iterating Collections today; if a Collection is modified while being iterated, most implementations throw ConcurrentModificationException.)
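The sketch below shows the existing fail-fast behavior of `ArrayList` (which its documentation describes as best-effort, not guaranteed) alongside a non-interfering alternative: finish the stream computation first, then mutate the source. The method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

class Interference {
    // Modifying an ArrayList while iterating over it: the iterator detects the change.
    static boolean mutatingDuringIterationFails() {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer x : list) {
                if (x == 1) list.add(4);   // structural modification mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Non-interfering alternative: compute a new value, then mutate afterwards.
    static List<Integer> appendDoubles(List<Integer> list) {
        List<Integer> doubled = list.stream().map(x -> x * 2).collect(Collectors.toList());
        list.addAll(doubled);   // the stream has finished before the source changes
        return list;
    }
}
```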
Below is an example from the JDK class Class (the getEnclosingMethod method), which loops over all declared methods, matching method name, return type, and number and type of parameters. Here is the original code:
for (Method m : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
if (m.getName().equals(enclosingInfo.getName()) ) {
Class<?>[] candidateParamClasses = m.getParameterTypes();
if (candidateParamClasses.length == parameterClasses.length) {
boolean matches = true;
for(int i = 0; i < candidateParamClasses.length; i++) {
if (!candidateParamClasses[i].equals(parameterClasses[i])) {
matches = false;
break;
}
}
if (matches) { // finally, check return type
if (m.getReturnType().equals(returnType) )
return m;
}
}
}
}
throw new InternalError("Enclosing method not found");
Using filter and findFirst, we can eliminate all the temporary variables and move the control logic into the library. We fetch the list of methods via reflection, turn it into a Stream with Arrays.stream, and then use a series of filters to reject the ones that don't match name, parameter types, or return type. The result of findFirst is an Optional<Method>, and we then either fetch and return the resulting method or throw an exception.
return Arrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods())
             .filter(m -> Objects.equals(m.getName(), enclosingInfo.getName()))
             .filter(m -> Arrays.equals(m.getParameterTypes(), parameterClasses))
             .filter(m -> Objects.equals(m.getReturnType(), returnType))
             .findFirst()
             .getOrThrow(() -> new InternalError("Enclosing method not found"));
This version of the code is more compact, more readable, and less error-prone.
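For readers who want to run the refactored lookup, here is a self-contained sketch against the Java SE 8 API, where `Optional.orElseThrow` plays the role of this draft's `getOrThrow`. The `findMethod` helper and its parameters are hypothetical; they stand in for the `enclosingInfo` fields and local variables used inside `Class`.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;

class MethodLookup {
    // Finds a declared method of `owner` with the given name, parameter types,
    // and return type, or throws InternalError if none matches.
    static Method findMethod(Class<?> owner, String name,
                             Class<?>[] parameterClasses, Class<?> returnType) {
        return Arrays.stream(owner.getDeclaredMethods())
                     .filter(m -> Objects.equals(m.getName(), name))
                     .filter(m -> Arrays.equals(m.getParameterTypes(), parameterClasses))
                     .filter(m -> Objects.equals(m.getReturnType(), returnType))
                     .findFirst()
                     .orElseThrow(() -> new InternalError("Enclosing method not found"));
    }
}
```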
Stream operations are very effective for ad-hoc queries over collections. Consider a hypothetical "music library" application, where a library has a list of albums, an album has a title and a list of tracks, and a track has a name, artist, and rating.
Consider the query "find me the names of albums that have at least one track rated four or higher, sorted by name." To construct this list, we might write:
List<Album> favs = new ArrayList<>();
for (Album a : albums) {
boolean hasFavorite = false;
for (Track t : a.tracks) {
if (t.rating >= 4) {
hasFavorite = true;
break;
}
}
if (hasFavorite)
favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
public int compare(Album a1, Album a2) {
return a1.name.compareTo(a2.name);
}});
We can use the stream operations to simplify each of the three major steps -- identification of whether any track in an album has a rating of at least four (anyMatch), the sorting, and the collection of albums matching our criteria into a List:
List<Album> sortedFavs =
    albums.stream()
          .filter(a -> a.tracks.stream().anyMatch(t -> (t.rating >= 4)))
          .sorted(comparing(a -> a.name))
          .into(new ArrayList<>());
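Here is a runnable version of the album query, again written against the shipped Java SE 8 API, with `collect(Collectors.toList())` in place of `into` and `Comparator.comparing` supplying the sort key. The `Album` and `Track` classes are hypothetical, shaped after the description above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model types for the "music library" example.
class Track {
    final String name;
    final String artist;
    final int rating;

    Track(String name, String artist, int rating) {
        this.name = name;
        this.artist = artist;
        this.rating = rating;
    }
}

class Album {
    final String name;
    final List<Track> tracks;

    Album(String name, List<Track> tracks) {
        this.name = name;
        this.tracks = tracks;
    }
}

class MusicQueries {
    // Albums with at least one track rated 4 or higher, sorted by album name.
    static List<Album> favorites(List<Album> albums) {
        return albums.stream()
                     .filter(a -> a.tracks.stream().anyMatch(t -> t.rating >= 4))
                     .sorted(Comparator.comparing((Album a) -> a.name))
                     .collect(Collectors.toList());
    }
}
```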
Unlike a Collection, there is nothing about a stream that requires it to be of finite size. While certain operations on an infinite stream (such as forEach) would never terminate under normal conditions, there are many operations that can deal perfectly well with infinite streams (e.g., limit will truncate a stream; findFirst or findAny will terminate as soon as they find a match, etc.). Similarly, an infinite stream can be turned into an Iterator and iterated directly.
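A brief sketch of these three ways of consuming an infinite stream, using Java SE 8's `Stream.iterate` as the unbounded source. The powers-of-two source is only an example; note that `firstPowerAbove` would loop forever once the `int` values overflow without ever exceeding the bound, so it is only meaningful for modest bounds.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreams {
    // An unbounded stream of powers of two: 1, 2, 4, 8, ...
    static Stream<Integer> powersOfTwo() {
        return Stream.iterate(1, x -> x * 2);
    }

    // limit truncates the infinite stream to a finite prefix.
    static List<Integer> firstPowers(int n) {
        return powersOfTwo().limit(n).collect(Collectors.toList());
    }

    // findFirst terminates as soon as a match is found.
    static Optional<Integer> firstPowerAbove(int bound) {
        return powersOfTwo().filter(x -> x > bound).findFirst();
    }

    // An infinite stream can also be consumed through an Iterator, one element at a time.
    static int sumOfFirstThreeViaIterator() {
        Iterator<Integer> it = powersOfTwo().iterator();
        return it.next() + it.next() + it.next();
    }
}
```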
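Some collections permit nulls; some do not. There were three choices we could have made about the role of nulls in streams:
- "error": treat a null element as an error and reject it;
- "ignore": silently drop null elements; or
- "null agnostic": pass nulls through like any other value, leaving it to the operations (and the lambdas passed to them) to cope.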
We have chosen the "null agnostic" strategy for the Streams classes. The "error" option would have outlawed some useful pipelines that could perfectly well handle nulls; the "ignore" option would have undermined some useful invariants and optimizations (such as the guarantee that, for size-preserving operations like map, the stream input and output are the same size).
The "null agnostic" strategy leaves maximal flexibility (at the cost of more complicated reasoning) to the user. If all the operations (and the lambdas they are passed) can handle nulls, everything is fine. If you prefer the "error" strategy for a given pipeline, you can add a filter(e -> { if (e == null) throw new NullPointerException(); return true; }) stage; if you prefer the "ignore" strategy, you can add a filter(e -> e != null) stage.
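The three strategies can be tried out with the Java SE 8 stream API, which is itself tolerant of null elements in operations like `map` and `filter`. The sketch below is illustrative; the input list and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class NullStrategies {
    static final List<String> INPUT = Arrays.asList("a", null, "b");

    // "Null agnostic": nulls flow through; the lambda decides what to do with them.
    static List<String> agnostic() {
        return INPUT.stream()
                    .map(s -> s == null ? "<null>" : s.toUpperCase())
                    .collect(Collectors.toList());
    }

    // "Ignore": drop nulls with an explicit filter stage.
    static List<String> ignoring() {
        return INPUT.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    // "Error": fail on the first null with an explicit checking stage.
    static List<String> failing() {
        return INPUT.stream()
                    .filter(e -> { if (e == null) throw new NullPointerException("null element"); return true; })
                    .collect(Collectors.toList());
    }
}
```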
Many use cases for bulk operations on collections produce a new value, collection, or side effect. However, sometimes we do want to mutate the collection in place. The primary in-place mutations on Collection and friends that we intend to add are:
- Performing an action on each element, via forEach on Iterator or Iterable (Iterable.forEach(Block) and Iterator.forEach(Block))
- Removing elements that match a predicate (Collection.removeAll(Predicate))
- Replacing elements by applying a function to them (List.replaceAll(Function), similar for Map)
- Replacing elements that match a predicate (List.replaceAllIf(Predicate, Function), similar for Map)
- Sorting (List.sort(Comparator))
These will be added as default methods on the appropriate interface.
While the use of internal iteration makes it possible for operations to be done in parallel, we do not wish to inflict any sort of "transparent parallelism" on the user. Instead, users should be able to select parallelism in an explicit but unobtrusive manner. We accomplish this by allowing clients to explicitly ask for a "parallel view" of the collection, whose operations execute in parallel; this is exposed on Collection via the parallel() method. If we wanted to calculate our "sum of the weights of blue shapes" query in parallel, we need only add a call to parallel():
int sum = shapes.parallel()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getWeight())
.sum();
This looks very similar to the serial version, but is clearly identified as parallel without the parallel machinery overwhelming the code.
Because of the non-interference requirements described above, you can execute parallel operations even on non-thread-safe sources such as ArrayList.
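In Java SE 8 the parallel view is requested with `Collection.parallelStream()` (or by calling `parallel()` on an existing stream) rather than `Collection.parallel()`. The sketch runs the same filter-map-sum pipeline both ways over a plain `ArrayList`. Plain integer weights stand in for shapes, and the divisibility test stands in for the color check; both are hypothetical simplifications. The source is safe to share because no lambda mutates it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelSum {
    // The same filter-map-sum pipeline, run serially...
    static int serial(List<Integer> weights) {
        return weights.stream()
                      .filter(w -> w % 3 == 0)
                      .mapToInt(Integer::intValue)
                      .sum();
    }

    // ...and in parallel (draft: shapes.parallel()).
    static int parallel(List<Integer> weights) {
        return weights.parallelStream()
                      .filter(w -> w % 3 == 0)
                      .mapToInt(Integer::intValue)
                      .sum();
    }

    // A non-thread-safe ArrayList holding 1..n.
    static List<Integer> sampleWeights(int n) {
        return IntStream.rangeClosed(1, n).boxed().collect(Collectors.toCollection(ArrayList::new));
    }
}
```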
With the Fork/Join framework added in Java SE 7, we have efficient machinery for implementing parallel operations. However, one of the goals of this effort is to reduce the gap between the serial and parallel versions of the same computation, and currently parallelizing a computation with Fork/Join looks very different from (and much bigger than) the serial code -- a barrier to parallelization. By exposing separate parallel and serial stream views of collections, users can explicitly but unobtrusively choose between serial and parallel execution, and we can close this gap substantially.
The steps involved in implementing parallel computations with Fork/Join are: dividing a problem into subproblems, solving the subproblems sequentially, and combining the results of subproblems. The Fork/Join machinery is designed to automate this process.
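For reference, here is what those three steps look like when written by hand with Fork/Join's `RecursiveTask` (Java SE 7 API, plus the Java SE 8 `ForkJoinPool.commonPool()`). It sums an array; the class name and the threshold value are arbitrary choices for illustration. Even this small computation is considerably bigger than the equivalent serial loop, which is the gap described above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array using the divide / solve-sequentially / combine pattern.
class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // below this, just loop (value is arbitrary)
    private final int[] data;
    private final int lo, hi;                      // half-open range [lo, hi)

    ArraySumTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {                // small enough: solve sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;                 // divide into two subproblems
        ArraySumTask left = new ArraySumTask(data, lo, mid);
        left.fork();                               // run the left half asynchronously
        long right = new ArraySumTask(data, mid, hi).compute();
        return left.join() + right;                // combine the results
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(data, 0, data.length));
    }
}
```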
We model the structural requirements of Fork/Join with an abstraction for decomposition. In various drafts, this was called Splittable, Spliterator, or StreamAccessor, but the basic concept is that it is an extension of iteration that adds the ability to ask the data structure to subdivide itself into recursively decomposable chunks. Just as an Iterator lets you carve off a single element and leave the rest in the Iterator, a Spliterator lets you carve off a chunk of the data, described by a new Spliterator (which can then be further decomposed), and leave the rest in the original Spliterator. Once a data structure provides a means of decomposition, the library can provide all the parallel stream operations. Decomposition for common data structures like array-based lists, binary trees, and maps is straightforward.
The basic form of Spliterator is shown below:
public interface Spliterator<T> {
/** Carve off a portion of the data into a separate Spliterator */
Spliterator<T> split();
/** Iterate the data described by this Spliterator */
Iterator<T> iterator();
/** The size of the data described by this Spliterator, if known */
int getSizeIfKnown();
}
This approach separates the structural properties of recursive decomposition from the algorithms that can be executed in parallel on decomposable data structures. The author of a data structure need only provide the decomposition logic, and then immediately gets the benefit of the parallel implementations of filter, map, and friends. Similarly, new stream operations can be parallelized in terms of decomposition and are immediately available on any data structure that knows how to decompose itself. (Most users won't ever have to implement Spliterator.)
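The interface that shipped in Java SE 8 as `java.util.Spliterator` uses different method names from the draft above: `trySplit` instead of `split`, `tryAdvance` and `forEachRemaining` instead of a separate `iterator()`, and `estimateSize`/`getExactSizeIfKnown` for the size. The sketch carves an `ArrayList`'s spliterator into two pieces. Because a list's spliterator is `ORDERED`, the carved-off piece must be a prefix, so traversing the two pieces in turn reproduces the list. The helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SplitDemo {
    // Splits a list's spliterator once and returns the contents of the two pieces:
    // the carved-off prefix first, then what remains in the original spliterator.
    static List<List<Integer>> splitOnce(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();   // may be null if it cannot split

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        if (prefix != null) prefix.forEachRemaining(first::add);
        rest.forEachRemaining(second::add);

        List<List<Integer>> pieces = new ArrayList<>();
        pieces.add(first);
        pieces.add(second);
        return pieces;
    }

    static List<Integer> range(int n) {
        return IntStream.range(0, n).boxed().collect(Collectors.toCollection(ArrayList::new));
    }
}
```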
We would like to have an efficient way of expressing idioms like:
List<String> strings = ...
int sumOfLengths = strings.stream()
.map(String::length)
.reduce(0, Integer::plus);
Without any special consideration for primitives, this will cause trouble, as the result of the map operation will be a Stream<Integer>, and therefore each string length will be boxed and unboxed (since it starts out as an int) at the boundary between the map and reduce operations.
Ideally we would not have to distort the API to work around this performance challenge, but VM-based techniques for rendering this problem irrelevant (e.g., better box elimination, tagged fixnums, etc.) are unlikely to arrive with Java SE 8. That leaves two viable library-based strategies for dealing with this problem:
- Overload the map operation with one that takes an IntMapper and returns an IntStream.
- Fuse map and reduce into overloaded mapReduce operations that can take an IntMapper + IntOperator as arguments.
While the fused-operation approach is less intrusive, it is not as complete a solution as the IntStream approach is. With the IntStream approach, we can have specialized operations tailored to what we know about primitives, such as sum() and sorted(), and specialized operations like sum can use numerical techniques to improve the quality of the result when adding floating-point numbers (whereas simply reducing on Double::plus cannot).
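To see why a dedicated sum can do better than a plain reduction, here is a minimal sketch of Kahan (compensated) summation, one such numerical technique, next to a naive left-to-right sum of the kind a sequential reduce performs. The inputs are chosen so that each small addend, on its own, is lost to rounding. (The `DoubleStream.sum()` that shipped in Java SE 8 is documented as possibly using compensated summation; this sketch does not rely on that.)

```java
class CompensatedSum {
    // Plain left-to-right accumulation, as a sequential reduce would compute it.
    static double naive(double[] xs) {
        double sum = 0.0;
        for (double x : xs) sum += x;
        return sum;
    }

    // Kahan summation: carry the low-order bits lost by each addition in `c`.
    static double kahan(double[] xs) {
        double sum = 0.0;
        double c = 0.0;                 // running compensation
        for (double x : xs) {
            double y = x - c;           // correct the next term by the previous error
            double t = sum + y;         // low-order bits of y may be lost here...
            c = (t - sum) - y;          // ...and are recovered algebraically
            sum = t;
        }
        return sum;
    }

    // 1.0 followed by ten copies of 1e-16, each smaller than half an ulp of 1.0.
    static double[] sample() {
        double[] xs = new double[11];
        xs[0] = 1.0;
        for (int i = 1; i < xs.length; i++) xs[i] = 1e-16;
        return xs;
    }
}
```

The naive loop returns exactly `1.0`, because every `1e-16` is rounded away; the compensated version accumulates the lost bits and returns a value slightly above `1.0`.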
List<String> strings = ...
int sumOfLengths = strings.stream()
                          .map(String::length)
                          .sum();
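In the API that shipped in Java SE 8, this specialization is exposed through distinct method names (`mapToInt`, `mapToLong`, `mapToDouble`) rather than overloads of `map`, and `Integer::sum` took the place of the draft's `Integer::plus`. The sketch contrasts the boxed reduction with the specialized `IntStream.sum()`. Both give the same answer, but only the second avoids boxing each length.

```java
import java.util.Arrays;
import java.util.List;

class LengthSums {
    // Boxed: map yields a Stream<Integer>, so each length is boxed, then unboxed by reduce.
    static int boxed(List<String> strings) {
        return strings.stream()
                      .map(String::length)
                      .reduce(0, Integer::sum);
    }

    // Specialized: mapToInt yields an IntStream of primitive ints with its own sum().
    static int specialized(List<String> strings) {
        return strings.stream()
                      .mapToInt(String::length)
                      .sum();
    }
}
```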
This approach entails overloading map(Function<T,U>) with map(IntMapper<T>) (and similar for other primitive types), and specialized Stream types (IntStream, DoubleStream, etc.):
interface Stream<T> {
    ...
    <U> Stream<U> map(Mapper<T,U> mapper);
    IntStream map(IntMapper<T> mapper);
    LongStream map(LongMapper<T> mapper);
    DoubleStream map(DoubleMapper<T> mapper);
}
We will likely limit specialization to int, long, and double; the other primitive types can be handled by these. We do not intend to provide specialized implementations of List and other collections; instead we will provide stream generators such as range(0, 100).
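A brief sketch of both points, using Java SE 8's `IntStream.range` (the shipped form of a range generator) and handling a narrower primitive type (`byte`) by widening it into an `IntStream`. It also shows converting to a `LongStream` when an `int` sum could overflow. The method names are hypothetical.

```java
import java.util.stream.IntStream;

class PrimitiveStreams {
    // Sum of 0..99 using the range generator instead of a List<Integer>.
    static int sumOfRange() {
        return IntStream.range(0, 100).sum();
    }

    // There is no ByteStream: widen each byte to an int and reuse IntStream.
    static int sumOfBytes(byte[] bytes) {
        return IntStream.range(0, bytes.length).map(i -> bytes[i]).sum();
    }

    // Convert to a LongStream when an int sum could overflow.
    static long sumToOneHundredThousand() {
        return IntStream.rangeClosed(1, 100_000).asLongStream().sum();
    }
}
```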
From an implementation perspective, this approach is straightforward; there is a lot of almost-duplicated code, but this is par for the course (see the implementation of Arrays for a dramatic illustration).
Having exposed Stream as a top-level abstraction, we want to ensure that the features of Stream are available as widely throughout the JDK as possible. Much of this can be accomplished simply by providing a means to convert a Collection, Iterable, Iterator, Enumeration, or array into a Stream. Adding stream() methods to the above classes, as well as Arrays.stream(array), will accomplish much of the needed "streamification" of the JDK, since many APIs already return aggregates in one of these forms. Additionally, we can add a set of new Stream-bearing methods, such as String.chars() or BufferedReader.lines(), where the above techniques do not suffice.
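The sketch below shows several of these entry points as they exist in Java SE 8: `Arrays.stream`, `String.chars()`, and `BufferedReader.lines()`. An `Enumeration` gained no `stream()` method in Java SE 8, so the sketch copies it into a list first with `Collections.list`. The helper names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;

class Streamified {
    // An array source.
    static int sumOfArray(int[] xs) {
        return Arrays.stream(xs).sum();
    }

    // String.chars() yields an IntStream of UTF-16 code units.
    static long vowelCount(String s) {
        return s.chars().filter(ch -> "aeiou".indexOf(ch) >= 0).count();
    }

    // BufferedReader.lines() yields a Stream<String>, one element per line.
    // (A StringReader holds no OS resources, so the reader is not closed here.)
    static List<String> nonBlankLines(String text) {
        BufferedReader reader = new BufferedReader(new StringReader(text));
        return reader.lines().filter(line -> !line.trim().isEmpty()).collect(Collectors.toList());
    }

    // No Enumeration.stream() in Java SE 8: copy the elements into a list first.
    static List<String> upperCased(Enumeration<String> e) {
        return Collections.list(e).stream().map(String::toUpperCase).collect(Collectors.toList());
    }
}
```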
Finally, we provide a set of APIs for constructing streams, to be used by library writers who wish to expose stream functionality on non-standard aggregates. The minimal information needed to create a Stream is an Iterator, but if the creator has additional metadata (such as knowing the size), the library can provide a more efficient implementation, and if the creator can provide a Spliterator, then parallel streams can be created as well.
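As a sketch of what this looks like for a library writer using the Java SE 8 API, the hypothetical `Squares` aggregate below exposes a stream from its iterator and passes along the one extra piece of metadata it has: its exact size. `Spliterators.spliterator(iterator, size, characteristics)` records that size (reporting it as `SIZED`), and `StreamSupport.stream` builds the stream.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// A hypothetical non-standard aggregate: the squares 0, 1, 4, ..., (n-1)^2.
class Squares implements Iterable<Integer> {
    private final int n;

    Squares(int n) { this.n = n; }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int i = 0;
            @Override public boolean hasNext() { return i < n; }
            @Override public Integer next() {
                if (i >= n) throw new NoSuchElementException();
                int v = i * i;
                i++;
                return v;
            }
        };
    }

    // Exposes a sequential stream, passing along the exact size as metadata.
    Stream<Integer> stream() {
        Spliterator<Integer> sp = Spliterators.spliterator(
            iterator(), n, Spliterator.ORDERED | Spliterator.IMMUTABLE);
        return StreamSupport.stream(sp, false);
    }
}
```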