Streams Java : l'api Gatherers

Java 24 apporte une nouvelle fonctionnalité en lien avec les streams : l’interface Gatherer
. Cette interface (utilisée avec la méthode Stream.gather
) permet de transformer les streams Java de manière plus complète et plus expressive que ne le permettaient les méthodes disponibles jusqu’à maintenant. Explications et exemples dans cet article.
Contents
Vue d’ensemble
C’est quoi les Streams ?
Ce qu’on appelle communément Streams en java, c’est un ensemble d’interfaces, de méthodes et de classes qui permettent de modéliser des pipelines de transformation de données.
Au cœur de cet ensemble il y a bien sûr l’interface Stream
qui représente une séquence de données potentiellement vide ou infinie.
Cette interface comporte un certain nombre de méthodes qui permettent de modéliser des opérations successives sur cette séquence de données. Ces opérations sont de deux types :
- Certaines opérations transforment le
Stream
en un autreStream
. Ce sont les opérations intermédiaires, par exemplemap
,filter
oulimit
. - Certaines opérations transforment le Stream en une valeur, une collection ou autre type de container. ce sont les opérations terminales, par exemple
collect
,findFirst
oureduce
.
Les streams ne sont pas un autre type de collection. Il s’agit de modéliser une suite de transformations, et non pas un ensemble de valeurs. Les Streams sont lazy : tant qu’on n’utilise pas d’opération terminale, aucune opération n’est réellement effectuée, car aucun résultat n’est attendu. C’est ce qui permet de modéliser des séquences infinies sans faire tourner votre programme à l’infini.
import java.util.stream.Stream;
void main() {
// Un Stream infini représentant les entiers naturels.
var naturalNumbersStream = Stream.iterate(1, i -> i + 1);
// Ici, on modélise la suite des opérations sur notre stream :
// prendre le carré des nombres, garder uniquement les nombres pairs puis les 100 premiers.
var streamPipeline = naturalNumbersStream
.map(i -> i * i)
.filter(i -> i % 2 == 0)
.limit(100);
// À ce stade, aucun calcul n'a encore été fait.
// On collecte le stream en une valeur finale : la moyenne des nombres.
int averageOfFirst100EvenSquares = streamPipeline.collect(Collectors.averagingInt(i -> i));
System.out.println(averageOfFirst100EvenSquares);
}
L’autre intérêt de penser pipeline d’opérations plutôt que collection, c’est qu’on accède à d’autres features bonus, comme le parallélisme (les opérations sous certaines conditions peuvent être effectuées en parallèle pour plusieurs morceaux du stream) ou autres optimisations (l’opération terminale count
, par exemple, peut parfois sauter toutes les opérations intermédiaires si elle arrive à calculer le nombre d’éléments directement depuis la source).
Qu’est-ce qui manquait exactement ?
En ce qui concerne les opérations terminales, l’api Stream est plutôt bien fournie. Nous avons un certain nombre d’opérations simples pour les cas courants (findFirst
, toArray
, forEach
, etc.) et une interface Collector
qu’on peut implémenter pour définir n’importe quelle opération terminale.
/**
* Cette classe transforme un `Stream<Customer>` en une instance de `Invoice`. Elle
* parcourt le stream élément par élément et peut utiliser un état interne de type
* `Map<UUID, Customer>`Pour le reste, tout est libre et on peut implémenter une
* opération terminale avec la logique qu'on veut.
*/
class MyCollector implements Collector<Customer, Map<UUID, Customer>, Invoice> {
// ...
}
En revanche, la partie opérations intermédiaires n’était pas aussi bien fournie. Agir sur des éléments un par un est relativement libre (map
, filter
, etc.) mais il existe peu d’opérations stateful, qui considèrent plusieurs éléments du stream d’entrée pour produire un ou plusieurs éléments en sortie. Il y a par exemple distinct
ou dropWhile
, mais ils répondent à des besoins spécifiques. Il n’y avait pas de moyen d’implémenter des opérations complètement libres, transformant par exemple plusieurs éléménts consécutifs ou non en un seul élément (voire zéro).
Un exemple tout bête serait par exemple de transformer un stream de nombres quelconques en un stream de nombres strictement croissant. filter
ne permet pas de considérer autre chose que l’élément courant pour décider de le garder ou non. Il faudrait donc convertir le stream en une liste, faire cette opération sur la liste puis repasser en stream, ce qui enlève une bonne part de l’intérêt des streams (laziness, gestion du parallélisme, etc.)
C’est cette problématique que viennent résoudre les Stream Gatherers. Il est possible de définir n’importe quelle opération intermédiaire, tout comme les Collectors permettent de définir n’importe quelle opération terminale.
L’interface Gatherer
Définition
Voilà la définition (simplifiée) de cette interface :
public interface Gatherer<Input, State, Result> {
Supplier<State> initializer();
Integrator<State, Input, Result> integrator();
BinaryOperator<State> combiner();
BiConsumer<State, Downstream<? super Result>> finisher();
}
Tout comme l’interface Collector
, un gatherer fonctionne grâce à quatre méthodes qui sont appelées à divers moments de l’opération de gathering pour obtenir le résultat final.
- L’
initializer
est utilisé au début de l’opération pour créer l’état interne du gatherer. C’est cet état qui permet à l’opération de consommer plusieurs éléments du stream d’entrée avant d’émettre un élément dans le stream de sortie. Cet initializer peut être appelé plusieurs fois dans le cas d’une opération en parallèle (chaque thread aura sa sous-partie du stream et son propre état interne) - L’
integrator
est l’élément principal du gatherer. Il reçoit chaque élément du stream d’entrée, et a la possibilité d’émettre des éléments dans le stream de sortie (grâce à une interface appeléeDownstream
). Il a également accès à l’état interne du gatherer. - Le
combiner
permet à l’opération d’être exécutée en parallèle. Dans ce cas, une instance de l’état sera initialisée pour chaque thread, et le combiner sera appelé pour fusionner ces états en un seul à la fin de l’opération. - Le
finisher
permet d’intervenir à la fin de l’opération. Il est appelé une fois que tous les éléments ont été intégrés et peut éventuellement émettre d’autres éléments grâce à l’interfaceDownstream
. Il a également accès à l’état interne (après qu’il a été regroupé grace au combiner).
Usage et méthodes utilitaires
Un gatherer s’utilise avec la méthode Stream.gather
.
var resultStream = myList.stream()
.gather(new MyGatherer());
Définir un gatherer en créant une nouvelle classe qui implémente Gatherer
peut être assez verbeux, surtout pour les cas les plus simples. Il existe donc des méthodes utilitaires pour créer une instance de Gatherer
directement à partir des méthodes, qu’on peut donner sous forme de lambda. Par exemple :
- Créer un gatherer à partir des 4 méthodes :
Gatherer.of( Supplier<State> initializer, Integrator<State, Input, Result> integrator, BinaryOperator<State> combiner, BiConsumer<State, Downstream<? super Result>> finisher );
- Créer un gatherer séquentiel (un état mais pas de combiner)
Gatherer.ofSequential( Supplier<State> initializer, Integrator<State, Input, Result> integrator );
- Créer un gatherer sans état, et donc parallélisable
Gatherer.of( Integrator<State, Input, Result> integrator );
Il n’est pas forcément évident de se représenter la façon dont les quatre méthodes du gatherer fonctionnent entre elles. Le mieux est donc de s’exercer avec quelques exemples.
Exemples
Calculer les All Time Highs du prix du BitCoin
Nous avons un stream correspondant au prix du bitcoin mois par mois dans l’ordre chronologique. On souhaite avoir un Stream qui émet un élément à chaque fois que le prix atteint un nouveau all-time high.
Sans les gatherers, il aurait fallu convertir le stream en liste. Dans le cas d’un stream provenant d’un gros fichier par exemple, c’est dommage, car ça nous force à lire le fichier en entier avant même de commencer à faire notre opération (alors qu’on a peut-être envie seulement des 10 premiers…)
La solution avec le gatherer permet de traiter les All-Time highs au fur et à mesure.
record BitcoinPrice(YearMonth month, double price) {}
/**
* Notre gatherer transforme `Stream<BitcoinPrice>` en `Stream<BitcoinPrice>`, ce qui correspond
* au premier et troisième type.
* L'état interne est le plus haut prix rencontré jusqu'à présent. Comme l'état doit être
* mutable, on utilise un tableau (c'est une technique fréquente avec les Collecteurs
* également). Si vous n'aimez pas, il est aussi possible d'utiliser `AtomicDouble` ou
* `MutableObject` si vous avez la dépendance à Apache Commons.
*/
class IncreasingPrices implements Gatherer<BitcoinPrice, Double[], BitcoinPrice> {
public Supplier<Double[]> initializer() {
// L'état initial est Double.MIN_VALUE pour que la première valeur du stream soit
// toujours retenue.
return () -> new Double[] { Double.MIN_VALUE };
}
public Integrator<Double[], BitcoinPrice, BitcoinPrice> integrator() {
return (state, element, downstream) -> {
// Si le prix actuel est plus grand que le précédent prix retenu, on émet la
// valeur et on stocke le nouveau prix le plus élevé.
if(element.price > state[0]) {
downstream.push(element);
state[0] = element.price;
}
// La valeur de retour de l'integrator permet de stopper le traitement des
// prochains éléments en retournant false. Dans notre cas, on veut traiter toutes
// les données.
return true;
};
}
}
// notre gatherer dépend du fait que les données sont déjà triées par date.
Stream<BitcoinPrice> historicalData = getBitcoinSortedHistoricalData();
historicalData
.gather(new IncreasingPrices())
.forEach(System.out::println);
L’exemple complet est disponible sur mon GitLab.
Calculer le prix médian d’articles en parallèle
Nous avons des données sur le prix d’articles de consommation dans différents magasins, et on veut calculer, pour chaque article, le prix moyen. C’est un cas ou on ne peut pas vraiment profiter du caractère lazy des streams (il faut consommer tout le stream pour être sûr d’avoir les bons prix moyen), par contre on peut bénéficier de la simplicité de paralléliser les traitements.
C’est un cas qu’on peut déjà coder en utilisant un Collector, mais si on préfère avoir un Stream en sortie, il faudra recréer un stream à partir de la collection générée par le collecteur. Nous allons donc faire le travail avec un Gatherer.
/** Les données d'entrée **/
record ItemPrice(String item, double priceInEuros, String shop) {}
/** Les données en sortie **/
record ItemMeanPrice(String item, double meanPrice) {}
class MeanPriceGatherer implements
Gatherer<ItemPrice, Map<String, MeanPriceGatherer.CumulativeSumCount>, ItemMeanPrice> {
/** Notre état interne consiste à stocker, pour chaque item, la somme et le nombre
des prix rencontrés. */
public Supplier<Map<String, CumulativeSumCount>> initializer() {
return HashMap::new;
}
/**
* Pour chaque item, on ajoute le prix à notre état cumulé et on augmente de 1 le nombre
* d'occurrences de cet item.
*
* Ici on n'émet pas d'éléments dans le `downstream` car il faut attendre d'avoir toutes
* les données. C'est le finisher qui va s'en charger.
*/
public Integrator<Map<String, CumulativeSumCount>, ItemPrice, ItemMeanPrice> integrator() {
return ((state, element, downstream) -> {
state.merge(
element.item,
new CumulativeSumCount(element.priceInEuros, 1),
CumulativeSumCount::add
);
return true;
});
}
/**
* Pour que le gatherer puisse fonctionner en parallèle, il faut pouvoir combiner les états.
* Ici, pour chaque item rencontré dans l'une des instances, on l'ajoute aux occurences
* de l'autre.
*/
public BinaryOperator<Map<String, CumulativeSumCount>> combiner() {
return (state1, state2) -> {
state2.forEach((key, value) ->
state1.merge(key, value, CumulativeSumCount::add)
);
return state1;
};
}
/**
* Le finisher est appelé une fois que tous les états des opérations parallèles ont été
* fusionné par le combiner. Il ne reste plus qu'à itérer sur les sommes, calculer la
* moyenne et les émettre downstream.
*/
public BiConsumer<Map<String, CumulativeSumCount>, Downstream<? super ItemMeanPrice>> finisher() {
return (state, downstream) ->
state.forEach(
(itemName, sumCount) ->
downstream.push(new ItemMeanPrice(itemName, sumCount.getMean()))
);
}
record CumulativeSumCount(double sum, int count) {
public CumulativeSumCount add(CumulativeSumCount cumulativeSumCount) {
return new CumulativeSumCount(
sum + cumulativeSumCount.sum,
count + cumulativeSumCount.count
);
}
public double getMean() {
return sum / count;
}
}
}
Encore une fois, la démo complète est sur GitLab
Conclusion
L’interface Gatherer
n’est pas révolutionnaire, mais elle comble un réel manque dans les streams java. Tous les développeurs qui utilisent les streams de manière intense ont déjà eu à convertir un stream en collection pour effectuer une action et reconvertir aussitôt le résultat en stream. Les gatherers répondent à ce besoin tout en permettant dans certains cas d’augmenter la lisibilité du code, voire les performances.
Pour plus de détails, il est toujours intéressant de lire la Javadoc, et la JEP 485 qui introduit cette fonctionnalité.