Java Streams: the Gatherers API

Java 24 introduces a new stream-related feature: the Gatherer interface. Used with the Stream.gather method, it allows more complete and expressive transformations of Java streams than were previously possible with the existing methods. This article explains the feature and walks through examples.
Overview
What are Streams?
What we commonly refer to as Streams in Java is a set of interfaces, methods, and classes that allow for modeling data transformation pipelines.
At the heart of this set is, of course, the Stream interface, which represents a sequence of data that may be empty or even infinite.
This interface includes a number of methods that model successive operations on this data sequence. These operations fall into two types:
- Some operations transform the Stream into another Stream. These are intermediate operations, such as map, filter, or limit.
- Some operations transform the Stream into a value, a collection, or another type of container. These are terminal operations, such as collect, findFirst, or reduce.
Streams are not another type of collection. The idea is to model a chain of transformations, not a set of values. Streams are lazy: as long as no terminal operation is used, no operation is actually performed, as there is no expected result. This enables modeling infinite sequences without causing your program to run indefinitely.
import java.util.stream.Collectors;
import java.util.stream.Stream;

void main() {
    // An infinite Stream representing natural numbers.
    var naturalNumbersStream = Stream.iterate(1, i -> i + 1);

    // Here we model the sequence of operations on our stream:
    // square the numbers, keep only the even ones, then take the first 100.
    var streamPipeline = naturalNumbersStream
            .map(i -> i * i)
            .filter(i -> i % 2 == 0)
            .limit(100);

    // At this stage, no computation has taken place yet.

    // We collect the stream into a final value: the average of the numbers.
    // Note that Collectors.averagingInt produces a Double, not an int.
    double averageOfFirst100EvenSquares = streamPipeline.collect(Collectors.averagingInt(i -> i));
    System.out.println(averageOfFirst100EvenSquares);
}
The other benefit of thinking in terms of operation pipelines rather than collections is access to bonus features such as parallelism (under certain conditions, operations can be performed in parallel on segments of the stream) or other optimizations (the terminal operation count, for instance, can sometimes skip all intermediate operations if it can compute the number of elements directly from the source).
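As a quick illustration of that last point, here is a minimal sketch: because map preserves the number of elements, count can take the size directly from the source, and the mapping function may never run.

import java.util.stream.Stream;

void main() {
    // `map` does not change the number of elements, so `count` can read the
    // size from the source; the lambda below may never execute.
    long n = Stream.of("a", "b", "c")
            .map(String::toUpperCase)
            .count();
    System.out.println(n); // 3
}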
What exactly was missing?
When it comes to terminal operations, the Stream API is relatively complete. We have a number of simple operations for common use cases (findFirst, toArray, forEach, etc.) and a Collector interface that can be implemented to define any type of terminal operation.
/**
 * This class transforms a `Stream<Customer>` into an instance of `Invoice`. It
 * processes the stream element by element and can use an internal state of type
 * `Map<UUID, Customer>`. Beyond that, the logic is yours: implement whatever terminal
 * operation you need.
 */
class MyCollector implements Collector<Customer, Map<UUID, Customer>, Invoice> {
    // ...
}
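Such a collector then plugs into a terminal collect call, along the lines of this sketch (the customers variable is an assumed List<Customer>):

// Hypothetical usage: customers is an assumed List<Customer>.
Invoice invoice = customers.stream()
        .collect(new MyCollector());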
In contrast, intermediate operations were not as well served. Acting on items one by one is fairly flexible (map, filter, etc.), but there were few stateful operations, which consider multiple items from the input stream to produce one or more output elements. distinct or dropWhile, for example, serve specific needs, but there was no way to implement a fully custom operation, such as transforming multiple consecutive or non-consecutive elements into a single one (or none at all).
A simple example would be transforming a stream of random numbers into a stream of strictly increasing numbers. filter does not let you consider anything but the current item when deciding whether or not to keep it. You would need to convert the stream to a list, apply the operation on the list, then convert it back to a stream, which undermines many of the stream's advantages (laziness, parallelism handling, etc.).
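As a rough sketch of that workaround (names are illustrative), the pre-Gatherer version might look like this; note that toList forces the whole source to be read before anything is emitted:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// A minimal sketch of the list-based workaround: laziness is lost because
// toList() consumes the entire source stream up front.
static Stream<Integer> strictlyIncreasing(Stream<Integer> numbers) {
    List<Integer> kept = new ArrayList<>();
    int max = Integer.MIN_VALUE;
    for (int n : numbers.toList()) {
        if (n > max) {
            kept.add(n);
            max = n;
        }
    }
    return kept.stream();
}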
This is exactly the kind of problem that Stream Gatherers solve. Any custom intermediate operation can be defined, just as Collectors allow custom terminal operations.
The Gatherer Interface
Definition
Here is a (simplified) definition of this interface:
public interface Gatherer<Input, State, Result> {
    Supplier<State> initializer();
    Integrator<State, Input, Result> integrator();
    BinaryOperator<State> combiner();
    BiConsumer<State, Downstream<? super Result>> finisher();
}
Just like the Collector interface, a gatherer works through four methods that are called at different points during the gathering operation to produce the final result.
- The initializer is used at the beginning of the operation to create the gatherer's internal state. This state enables the operation to consume multiple input elements before emitting an output element. The initializer may be called multiple times in case of parallel execution (each thread will have its own chunk of the stream and its own internal state).
- The integrator is the core of the gatherer. It receives each input stream element and can emit elements into the output stream (via an interface called Downstream). It also has access to the gatherer's internal state.
- The combiner allows the operation to run in parallel. In this case, a state instance is initialized for each thread, and the combiner is called to merge these states into one after the operation.
- The finisher allows final actions at the end of the operation. It is called once all the elements have been integrated and can optionally emit more elements via the Downstream interface. It also has access to the final combined state.
Usage and utility methods
A gatherer is used with the Stream.gather method.

var resultStream = myList.stream()
        .gather(new MyGatherer());
Defining a gatherer by creating a new class implementing Gatherer can be verbose, especially for simple cases. Therefore, utility methods exist to create a Gatherer instance directly from provided lambdas. For example:
- Create a gatherer from the 4 methods:
  Gatherer.of(
      Supplier<State> initializer,
      Integrator<State, Input, Result> integrator,
      BinaryOperator<State> combiner,
      BiConsumer<State, Downstream<? super Result>> finisher
  );
- Create a sequential gatherer (one state, no combiner; see the sketch after this list):
  Gatherer.ofSequential(
      Supplier<State> initializer,
      Integrator<State, Input, Result> integrator
  );
- Create a stateless and parallelizable gatherer (the state type is Void):
  Gatherer.of(
      Integrator<Void, Input, Result> integrator
  );
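For instance, here is a minimal sketch of a sequential gatherer built with ofSequential: it turns a stream of numbers into a stream of running sums (the single-element array is just a mutable holder for the state):

import java.util.stream.Gatherer;
import java.util.stream.Stream;

void main() {
    // A running-sum gatherer: one mutable state per (sequential) evaluation.
    Gatherer<Integer, int[], Integer> runningSum = Gatherer.ofSequential(
            () -> new int[1],                     // initializer: sum starts at 0
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state[0] += element;
                return downstream.push(state[0]); // emit the partial sum
            })
    );

    Stream.of(1, 2, 3, 4)
            .gather(runningSum)
            .forEach(System.out::println); // prints 1, 3, 6, 10
}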
It’s not always easy to understand how the four methods of a gatherer interact. Practicing a few examples is the best way to learn.
Examples
Computing Bitcoin Price All-Time Highs
We have a stream of monthly Bitcoin prices in chronological order. We want a stream that emits an item each time the price hits a new all-time high.
Without gatherers, it would have been necessary to convert the stream into a list. For a stream from a large file, for example, that’s unfortunate, since it forces us to read the entire file before performing the operation (even if we only want the first 10 highs…).
The gatherer-based solution allows us to handle All-Time Highs in a streaming fashion.
import java.time.YearMonth;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record BitcoinPrice(YearMonth month, double price) {}

/**
 * Our gatherer transforms a `Stream<BitcoinPrice>` into a `Stream<BitcoinPrice>`, which
 * matches the first and third type parameters.
 * The internal state is the highest price seen so far. Since the state must be mutable,
 * we use a single-element array (a common technique with Collectors as well). If you
 * prefer, you may use a mutable holder such as `AtomicDouble` (Guava) or `MutableDouble`
 * (Apache Commons Lang) if you have the dependency.
 */
class IncreasingPrices implements Gatherer<BitcoinPrice, Double[], BitcoinPrice> {

    public Supplier<Double[]> initializer() {
        // The initial state is Double.NEGATIVE_INFINITY so the first stream value
        // is always retained. (Double.MIN_VALUE would be wrong here: it is the
        // smallest *positive* double, not the most negative one.)
        return () -> new Double[] { Double.NEGATIVE_INFINITY };
    }

    public Integrator<Double[], BitcoinPrice, BitcoinPrice> integrator() {
        return (state, element, downstream) -> {
            // If the current price is higher than the highest price seen so far,
            // emit the value and update the state.
            if (element.price() > state[0]) {
                downstream.push(element);
                state[0] = element.price();
            }
            // Returning false from the integrator stops processing.
            // Here, we want to process all elements.
            return true;
        };
    }
}
// Our gatherer depends on the input data being sorted by date.
Stream<BitcoinPrice> historicalData = getBitcoinSortedHistoricalData();
historicalData
        .gather(new IncreasingPrices())
        .forEach(System.out::println);
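And since gathering stays lazy, the "first 10 highs" case mentioned above needs nothing more than a limit. A quick sketch, reusing the same assumed source:

// Only the first 10 all-time highs are computed; the rest of the
// source is never read.
getBitcoinSortedHistoricalData()
        .gather(new IncreasingPrices())
        .limit(10)
        .forEach(System.out::println);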
The full example is available on my GitLab.
Calculating Mean Item Prices in Parallel
We have data on the prices of consumer goods in various stores and want to compute the average price for each item. In this case, we can’t benefit much from stream laziness (we need all data to compute the correct average), but we can take advantage of parallel processing.
You could already do this with a Collector, but if you want a Stream output, you’d need to recreate a stream from the collector’s result. We’re going to do the work with a Gatherer instead.
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

/** Input data */
record ItemPrice(String item, double priceInEuros, String shop) {}

/** Output data */
record ItemMeanPrice(String item, double meanPrice) {}

class MeanPriceGatherer implements
        Gatherer<ItemPrice, Map<String, MeanPriceGatherer.CumulativeSumCount>, ItemMeanPrice> {

    /** Our internal state stores, for each item, the sum and count of prices seen. */
    public Supplier<Map<String, CumulativeSumCount>> initializer() {
        return HashMap::new;
    }

    /**
     * For each item, add the price to the cumulative state and increase the count by 1.
     *
     * Here, we don't emit items during integration because we need all inputs first;
     * emitting is the finisher's job.
     */
    public Integrator<Map<String, CumulativeSumCount>, ItemPrice, ItemMeanPrice> integrator() {
        return (state, element, downstream) -> {
            state.merge(
                element.item(),
                new CumulativeSumCount(element.priceInEuros(), 1),
                CumulativeSumCount::add
            );
            return true;
        };
    }

    /**
     * To support parallelism, we must be able to combine states.
     * Here, each entry of one state map is merged into the other map.
     */
    public BinaryOperator<Map<String, CumulativeSumCount>> combiner() {
        return (state1, state2) -> {
            state2.forEach((key, value) ->
                state1.merge(key, value, CumulativeSumCount::add)
            );
            return state1;
        };
    }

    /**
     * The finisher runs after all parallel states have been merged.
     * It iterates over the state, computes the mean, and emits results downstream.
     */
    public BiConsumer<Map<String, CumulativeSumCount>, Downstream<? super ItemMeanPrice>> finisher() {
        return (state, downstream) ->
            state.forEach((itemName, sumCount) ->
                downstream.push(new ItemMeanPrice(itemName, sumCount.getMean()))
            );
    }

    record CumulativeSumCount(double sum, int count) {
        public CumulativeSumCount add(CumulativeSumCount other) {
            return new CumulativeSumCount(sum + other.sum, count + other.count);
        }

        public double getMean() {
            return sum / count;
        }
    }
}
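A hypothetical usage sketch (getItemPrices() is an assumed source of item prices): running the gatherer on a parallel stream is what makes the combiner kick in.

// Run on a parallel stream so states are built per thread and then combined.
Stream<ItemPrice> prices = getItemPrices(); // assumed source of ItemPrice
prices.parallel()
        .gather(new MeanPriceGatherer())
        .forEach(System.out::println);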
Again, the full demo is available on GitLab.
Conclusion
The Gatherer interface is not revolutionary, but it fills a real gap in Java streams. Developers who use streams extensively have likely had to convert a stream to a collection just to perform an operation, then convert it back immediately. Gatherers address that need while improving code readability and, occasionally, even performance.
For more detail, it's always worth checking out the Javadoc, as well as JEP 485, which introduces this feature.