
Javanotes 9.0, Section 10.6: Introduction to the Stream API

Section 10.6

Introduction to the Stream API



Among its new features, Java 8 introduced
a stream API, which represents a new way of
expressing operations on collections of data. A major motivation
for the new approach was to make it possible for the Java runtime to
“parallelize” a computation, that is, to break it into pieces that
can be run simultaneously on several processors. Doing so can
significantly speed up the computation. Chapter 12
will discuss parallel programming in Java using threads. Using
threads directly can be difficult and error-prone. The stream
API offers the possibility of parallelizing some kinds of
computation automatically and safely, and it is not surprising
that it has generated a lot of interest on those grounds.

The classes and interfaces that make up the stream API
are defined in package java.util.stream.
Stream is an interface in that package
that represents streams and defines the basic operations on streams.

A stream is simply
a sequence of data values. A stream can be created from a Collection,
from an array, or from a variety of other data sources.
The stream API provides a set of operators that operate on
streams. (The API is covered in this chapter because it makes
extensive use of generic programming and parameterized types.)
To perform some computation using the stream API means creating a stream to get the
data from some source, and then applying a sequence of stream
operations that will produce the result that you want. Once a
stream has been used in this way, it cannot be reused. Of course,
you can usually make another stream from the same data source
if you want to use it in another computation.

Expressing a computation as a sequence of stream operations requires a new
kind of thinking, and it takes some getting used to. Let’s
start with an example.
Suppose that stringList is a large ArrayList<String>,
where none of the elements are null, and you want to know the average
length of the strings in the list. This can be done easily with a for-each loop:

int lengthSum = 0;
for ( String str : stringList ) {
    lengthSum = lengthSum + str.length();
}
double average = (double)lengthSum / stringList.size();

To do the same thing with the stream API, you could use:

int lengthSum = stringList.parallelStream()
                          .mapToInt( str -> str.length() )
                          .sum();
double average = (double)lengthSum / stringList.size();

In this version, stringList.parallelStream() creates a
stream consisting of all the elements of the list. The fact that it is
a “parallelStream” makes it possible to parallelize the computation.
The method mapToInt() applies a map
operation to the stream of strings. That is, it takes each string from
the stream and applies a function to it; in this case, the function
computes the length of the string, giving a value
of type int. The result of the map operation is to produce
a new stream, this time a stream of integers, consisting of all
the outputs from the function. The final operation, sum(),
adds up all the numbers in the stream of integers and returns the
result.

The net result is that we’ve added up the lengths of
all the strings in the list. Because of the potential for parallelization,
the stream version might be substantially faster than the for loop version.
In practice, there is significant overhead involved in setting up and
manipulating streams, so the list would have to be fairly large before
you would see any speedup. In fact, for small lists, the stream version
will almost certainly take longer than the for loop.
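
To try the two versions side by side, they can be wrapped in a small program. The following is only a sketch: the class name, the method names, and the sample list are made up for illustration, and the list is far too short to show any speedup.

```java
import java.util.List;

class AverageLengthDemo {

    // Average string length computed with a for-each loop.
    static double averageWithLoop(List<String> strings) {
        int lengthSum = 0;
        for (String str : strings) {
            lengthSum = lengthSum + str.length();
        }
        return (double) lengthSum / strings.size();
    }

    // The same computation expressed as a parallel stream pipeline.
    static double averageWithStream(List<String> strings) {
        int lengthSum = strings.parallelStream()
                               .mapToInt(str -> str.length())
                               .sum();
        return (double) lengthSum / strings.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample data; lengths are 6, 3, 4, and 8.
        List<String> stringList = List.of("stream", "api", "java", "parallel");
        System.out.println(averageWithLoop(stringList));    // prints 5.25
        System.out.println(averageWithStream(stringList));  // prints 5.25
    }
}
```

Both methods divide by the size of the list, so, like the original code, they assume that the list is not empty.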

The stream API is complex, and I can only give a basic introduction
to it here—but hopefully enough to convey some of its spirit.


10.6.1  Generic Functional Interfaces

Many stream operators take parameters, which are often given
as lambda expressions. The mapToInt() operator in
the above example takes a parameter representing a function
from String to int. The
type for that parameter is given by a parameterized functional interface,
ToIntFunction<T>, from package java.util.function.
This interface represents the general idea of a function that takes an
input of type T and outputs an int.
If you were to look at the definition of that interface, it would be
something like

public interface ToIntFunction<T> {
    public int applyAsInt( T x );
}

Stream<T> is also a parameterized interface. In the above
example, stringList is of type ArrayList<String>,
and the stream that is created by stringList.parallelStream() is
of type Stream<String>. When the mapToInt()
operator is applied to that stream, it expects a parameter of type
ToIntFunction<String>. The lambda expression
“str -> str.length()” maps a String to an
int, so it represents a value of the correct type. Fortunately,
you don’t need to think about all of that to use the stream API: All you need to
know is that to convert a stream of strings to a stream of integers using
mapToInt, you need to provide a function that maps strings to ints.
However, if you want to read the API documentation, you will have to deal with
parameter types similar to ToIntFunction.
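
To make the role of ToIntFunction concrete, here is a small sketch in which the lambda expression is stored in a variable of that type before being passed to mapToInt(). The class name, the helper method sumOver, and the sample words are hypothetical.

```java
import java.util.List;
import java.util.function.ToIntFunction;

class ToIntFunctionDemo {

    // Sums the int values that the given function assigns to the strings.
    static int sumOver(List<String> strings, ToIntFunction<String> f) {
        return strings.stream().mapToInt(f).sum();
    }

    public static void main(String[] args) {
        List<String> words = List.of("one", "three", "five");
        ToIntFunction<String> length = str -> str.length();

        // The functional interface can be called directly...
        System.out.println(length.applyAsInt("hello"));

        // ...or passed to a method that expects a ToIntFunction<String>.
        System.out.println(sumOver(words, length));

        // A method reference of the right shape works as well.
        System.out.println(sumOver(words, String::length));
    }
}
```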

The package java.util.function contains a large number of generic
functional interfaces. Many of them, like ToIntFunction,
are parameterized types, and they are all generic in the sense that they represent very
general kinds of function, with no set meaning. For example, the functional interface
DoubleUnaryOperator represents the general idea of
a function from double to double. This interface
is essentially the same as my example FunctionR2R from
Subsection 4.5.2 (except for the name of the function that
it defines, which is often irrelevant).

The interfaces in java.util.function are used to specify
parameter types for many stream operators as well as for other built-in
functions in the Java API, and you can certainly use them to specify
parameter types for your own subroutines as well. I will discuss
some of them here. Most of the others are variations on the ones
that I cover.

The general term predicate refers to a function
whose return type is boolean. The functional interface Predicate<T>
defines a boolean-valued function test(t) with a
parameter of type T. This interface is used,
for example, as the parameter type for the method removeIf(p),
which is defined for any Collection. For example,
if strList is of type LinkedList<String>,
then you can remove all null values from the list simply
by saying

strList.removeIf( s -> (s == null) );

The parameter is a Predicate<String> that tests
whether its input, s, is null. The removeIf()
method removes all elements from the list for which the value of the predicate
is true.

A predicate for testing int values could be represented by
the type Predicate<Integer>, but that introduces the
overhead of autoboxing every int in a wrapper of type
Integer. To avoid that overhead, the package
java.util.function has the functional interface
IntPredicate, which defines the boolean-valued
function test(n), where n is of type int.
Similarly, it defines DoublePredicate and
LongPredicate. This is typical of how the stream
API deals with primitive types. For example, it defines IntStream
to represent a stream of ints as a more efficient alternative
to Stream<Integer>.
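
As a quick illustration, the sketch below uses a Predicate<String> with removeIf() and an IntPredicate with an IntStream. The class and method names are invented for the example; IntStream.range(), filter(), and count() are stream operations that are covered later in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.IntStream;

class PredicateDemo {

    // Returns a copy of the list from which all null elements are removed.
    static List<String> withoutNulls(List<String> list) {
        List<String> copy = new ArrayList<>(list);
        Predicate<String> isNull = s -> (s == null);
        copy.removeIf(isNull);
        return copy;
    }

    // Counts the even integers among start, start+1, ..., end-1.
    // An IntPredicate tests int values directly, with no autoboxing.
    static long countEvens(int start, int end) {
        IntPredicate isEven = n -> (n % 2 == 0);
        return IntStream.range(start, end).filter(isEven).count();
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("a", null, "b", null);
        System.out.println(withoutNulls(strList));
        System.out.println(countEvens(0, 10));
    }
}
```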

The functional interface Supplier<T> defines a function,
get(), with no parameters and a return type of T.
It represents a source of values of type T.
There is a companion interface Consumer<T> that
defines the void function accept(t) with a
parameter of type T. There are also specialized
versions for primitive types, including IntSupplier,
IntConsumer, DoubleSupplier
and DoubleConsumer. I will give examples of using
suppliers and consumers below.

Function<T,R> represents functions from values
of type T to values of type R.
This functional interface defines the function apply(t),
where t is of type T and the
return type is R. The interface UnaryOperator<T>
is essentially Function<T,T>; that is, it represents a function whose
input and output types are the same. Note that DoubleUnaryOperator
is a specialized version of UnaryOperator<Double>, and
of course there is also IntUnaryOperator.

Finally, I will mention BinaryOperator<T>
and its specializations such as IntBinaryOperator. The interface
BinaryOperator<T> defines the function apply(t1,t2)
where t1 and t2 are both of type T
and the return type is also T. Binary operators include
things like addition of numbers or concatenation of strings.
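
The following sketch shows one lambda expression for each of these interfaces. The class name, the helper applyTwice, and the particular lambdas are made up for illustration.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

class FunctionalInterfaceDemo {

    // Applies op to x, then applies op again to the result.
    static <T> T applyTwice(UnaryOperator<T> op, T x) {
        return op.apply(op.apply(x));
    }

    public static void main(String[] args) {
        Supplier<String> greeting = () -> "hello";              // no input, String output
        Consumer<String> printer = s -> System.out.println(s);  // String input, no output
        Function<String, Integer> length = s -> s.length();     // String to Integer
        UnaryOperator<String> shout = s -> s.toUpperCase() + "!";  // String to String
        BinaryOperator<String> concat = (s1, s2) -> s1 + s2;    // two Strings to String

        printer.accept(greeting.get());
        printer.accept("length: " + length.apply("hello"));
        printer.accept(applyTwice(shout, "hi"));
        printer.accept(concat.apply("stream", "API"));
    }
}
```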


10.6.2  Making Streams

To use the stream API, you have to start by creating a stream.
There are many ways to make streams.

There are two basic types of streams, sequential streams
and parallel streams. The difference is that
operations on parallel streams can, potentially, be parallelized
while the values in a sequential stream are always processed sequentially,
in a single thread, as they would be by a for loop.
(It might not be clear why sequential streams should exist, but some
operations cannot be safely parallelized.) It is possible to convert
a stream from one type to the other type. If stream
is a Stream, then stream.parallel()
represents the same stream of values, but converted to a parallel
stream (if it was not already parallel). Similarly,
stream.sequential() is a sequential stream with the
same values as stream.

We have already seen that if c is any Collection,
then c.parallelStream() is a stream whose values are the values from
the collection. As you might suspect, it is a parallel stream. The method
c.stream() creates a sequential stream of the same values.
This works for any collection, including lists and sets. You could also
get the parallel stream by calling c.stream().parallel().

An array does not have a stream() method, but you can create
a stream from an array using a static method in class
Arrays from package java.util. If
A is an array, then

Arrays.stream(A)

is a sequential stream containing the values from the array. (To get
a parallel stream, use Arrays.stream(A).parallel().)
This works for arrays of objects and for arrays of the primitive types
int, double, and long.
If A is of type T[], where T
is an object type, then the stream is of type Stream<T>.
If A is an array of int, the result is
an IntStream, and similarly for double
and long.
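
Here is a short sketch of these ways of creating streams from collections and arrays. The class name, method names, and data are hypothetical. The method isParallel(), which is defined for all streams, reports whether a stream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class MakingStreamsDemo {

    // Arrays.stream applied to an int[] gives an IntStream.
    static int sum(int[] numbers) {
        IntStream stream = Arrays.stream(numbers);
        return stream.sum();
    }

    // Arrays.stream applied to a String[] gives a Stream<String>.
    static long count(String[] strings) {
        Stream<String> stream = Arrays.stream(strings);
        return stream.count();
    }

    public static void main(String[] args) {
        List<String> c = List.of("a", "b", "c");
        Stream<String> sequential = c.stream();
        Stream<String> parallel = c.stream().parallel();
        System.out.println(sequential.isParallel());  // false
        System.out.println(parallel.isParallel());    // true
        System.out.println(sum(new int[] {1, 2, 3}));
        System.out.println(count(new String[] {"x", "y"}));
    }
}
```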

Suppose supplier is of type Supplier<T>.
It should be possible to create a stream of values of type T
by calling supplier.get() over and over. That stream
can in fact be created using

Stream.generate( supplier )

The stream is sequential and is effectively infinite. That is, it will continue to
produce values forever or until trying to do so produces an error.
Similarly, IntStream.generate(s) will create the stream of int values
from an IntSupplier, and DoubleStream.generate(s)
creates a stream of doubles from a DoubleSupplier.
For example,

DoubleStream.generate( () -> Math.random() )

creates an infinite stream of random numbers. In fact, you
can get a similar stream of random values from a variable, rand, of type
Random (see Subsection 5.3.1):
rand.doubles() is an infinite stream of random numbers in the range 0 to 1.
If you only want a finite number of random numbers, use rand.doubles(count).
The Random class has other methods for creating streams of
random doubles and ints. You will find other methods
that create streams in various standard classes.

The IntStream interface defines a method for creating a stream
containing a given range of integer values. The stream

IntStream.range( start, end )

is a sequential stream containing the values start,
start+1, …, end-1. Note that
end is not included.
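
As a small sketch of IntStream.range() and rand.doubles(count), the following example sums a range of integers and collects a finite number of random values into an array. The class and method names are invented; toArray() is a terminal operation on a DoubleStream that returns the values as a double[].

```java
import java.util.Random;
import java.util.stream.IntStream;

class GeneratedStreamsDemo {

    // Sum of the integers start, start+1, ..., end-1.
    static int sumOfRange(int start, int end) {
        return IntStream.range(start, end).sum();
    }

    // An array holding 'count' random numbers taken from rand.doubles(count).
    static double[] randomSample(Random rand, int count) {
        return rand.doubles(count).toArray();
    }

    public static void main(String[] args) {
        System.out.println(sumOfRange(1, 5));   // 1 + 2 + 3 + 4, so prints 10
        double[] sample = randomSample(new Random(), 3);
        for (double x : sample) {
            System.out.println(x);
        }
    }
}
```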

Some additional methods for making streams have been introduced in newer versions
of Java. For example, for a Scanner, input, Java 9 added
the method input.tokens(), which creates
a stream consisting of all the strings that would be returned by calling
input.next() over and over. And for a String, str,
that contains multiple lines of text, Java 11 added str.lines(), which creates
a stream consisting of the lines from the string.
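
For instance, assuming Java 11 or later, these two methods could be used as in the following sketch to count the tokens and the lines in a string. The class and method names are hypothetical.

```java
import java.util.Scanner;

class TokensAndLinesDemo {

    // Counts the whitespace-separated tokens in the text, using Scanner.tokens().
    static long countTokens(String text) {
        try (Scanner input = new Scanner(text)) {
            return input.tokens().count();
        }
    }

    // Counts the lines in the text, using String.lines().
    static long countLines(String text) {
        return text.lines().count();
    }

    public static void main(String[] args) {
        String text = "to be\nor not\nto be";
        System.out.println(countTokens(text));  // prints 6
        System.out.println(countLines(text));   // prints 3
    }
}
```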


10.6.3  Operations on Streams

Some operations on a stream produce another stream. They are referred
to as “intermediate operations” because you will still have to do something
with the resulting stream to produce a final result. “Terminal operations”
on the other hand apply to a stream and produce a result that is not
a stream. The general pattern for working with streams is to create
a stream, possibly apply a sequence of intermediate operations to it,
and then apply a terminal operation to produce the desired final result.
In the example at the beginning of this section, mapToInt()
is an intermediate operation that converted the stream of strings into
a stream of ints, and sum() is a terminal operation that
found the sum of the numbers in the stream of ints.

The two most basic intermediate operations are filter
and map. A filter applies a Predicate to a stream, and it creates a
new stream consisting of the values from the original stream for
which the predicate is true. For example, if we had a boolean-valued function
isPrime(n) that tests whether an integer n is prime, then

IntStream.range(2,1000).filter( n -> isPrime(n) )

would create a stream containing all the prime numbers in the range
2 to 1000. (I’m not saying this is a good way to produce those
numbers!)

A map applies a Function to each value in a stream,
and creates a stream consisting of the output values. For example, suppose
that strList is an ArrayList<String> and
we would like a stream consisting of all the non-null strings in the
list, converted to lower case:

strList.stream().filter( s -> (s != null) ).map( s -> s.toLowerCase() )

The specializations mapToInt(), mapToDouble(), and mapToLong()
exist to map Streams into
IntStreams, DoubleStreams,
and LongStreams.
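
The two examples above can be turned into a runnable sketch. The text only assumes that isPrime() exists, so the simple trial-division version given here is just one possible implementation, and the class and method names are made up. To make the resulting streams visible, the sketch ends each pipeline with a terminal operation (toArray() or collect(), the latter of which is discussed later in this section).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FilterMapDemo {

    // One possible isPrime: trial division by 2, 3, ..., up to sqrt(n).
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // The primes in the range 2 to limit-1, selected with filter().
    static int[] primesBelow(int limit) {
        return IntStream.range(2, limit).filter(n -> isPrime(n)).toArray();
    }

    // The non-null strings from the list, converted to lower case with map().
    static List<String> lowerCaseNonNull(List<String> strList) {
        return strList.stream()
                      .filter(s -> (s != null))
                      .map(s -> s.toLowerCase())
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(primesBelow(20)));
        System.out.println(lowerCaseNonNull(Arrays.asList("Fred", null, "JANE")));
    }
}
```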

Here are a few more intermediate operations on a stream, S, that
can be useful:
S.limit(n), where n is an integer, creates a
stream containing only the first n values from S.
(If S has fewer than n values, then
S.limit(n) is the same as S.)
S.distinct() creates a stream from the values of S
by omitting duplicate values, so that all the values in S.distinct()
are different. And S.sorted() creates a stream containing the same
values as S, but in sorted order; to sort items that do not
have a natural ordering, you can provide a Comparator
as a parameter to sorted(). (Comparators are discussed in Subsection 10.1.6.)
Note that S.limit(n)
can be especially useful for truncating what would otherwise be an
infinite stream, such as a stream generated from a Supplier.
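
A brief sketch of limit(), distinct(), and sorted() follows. The names are hypothetical, and each pipeline is finished with a terminal operation (collect() or count(), both discussed below) so that the result can be inspected.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IntermediateOpsDemo {

    // The n smallest distinct values from the list, in increasing order.
    static List<Integer> smallestDistinct(List<Integer> values, int n) {
        return values.stream()
                     .distinct()
                     .sorted()
                     .limit(n)
                     .collect(Collectors.toList());
    }

    // Sorts strings by length, using a Comparator as the parameter to sorted().
    static List<String> sortedByLength(List<String> strings) {
        return strings.stream()
                      .sorted(Comparator.comparingInt(String::length))
                      .collect(Collectors.toList());
    }

    // Truncates an infinite generated stream to n values and counts them.
    static long truncatedCount(int n) {
        return Stream.generate(() -> "tick").limit(n).count();
    }

    public static void main(String[] args) {
        System.out.println(smallestDistinct(List.of(5, 3, 5, 1, 3, 2), 3));
        System.out.println(sortedByLength(List.of("ccc", "a", "bb")));
        System.out.println(truncatedCount(4));
    }
}
```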


To actually get anything done with a stream, you need to apply
a terminal operation at the end. The operator forEach(c)
applies a Consumer, c, to each element
of the stream. The result is not a stream, since consumers do not produce
values. The effect of S.forEach(c) on a stream S
is simply to do something with each value in the stream. For example,
we have a whole new way to print all the strings in a list of strings:

stringList.stream().forEach( s -> System.out.println(s) );

For parallel streams, the consumer function is not guaranteed to be applied to the
values from the stream in the same order that they occur in the
stream. If you want that guarantee, you can use forEachOrdered(c)
instead of forEach(c).

If we want to print out only some of the strings, say those that
have length at least 5, and if we want them in sorted order, we can combine
a filter with a sort:

stringList.stream()
          .filter( s -> (s.length() >= 5) )
          .sorted()
          .forEachOrdered( s -> System.out.println(s) );

Some terminal operations output a single value. For example,
S.count() returns the number of values in the stream S.
And IntStreams, LongStreams,
and DoubleStreams have the terminal operation sum(),
to compute the sum of all the values in the stream.
Suppose, for example, that you would like to
test the random number generator by generating 10000 random
numbers and counting how many of them are less than 0.5:

long half = DoubleStream.generate( Math::random )
                        .limit(10000)
                        .filter( x -> (x < 0.5) )
                        .count();

Note that count() returns a long rather
than an int. Also note that I’ve used the method reference Math::random here
instead of the equivalent lambda expression “() -> Math.random()” (see
Subsection 4.5.4).
If you are having trouble reading things like this, keep in mind that the pattern
is: Create a stream, apply some intermediate operations, apply a terminal operation.
Here, an infinite stream of random numbers is generated by calling
Math.random() over and over. The operation limit(10000)
truncates that stream to 10000 values, so that in fact only 10000 values are generated.
The filter() operation only lets through numbers x such
that x < 0.5 is true. And finally, count()
returns the number of items in the resulting stream.

A Stream<T> also has terminal operations min(c) and max(c)
to return the smallest and largest values in the stream.
The parameter, c, is of type Comparator<T>; it is used for comparing
the values. However, the return type of min() and max() is a little
peculiar: The return type is Optional<T>, which represents a value of
type T that might or might not exist. The problem is that an empty
stream does not have a largest or smallest value, so the minimum and maximum of an empty
stream do not exist. An Optional has
a get() method that returns the value of the Optional,
if there is one; it will throw an exception if the Optional is empty.
For example, if words is a Collection<String>,
you can get the longest string in the collection with

String longest = words.parallelStream()
                      .max( (s1,s2) -> s1.length() - s2.length() )
                      .get();

But this will throw an exception if the collection is empty. (The boolean-valued
method isPresent() in an Optional can be used to test
whether the value exists.)
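
One way to guard against the empty case is sketched below, using isPresent() before calling get(). The class name, the method name, and the idea of returning a caller-supplied fallback value are assumptions made for the example.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;

class LongestStringDemo {

    // Returns the longest string in words, or fallback if words is empty.
    static String longestOr(Collection<String> words, String fallback) {
        Optional<String> longest = words.stream()
                .max((s1, s2) -> s1.length() - s2.length());
        if (longest.isPresent()) {
            return longest.get();   // safe: the value exists
        }
        return fallback;            // empty stream, so there is no maximum
    }

    public static void main(String[] args) {
        System.out.println(longestOr(List.of("a", "ccc", "bb"), "(none)"));
        System.out.println(longestOr(List.of(), "(none)"));
    }
}
```

An Optional also has a method orElse(other) that returns the value if it exists and returns other if not, so the if statement here could be replaced by "return longest.orElse(fallback);".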

Similarly, IntStream, LongStream,
and DoubleStream provide
terminal operations min() and max() that return values of type
OptionalInt, OptionalLong,
and OptionalDouble. Each of these three kinds of stream also has an
average() operation that returns an OptionalDouble.
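
As a sketch of how these optional results might be handled for an array of int, consider the following. The class name, the method names, and the choice of default values are hypothetical.

```java
import java.util.Arrays;
import java.util.OptionalDouble;
import java.util.OptionalInt;

class IntStreamStatsDemo {

    // The largest value in the array, or defaultValue if the array is empty.
    static int maxOrDefault(int[] values, int defaultValue) {
        OptionalInt max = Arrays.stream(values).max();
        return max.isPresent() ? max.getAsInt() : defaultValue;
    }

    // The average of the values, or 0 if the array is empty.
    static double averageOrZero(int[] values) {
        OptionalDouble average = Arrays.stream(values).average();
        return average.orElse(0.0);
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 4};
        System.out.println(maxOrDefault(data, -1));
        System.out.println(averageOrZero(new int[] {1, 2, 3, 4}));
        System.out.println(maxOrDefault(new int[0], -1));
    }
}
```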

The terminal operators allMatch(p) and anyMatch(p) take
a predicate as parameter and compute a boolean value. The value
of allMatch(p) is true if the predicate, p, is true for
every value in the stream to which it is applied. The value of anyMatch(p) is true if
there is at least one value in the stream for which p is true. Note that
anyMatch() will stop processing, and will return true
as its output, if it finds a value that satisfies the
predicate. And allMatch() will stop processing if it finds a value that
does not match the predicate.
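
A minimal sketch of the two operators, with made-up class and method names:

```java
import java.util.List;

class MatchDemo {

    // True if every string in the list is non-empty.
    static boolean allNonEmpty(List<String> strings) {
        return strings.stream().allMatch(s -> !s.isEmpty());
    }

    // True if at least one string in the list has more than n characters.
    static boolean anyLongerThan(List<String> strings, int n) {
        return strings.stream().anyMatch(s -> s.length() > n);
    }

    public static void main(String[] args) {
        List<String> words = List.of("stream", "", "api");
        System.out.println(allNonEmpty(words));       // false, because of ""
        System.out.println(anyLongerThan(words, 5));  // true, because of "stream"
    }
}
```

Note that for an empty stream, allMatch() returns true and anyMatch() returns false, whatever the predicate is.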

Many terminal operations that compute a single value can be expressed in
terms of a more general operation, reduce.
A reduce operation combines the values from a stream using a
BinaryOperator. For example, a sum is computed
by a reduce operation in which the binary operation is addition.
The binary operator should be associative, which means that the way in which
applications of the operator are grouped doesn’t matter: (x op y) op z
is the same as x op (y op z). There is no
built-in terminal operator to compute the product of the values
in a stream, but we can do that directly with reduce.
Suppose, for example, that A is an array of
double, and we want the product of all the
non-zero elements in A:

double multiply = Arrays.stream(A).filter( x -> (x != 0) )
                                  .reduce( 1, (x,y) -> x*y );

The binary operator here maps a pair of numbers (x,y) to
their product x*y. The first parameter to reduce()
is an “identity” for the binary operation. That is, it is a value that leaves any
other value unchanged when combined with it; here, 1*x = x for any x. The maximum of a stream
of double could be computed with reduce() by
using reduce(Double.NEGATIVE_INFINITY, Math::max).

The last major terminal operation is collect(c), a very general operation
which collects all of the values in the stream into a data structure or a single
summary result of some type. The parameter,
c is something called a collector. The collector will
ordinarily be given by one of the static functions in the
Collectors class. This can get very complicated, and
I will only give a couple of examples. The function
Collectors.toList() returns a Collector that can be used
with collect() to put all of the values from the stream
into a List. For example, suppose that
A is an array of non-null Strings, and
we want a list of all the strings in A that begin with
the substring “Fred”:

List<String> freds = Arrays.stream(A)
                           .filter( s -> s.startsWith("Fred") )
                           .collect( Collectors.toList() );

That’s actually pretty easy! Even more useful are collectors that group
the items from a stream according to some criterion. The collector
Collectors.groupingBy(f) takes a parameter, f,
whose type is specified by the functional
interface Function<T,S>, representing
a function from values of type T to values
of type S. When used with collect(),
Collectors.groupingBy(f)
operates on a stream of type Stream<T>,
and it separates the items in the stream into groups, based on the
value of the function f when applied to the items.
That is, all the items, x, in a given group have
the same value for f(x). The result is a
Map<S,List<T>>. In this map,
a key is one of the function values, f(x), and
the associated value for that key is a list containing all the items
from the stream to which f assigns that function value.

An example will clarify things. Suppose we have an array of
people, where each person has a first name and a last name.
And suppose that we want to put the people into groups, where
each group consists of all the people with a given last name.
A person can be represented by an object of type Person that
contains instance variables named firstname and lastname.
Let’s say that population is a variable of type Person[].
Then Arrays.stream(population) is a stream of Persons,
and we can group the people in the stream by last name with the following code:

Map<String, List<Person>> families;
families = Arrays.stream(population)
                 .collect(Collectors.groupingBy( person -> person.lastname ));

Here, the lambda expression, person -> person.lastname, defines the
grouping function. The function takes a Person
as input and outputs a String giving that person’s last name. In the resulting
Map, families, a key is one of the
last names from the Persons in the array, and
the value associated with that last name is a List
containing all the Persons with that last name. We could print out the
groups as follows:

for ( String lastName : families.keySet() ) {
    System.out.println("People with last name " + lastName + ":");
    for ( Person name : families.get(lastName) ) {
        System.out.println("    " + name.firstname + " " + name.lastname);
    }
    System.out.println();
}

Although the explanation is a bit long-winded, the result should be reasonably easy
to understand.
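
For readers who want to run the example, here is a self-contained sketch. The Person class shown is a minimal stand-in that matches the description above, not a class from the Java API, and the names in the sample population are invented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {

    // Hypothetical minimal Person class with the two instance variables.
    static class Person {
        String firstname;
        String lastname;
        Person(String firstname, String lastname) {
            this.firstname = firstname;
            this.lastname = lastname;
        }
    }

    // Groups the people in the array by last name.
    static Map<String, List<Person>> groupByLastName(Person[] population) {
        return Arrays.stream(population)
                     .collect(Collectors.groupingBy(person -> person.lastname));
    }

    public static void main(String[] args) {
        Person[] population = {
            new Person("Ann", "Smith"),
            new Person("Bob", "Jones"),
            new Person("Cat", "Smith")
        };
        Map<String, List<Person>> families = groupByLastName(population);
        for (String lastName : families.keySet()) {
            System.out.println("People with last name " + lastName + ":");
            for (Person p : families.get(lastName)) {
                System.out.println("    " + p.firstname + " " + p.lastname);
            }
            System.out.println();
        }
    }
}
```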


10.6.4  An Experiment

Most of the examples of using streams that I have given so far
are not very practical. In most cases, a simple for loop would have been
just as easy to write and probably more efficient. That’s especially
true since I’ve mostly used sequential streams, and most of the
examples cannot be efficiently parallelized. (A notable exception is
the reduce operation, which is important precisely because it
parallelizes well.) Let’s look at an example where the
stream API is applied to a long computation that might get
some real speedup with parallelization. The problem is
to compute a Riemann sum. This is something from Calculus,
but you don’t need to understand anything at all about what it means. Here is a
traditional method for computing the desired sum:

/**
 * Use a basic for loop to compute a Riemann sum.
 * @param f  The function that is to be summed.
 * @param a  The left endpoint of the interval over which f is summed.
 * @param b  The right endpoint.
 * @param n  The number of subdivisions of the interval.
 * @return   The value computed for the Riemann sum.
 */
private static double riemannSumWithForLoop( 
        DoubleUnaryOperator f, double a, double b, int n) {
    double sum = 0;
    double dx = (b - a) / n;
    for (int i = 0; i < n; i++) {
        sum = sum + f.applyAsDouble(a + i*dx);
    }
    return sum * dx;
}

The type for the first parameter is a functional interface, so we could call this method,
for example, with

riemannSumWithForLoop( x -> Math.sin(x), 0, Math.PI, 10000 )

How can we apply the stream API to this problem? To imitate the
for loop, we can start by generating the integers
from 0 to n-1 as a stream, using IntStream.range(0,n).
This gives a sequential stream. To enable parallelism, we have to convert
it to a parallel stream by applying the .parallel()
operation. To compute the values that we want to sum up,
we can apply a map operation that maps the stream of ints
to a stream of doubles by mapping the integer i to
f.applyAsDouble(a+i*dx). Finally, we can apply sum()
as the terminal operation. Here is a version of the Riemann sum method that
uses a parallel stream:

private static double riemannSumWithParallelStream( 
        DoubleUnaryOperator f, double a, double b, int n) {
    double dx = (b - a) / n;
    double sum = IntStream.range(0,n)
                          .parallel()
                          .mapToDouble( i -> f.applyAsDouble(a + i*dx) )
                          .sum();
    return sum * dx;
}

I also wrote a version, riemannSumWithSequentialStream(), that
leaves out the .parallel() operator. All three versions can
be found in the sample program RiemannSumStreamExperiment.java.
The main routine in that program calls each of the three methods, using various
values for n. It times how long each method takes to compute the
sum, and reports the result.
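
The following is not the sample program itself, only a rough sketch of how such a timing experiment might be set up. The class name, the shortened method names, the helper method time(), and the choice of values for n are all assumptions. Timings obtained this way are crude, since they include effects such as just-in-time compilation.

```java
import java.util.function.DoubleSupplier;
import java.util.function.DoubleUnaryOperator;
import java.util.stream.IntStream;

class RiemannTimingSketch {

    static double withForLoop(DoubleUnaryOperator f, double a, double b, int n) {
        double sum = 0;
        double dx = (b - a) / n;
        for (int i = 0; i < n; i++) {
            sum = sum + f.applyAsDouble(a + i*dx);
        }
        return sum * dx;
    }

    // Uses a parallel stream if parallel is true, a sequential stream if not.
    static double withStream(DoubleUnaryOperator f, double a, double b,
                             int n, boolean parallel) {
        double dx = (b - a) / n;
        IntStream indices = IntStream.range(0, n);
        if (parallel) {
            indices = indices.parallel();
        }
        return indices.mapToDouble(i -> f.applyAsDouble(a + i*dx)).sum() * dx;
    }

    // Hypothetical helper: runs one computation and reports the elapsed time.
    static void time(String label, DoubleSupplier computation) {
        long start = System.nanoTime();
        double result = computation.getAsDouble();
        double millis = (System.nanoTime() - start) / 1.0e6;
        System.out.printf("%-18s result = %.8f, time = %.2f ms%n",
                          label, result, millis);
    }

    public static void main(String[] args) {
        for (int n = 10_000; n <= 10_000_000; n *= 10) {
            final int count = n;  // lambdas can only use effectively final variables
            System.out.println("n = " + count);
            time("for loop", () -> withForLoop(Math::sin, 0, Math.PI, count));
            time("sequential stream", () -> withStream(Math::sin, 0, Math.PI, count, false));
            time("parallel stream", () -> withStream(Math::sin, 0, Math.PI, count, true));
        }
    }
}
```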

As might be expected, I found that the version that uses a sequential stream is uniformly
slower than the other versions. The sequential stream version does essentially
the same thing as the for loop version, but with the extra overhead involved
with creating and manipulating streams. The situation for parallel streams
is more interesting, and the results depend on the machine on which the program
is executed. On one old machine with four processors, the for loop version
was faster for n = 100,000, but the parallel version was
much faster for 1,000,000 items or more. On another machine, the parallel version
was faster for 10,000 or more items. Note that there is a limit to how much
faster the parallel version can be. On a machine with K processors, the
parallel version cannot be more than K times faster than the sequential
version, and in reality the speedup will probably fall somewhat short of that.
I encourage you to try out the sample program on your own computer!

It is even conceivable (or at least this is a goal of the stream API)
that you have a machine on which Java can run
parallel code on your graphics card, making use of the many processors
that it contains. If that happens, you might see a very large
speedup.

License

ITP 220 Advanced Java Copyright © by Amanda Shelton. All Rights Reserved.