Java Streams: the Gatherers API

3/31/2025 — 5-minute read (1077 words)

Java 24 introduces a new feature related to streams: the Gatherer interface. This interface (used with the Stream.gather method) allows for more flexible and expressive transformations of Java streams than was previously possible with the existing methods. This article explains the feature and walks through examples.

Contents

Overview

What are Streams?

What we commonly refer to as Streams in Java is a set of interfaces, methods, and classes that allow for modeling data transformation pipelines.

At the heart of this set is, of course, the Stream interface, which represents a sequence of data that may be empty, finite, or infinite.

This interface includes a number of methods that model successive operations on this data sequence. These operations fall into two types:

- intermediate operations (map, filter, limit, etc.), which return a new Stream and can be chained;
- terminal operations (forEach, collect, count, etc.), which end the pipeline and produce a result or a side effect.

Streams are not another kind of collection: the idea is to model a chain of transformations, not a set of values. Streams are also lazy: as long as no terminal operation is invoked, nothing is actually computed, since no result is requested. This is what makes it possible to model infinite sequences without your program running forever.

import java.util.stream.Collectors;
import java.util.stream.Stream;

void main() {
    // An infinite Stream representing the natural numbers.
    var naturalNumbersStream = Stream.iterate(1, i -> i + 1);

    // Here we model the sequence of operations on our stream:
    // square the numbers, keep only the even ones, then take the first 100.
    var streamPipeline = naturalNumbersStream
            .map(i -> i * i)
            .filter(i -> i % 2 == 0)
            .limit(100);

    // At this stage, no computation has taken place yet.
    // We collect the stream into a final value: the average of the numbers.
    double averageOfFirst100EvenSquares = streamPipeline.collect(Collectors.averagingInt(i -> i));

    System.out.println(averageOfFirst100EvenSquares);
}

The other benefit of thinking in terms of operation pipelines rather than collections is access to bonus features such as parallelism (under certain conditions, operations can be performed in parallel on segments of the stream) or other optimizations (the terminal operation count, for instance, can sometimes skip all intermediate operations if it can compute the number of elements directly from the source).
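As a quick illustration of both points (a minimal sketch, not tied to any particular dataset):

import java.util.stream.IntStream;

void main() {
    // `count` can sometimes answer from the source's known size: `map` cannot
    // change the number of elements, so it may be skipped entirely.
    long total = IntStream.range(0, 1_000_000)
            .map(i -> i * 2)
            .count();

    // `parallel` lets the runtime process segments of the stream concurrently.
    long evens = IntStream.range(0, 1_000_000)
            .parallel()
            .filter(i -> i % 2 == 0)
            .count();

    System.out.println(total + " " + evens); // 1000000 500000
}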

What exactly was missing?

When it comes to terminal operations, the Stream API is relatively complete. We have a number of simple operations for common use cases (findFirst, toArray, forEach, etc.) and a Collector interface that can be implemented to define any type of terminal operation.

/**
 * This class transforms a `Stream<Customer>` into an instance of `Invoice`. It 
 * processes the stream element by element and can use an internal state of type 
 * `Map<UUID, Customer>`. Beyond that, the logic is yours—implement whatever terminal 
 * operation you need.
 */
class MyCollector implements Collector<Customer, Map<UUID, Customer>, Invoice> {
    // ...
}

In contrast, intermediate operations were not as well covered. Acting on items one by one is fairly flexible (map, filter, etc.), but there were few stateful operations, that is, operations that consider multiple items from the input stream to produce one or more output elements. distinct or dropWhile serve specific needs, but there was no way to implement a fully custom operation, such as transforming several consecutive (or non-consecutive) elements into a single one, or into none at all.

A simple example would be transforming a stream of random numbers into a stream of strictly increasing numbers: filter cannot look at anything but the current item when deciding whether to keep it. You would need to convert the stream to a list, apply the operation on the list, then turn the result back into a stream, which undermines many of the stream's advantages (laziness, parallel processing, etc.).
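For illustration, here is roughly what that workaround looks like (a sketch; keepStrictlyIncreasing is a made-up helper, and the boxed Integer input is just for simplicity):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Pre-gatherer workaround: materialize the stream, filter with mutable state,
// then re-stream the result. Laziness is lost: the whole input is consumed.
Stream<Integer> keepStrictlyIncreasing(Stream<Integer> randomNumbers) {
    List<Integer> kept = new ArrayList<>();
    int max = Integer.MIN_VALUE;
    for (int value : randomNumbers.toList()) {
        if (value > max) {
            kept.add(value);
            max = value;
        }
    }
    return kept.stream();
}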

This is exactly the kind of problem that Stream Gatherers solve. Any custom intermediate operation can be defined, just as Collectors allow custom terminal operations.

The Gatherer Interface

Definition

Here is a (simplified) definition of this interface:

public interface Gatherer<Input, State, Result> {
    Supplier<State> initializer();

    Integrator<State, Input, Result> integrator();

    BinaryOperator<State> combiner();

    BiConsumer<State, Downstream<? super Result>> finisher();
}

Just like the Collector interface, a gatherer works through four methods that are called at different points during the gathering operation to produce the output elements:

- initializer creates the mutable state used during processing;
- integrator is called once per input element; it can update the state and push zero or more elements downstream;
- combiner merges two states when the stream is processed in parallel;
- finisher runs after the last element has been integrated and can push final elements downstream.

In the actual interface, only integrator is abstract; the other three have default implementations, so the simplest gatherers only need to define how each element is integrated.

Usage and utility methods

A gatherer is used with the Stream.gather method.

var resultStream = myList.stream()
        .gather(new MyGatherer());

Defining a gatherer by creating a new class implementing Gatherer can be verbose, especially for simple cases. Therefore, utility methods exist to create a Gatherer instance directly from provided lambdas. For example:
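// A sketch using the Gatherer.ofSequential static factory: the strictly
// increasing filter from earlier, without declaring a dedicated class.
Gatherer<Integer, int[], Integer> increasing = Gatherer.ofSequential(
        // Initializer: the mutable state is the highest value seen so far.
        () -> new int[] { Integer.MIN_VALUE },
        // Integrator: push the element only if it beats the current maximum.
        (state, element, downstream) -> {
            if (element > state[0]) {
                state[0] = element;
                return downstream.push(element);
            }
            return true;
        }
);

var result = Stream.of(3, 1, 4, 1, 5, 9, 2, 6)
        .gather(increasing)
        .toList(); // [3, 4, 5, 9]

Gatherer.ofSequential yields a gatherer that is always evaluated sequentially, even within a parallel stream; the Gatherer.of overloads that take a combiner produce gatherers that can take part in parallel evaluation.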

It’s not always easy to understand how the four methods of a gatherer interact. Practicing a few examples is the best way to learn.

Examples

Computing Bitcoin Price All-Time Highs

We have a stream of monthly Bitcoin prices in chronological order. We want a stream that emits an item each time the price hits a new all-time high.

Without gatherers, it would have been necessary to convert the stream into a list. For a stream from a large file, for example, that’s unfortunate, since it forces us to read the entire file before performing the operation (even if we only want the first 10 highs…).

The gatherer-based solution lets us detect all-time highs in a streaming fashion.

import java.time.YearMonth;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record BitcoinPrice(YearMonth month, double price) {}

/**
 * Our gatherer transforms a `Stream<BitcoinPrice>` into a `Stream<BitcoinPrice>`, which
 * matches the first and third type parameters.
 * The internal state is the highest price seen so far. Since the state must be mutable,
 * we use a single-element array (a common technique with Collectors as well). If you
 * prefer, you may use Guava's `AtomicDouble` or Apache Commons' `MutableDouble`.
 */
class IncreasingPrices implements Gatherer<BitcoinPrice, double[], BitcoinPrice> {
    public Supplier<double[]> initializer() {
        // The initial state is negative infinity so the first stream value is always retained.
        return () -> new double[] { Double.NEGATIVE_INFINITY };
    }

    public Integrator<double[], BitcoinPrice, BitcoinPrice> integrator() {
        return (state, element, downstream) -> {
            // If the current price is higher than the highest recorded price,
            // emit the value and update the highest price.
            if (element.price() > state[0]) {
                state[0] = element.price();
                // `push` returns false once the downstream no longer wants elements
                // (after a `limit`, for instance); returning its result lets
                // processing stop early. Returning false stops the gathering.
                return downstream.push(element);
            }
            return true;
        };
    }
}

// Our gatherer depends on the input data being sorted by date.
Stream<BitcoinPrice> historicalData = getBitcoinSortedHistoricalData(); 

historicalData
        .gather(new IncreasingPrices())
        .forEach(System.out::println);
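Because the integrator returns the result of downstream.push, the pipeline short-circuits properly: placing a limit after gather stops reading the source once enough highs have been emitted. For instance:

// Stops consuming historical data as soon as the 10th all-time high is found.
getBitcoinSortedHistoricalData()
        .gather(new IncreasingPrices())
        .limit(10)
        .forEach(System.out::println);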

The full example is available on my GitLab.

Calculating Mean Item Prices in Parallel

We have data on the prices of consumer goods in various stores and want to compute the average price for each item. In this case, we can’t benefit much from stream laziness (we need all data to compute the correct average), but we can take advantage of parallel processing.

You could already do this with a Collector, but if you want a Stream output, you’d need to recreate a stream from the collector’s result. We’re going to do the work with a Gatherer instead.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

/** Input data */
record ItemPrice(String item, double priceInEuros, String shop) {}
/** Output data */
record ItemMeanPrice(String item, double meanPrice) {}

class MeanPriceGatherer implements
        Gatherer<ItemPrice, Map<String, MeanPriceGatherer.CumulativeSumCount>, ItemMeanPrice> {

    /** Our internal state stores, for each item, the sum and count of prices seen. */
    public Supplier<Map<String, CumulativeSumCount>> initializer() {
        return HashMap::new;
    }

    /**
     * For each item, add the price to the cumulative sum and increase the count by 1.
     *
     * Here, we don't emit items during integration because we need all inputs first;
     * that's the finisher's job.
     */
    public Integrator<Map<String, CumulativeSumCount>, ItemPrice, ItemMeanPrice> integrator() {
        return (state, element, downstream) -> {
            state.merge(
                    element.item(),
                    new CumulativeSumCount(element.priceInEuros(), 1),
                    CumulativeSumCount::add
            );
            return true;
        };
    }

    /**
     * To support parallelism, we must be able to combine states.
     * Here, each entry of the second state map is merged into the first.
     */
    public BinaryOperator<Map<String, CumulativeSumCount>> combiner() {
        return (state1, state2) -> {
            state2.forEach((key, value) ->
                    state1.merge(key, value, CumulativeSumCount::add)
            );
            return state1;
        };
    }

    /**
     * The finisher runs after all parallel states have been merged.
     * It iterates over the state, computes the mean, and emits results downstream.
     */
    public BiConsumer<Map<String, CumulativeSumCount>, Downstream<? super ItemMeanPrice>> finisher() {
        return (state, downstream) ->
                state.forEach(
                        (itemName, sumCount) ->
                                downstream.push(new ItemMeanPrice(itemName, sumCount.getMean()))
                );
    }

    record CumulativeSumCount(double sum, int count) {
        public CumulativeSumCount add(CumulativeSumCount other) {
            return new CumulativeSumCount(sum + other.sum, count + other.count);
        }

        public double getMean() {
            return sum / count;
        }
    }
}
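A small usage sketch (the sample prices are made up for illustration); because the gatherer defines a combiner, it can run on a parallel stream:

var prices = List.of(
        new ItemPrice("flour", 1.25, "shopA"),
        new ItemPrice("flour", 1.75, "shopB"),
        new ItemPrice("butter", 2.50, "shopA")
);

prices.parallelStream()
        .gather(new MeanPriceGatherer())
        .forEach(System.out::println);
// Prints (in no guaranteed order):
// ItemMeanPrice[item=flour, meanPrice=1.5]
// ItemMeanPrice[item=butter, meanPrice=2.5]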

Again, the full demo is available on GitLab.

Conclusion

The Gatherer interface is not revolutionary, but it fills a real gap in Java streams. Developers who use streams extensively have likely had to convert a stream to a collection just to perform an operation and convert it back immediately. Gatherers address that need while improving code readability and occasionally even performance.

For more detail, it’s always useful to check out the Javadoc, and JEP 485, which introduces this feature.