Stream et Gatherers en Java 23 (1/2)

Une nouvelle méthode dans l’api Stream ?

Les récentes versions de Java introduisent une nouvelle fonctionnalité en preview dans l’api Stream : la notion de Gatherer. Pour mieux comprendre comment s’intègre cette évolution, il convient de faire une petite synthèse de cette api Stream.

Tout d’abord, rappelons qu’un Stream vise à constituer un pipeline d’opérations. On peut faire l’analogie avec la chaîne de traitement de l’eau. Le Stream ne stocke pas mais correspond plutôt à la tuyauterie et aux différentes machines intermédiaires traitant et amenant l’eau jusqu’au robinet. Le pipeline est ainsi constitué d’une source, d’opérations intermédiaires (optionnelles) et d’une opération terminale, ce qu’on peut schématiser par :

    source => intermédiaire => … => intermédiaire => terminale

L’analogie est intéressante car elle illustre la caractéristique “lazy” des Stream : l’eau ne s’écoule que si on ouvre le robinet tout au bout (mais, comme toute analogie, elle a ses limites).

L’API est conçue pour mettre en œuvre les principes du filter-map-reduce qu’on peut mettre en évidence avec un exemple simple :

int s = Stream
       .of(2, 5, 9, 12, 17)       // créer un Stream de 5 Integer
       .filter(n -> n % 2 == 0)   // garder les multiples de 2
       .map(n -> n * n)           // les élever au carré
       //.reduce(0, (sum, n) -> sum + n); // et les additionner
       .reduce(0, Integer::sum);  // avec une référence de méthode
  System.out.println(s);

Ce qui peut s’écrire un peu plus simplement et efficacement avec un IntStream :

 int s = IntStream.of(2, 5, 9, 12, 17) // IntStream (plus efficace)
       .filter(n -> n % 2 == 0)
       .map(n -> n * n)
       .sum();         // spécifique aux Streams primitifs
  System.out.println(s);

Nous avons là maintenant les bases nécessaires pour cartographier la cinquantaine d’opérations proposées par l’interface Stream (et sa super-interface BaseStream).

  • des méthodes de construction d’un Stream à partir de différentes sources
  • des opérations intermédiaires de filtre (filter) et de transformation (map) des éléments, qui retournent donc elles-mêmes un Stream
  • des opérations terminales de réduction qui consolident un résultat ou produisent un effet de bord

Création de Stream (source)

Ces méthodes statiques permettent de créer un Stream et ainsi de constituer la source du pipeline avec un nombre variable d’éléments prédéfinis, en ignorant null, en utilisant un pattern builder, à partir d’une fonction génératrice ou par itération ou encore en concaténant deux Streams.

  • empty() : un stream vide
  • of(T) : un stream avec un élément
  • of(T…) : un stream à partir d’une série d’élément
  • ofNullable(T) : un stream vide ou avec un élément s’il est non null
  • builder() : un stream construit via un pattern builder
  • generate(Supplier<? extends T>) : un stream générant chaque élément
  • iterate(T, UnaryOperator<T>) : un stream itérant sur une valeur avec un opérateur
  • iterate(T, Predicate<? super T>, UnaryOperator<T>) : idem avec un condition d’arrêt
  • concat(Stream<? extends T>, Stream<? extends T>) : enchaînement de deux streams

Il apparaît ici une autre caractéristique importante des streams : la capacité de représenter un flux potentiellement infini d’éléments, par exemple avec :

  Stream.generate(() -> Math.random())

Ou plus simplement avec une référence de méthode :

  Stream.generate(Math::random)

N’oublions pas les autres méthodes de création disséminées dans le JDK :

  • Collection.stream() et parallelStream() : stream avec les éléments de la collection
  • Map (et Properties) : entrySet().stream()keySet().stream()values().stream()
  • Arrays.stream(T[])stream(T[], int, int) et autres tableaux primitifs
  • String.chars() et codePoints() : IntStream des caractères/codePoints de la string
  • Optional.stream() : stream vide ou avec l’élément du Optional
  • Random.ints()doubles()longs() : stream infinis de valeurs aléatoires
  • Files.lines(Path) : stream des lignes d’un fichier
  • Files.walk(Path)list(Path)find(…) : stream des Path d’une arborescence de fichiers
  • Scanner.tokens() : stream des tokens d’un Scanner 
  • Pattern.splitAsStream(CharSequence) : stream obtenu en découpant selon un pattern
  • BufferedReader.lines() : stream des lignes d’un BufferedReader
  • ZipFile.stream() : stream des ZipEntry d’un fichier zip

Et pour être complet, deux méthodes spécifiques dans les IntStream et LongStream primitifs :

  • range(int,int) : stream des valeurs d’un intervalle, borne supérieure exclue
  • rangeClosed(int,int) : idem avec borne supérieure inclue

Opérations intermédiaires de filtre (filter)

Les opérations intermédiaires retournent un Stream et s’insèrent ainsi au milieu du pipeline. Celles qui nous intéressent ici éliminent des éléments, soit selon une condition (Predicate) stateless sur ces éléments, soit sur une condition interne stateful.

  • filter(Predicate<? super T>) : garde les éléments qui satisfont le prédicat
  • takeWhile(Predicate<? super T>) : garde les éléments tant que le prédicat est satisfait
  • dropWhile(Predicate<? super T>) : élimine les éléments tant que le prédicat est satisfait
  • skip(long) : élimine les premiers éléments
  • limit(long) : limite le nombre d’éléments (élimine les derniers éléments)
  • distinct() : élimine les doublons
  • sorted() : remet les éléments dans l’ordre naturel (du plus petit au plus grand)
  • sorted(Comparator<? super T>) : idem selon un Comparator
  • peek(Consumer<? super T>) : laisse passer les éléments en permettant un traitement

On peut ajouter ici les trois opérations intermédiaires qui changent le traitement du stream lui-même, bien que ces opérations pourraient être classées dans la catégorie suivante ou même à part : 

  • parallel() : autorise un traitement en parallèle des éléments
  • sequential() : réclame un traitement en séquence des éléments
  • unordered() : autorise à ne pas traiter les éléments dans l’ordre

Opérations intermédiaires de transformation (map)

Cette seconde catégorie d’opérations intermédiaires consiste à transformer les éléments du stream. Notons ici deux aspects compliquant un peu l’api : l’optimisation consistant à proposer des streams primitifs (IntStreamDoubleStream et LongStream) et la gestion d’une transformation qui ne serait pas 1 pour 1. Dans ce dernier cas, chaque élément peut donner plusieurs (ou aucun) éléments, soit sous forme d’un Stream, soit par un traitement itératif. Les signatures sont ici simplifiées par souci de lisibilité.

  • map(Function) : transforme chaque élément (1 pour 1)
  • mapToDouble(ToDoubleFunction) : transforme l’élément vers un double
  • mapToInt(ToIntFunction) : transforme l’élément vers un int
  • mapToLong(ToLongFunction) : transforme l’élément vers un long
  • mapToObj(xFunction) : pour stream primitif, transforme la valeur en un objet
  • boxed() : idem mais transforme simplement la valeur en son type wrapper
  • asDoubleStream() : pour stream primitif, convertit simplement le type
  • asLongStream() : idem
  • flatMap(Function) : transforme 1 vers n, sous forme d’un Stream
  • flatMapToDouble(Function) : idem vers double
  • flatMapToInt(Function) : idem vers int
  • flatMapToLong(Function) : idem vers long
  • mapMulti(BiConsumer) : similaire à flatMap sous forme impérative (voir ci-après)
  • mapMultiToDouble(BiConsumer) : idem vers double
  • mapMultiToInt(BiConsumer) : idem vers int
  • mapMultiToLong(BiConsumer) : idem vers long
  • gather(Gatherer<? super T, ?, R>) : la nouveauté qui nous intéresse, transforme n vers 1

Les flatMap et mapMulti méritent un peu plus de détails pour être compréhensibles avant d’introduire le gather apparu en preview. 

Dans le cas du flatMap, chaque élément est transformé en plusieurs éléments qui vont être transmis l’un après l’autre dans le pipeline. La fonction de transformation doit représenter ces éléments et le choix initial de l’api était de naturellement les représenter par un Stream, ce qui peut parfois être contraignant à construire. Voyons cela sur une exemple simple où chaque élément est dupliqué dans le Stream :

   var s = Stream
       .of(2, 5, 9, 12, 17)
       .flatMap(n -> Stream.of(n, n))  // produit deux fois n
       .toList();
   System.out.println(s);  // [2, 2, 5, 5, 9, 9, 12, 12, 17, 17]

Évidemment, ici ce n’est pas très compliqué de fabriquer un stream avec deux occurrences de la valeur mais d’autres situations peuvent être plus complexes. C’est pourquoi Java 16 a introduit le mapMulti où l’idée est de proposer une approche plus impérative pour laisser la transformation pousser elle-même les éléments dans le pipeline, via un consumer reçu en paramètre.

   var s = Stream
       .of(2, 5, 9, 12, 17)
       .<Integer> mapMulti((n, c) -> { // noter le type <Integer>
                   c.accept(n);  // pousse 2 fois n
                   c.accept(n);  // dans le pipeline
               }
       )
       .toList();
   System.out.println(s);  // [2, 2, 5, 5, 9, 9, 12, 12, 17, 17]

Cette notion de transformation 1 -> 1 et 1 -> n étant maintenant clairement explicitée, le manque pour une transformation n -> 1 est immédiat. C’est basiquement là ce que propose l’opération gather() en preview, mais terminons d’abord notre synthèse de l’api.

Opérations de réduction

Ce type d’opération est terminal. C’est le robinet qui va déclencher l’écoulement des valeurs dans le pipeline. Réfléchissons un peu avant de lister les différentes opérations proposées. Dans ce type d’opération, qui finalise le pipeline, nous allons recevoir 0, 1 ou plusieurs éléments, éventuellement rescapés des filtres et/ou transformés. Nous pouvons souhaiter obtenir une unique valeur consolidée, comme le nombre d’éléments rescapés ou leur somme ou encore un quelconque booléen (count()sum()allMatch()…), un élément particulier parmi ces rescapés (findFirst()max()…), une collection de valeurs (toList()…) ou simplement traiter ces éléments de manière impérative (forEach()…). Il existe de nombreuses manières d’effectuer ces opérations de réduction. Cependant, ces différentes manières se résument à deux options principales : la réduction stateless sur la base du reduce() et la réduction stateful sur la base du collect().

RÉDUCTION STATELESS :

  • reduce(BinaryOperator) : réduit à une valeur Optional par accumulation
  • reduce(T, BinaryOperator) : idem mais pas un Optional grâce à une valeur initiale
  • reduce(U, BiFunction, BinaryOperator) : réduit à l’aide d’un combiner
  • count() : réduit au nombre d’éléments
  • sum() : pour les streams primitifs, réduit à la somme des éléments
  • average() : pour les streams primitifs, réduit à la moyenne arithmétique des éléments
  • summaryStatistics() : pour les streams primitifs, réduit à un résumé des statistiques
  • allMatch(Predicate) : réduit à true si tous les éléments satisfont le prédicat
  • anyMatch(Predicate) : réduit à true si un élément au moins satisfait le prédicat
  • noneMatch(Predicate) : réduit à true si aucun élément ne satisfait le prédicat
  • findAny() : retourne un élément quelconque (Optional)
  • findFirst() : retourne le premier élément (Optional)
  • min(Comparator) : retourne le plus petit (Optional) selon un comparator
  • min() : pour les streams primitifs, retourne le plus petit élément (Optional)
  • max(Comparator) : retourne le plus grand (Optional) selon un comparator
  • max() : pour les streams primitifs, retourne le plus grand (Optional)

RÉDUCTION STATEFUL :

  • collect(Supplier, BiConsumer, BiConsumer) : réduit dans un conteneur mutable
  • collect(Collector) : idem sur la base l’interface Collector
  • toList() : réduit dans une liste
  • toArray() : réduit dans un tableau d’Object
  • toArray(IntFunction) : réduit en un tableau créé par la fonction passée en paramètre

Pour être exhaustif, il faut citer quelques opérations complémentaires un peu à part :

  • forEach(Consumer) : réduire à un effet de bord
  • forEachOrdered(Consumer) : idem mais garantit l’ordre des éléments
  • iterator() : retourne un Iterator pour compatibilité avec un traitement existant
  • spliterator() : base de fonctionnement en parallèle des streams
  • close() : Stream hérite indirectement de AutoClosable
  • onClose(Runnable) : peut servir à finaliser dans certains cas

Pour comprendre l’introduction en cours des Gatherer dans cette API, il faut creuser un peu plus cette partie reduce/collect et quelques caractéristiques internes des Stream.

La javadoc indique que les méthodes Stream.collect() permettent d’effectuer une “réduction mutable”, càd de consolider le résultat dans un objet mutable comme une ArrayList ou un StringBuilder, là où reduce() propose d’effectuer une réduction sur la base d’une “fonction d’accumulation associative”. Noter l’utilisation de BiConsumer en paramètre du collect() et de BinaryOperator en paramètre du reduce().

Ainsi, l’addition d’entiers, utilisée dans l’exemple précédent, est associative et permet de regrouper les entiers à additionner de différentes manières :

   2 + 5 + 9 + 12 + 17

peut aussi s’écrire :

   (2 + 5) + (9 + 12 + 17)

et se calculer :

   (7) + (21 + 17)

puis :

   (7) + (38)

et finalement :

   (45)

C’est d’ailleur ce qui se passe avec le spliterator utilisé par le stream parallèle et son Fork/Join pool :

Spliterator<Integer> sp = Stream.of(2, 5, 9, 12, 17).spliterator();
System.out.println("nb:" + sp.getExactSizeIfKnown());     // affiche 5


Spliterator<Integer> sp2 = sp.trySplit();
System.out.println("nb:" + sp2.getExactSizeIfKnown());    // affiche 2
sp2.forEachRemaining(System.out::println);         // affiche 2 puis 5
System.out.println("===");


System.out.println("nb:" + sp.getExactSizeIfKnown());     // affiche 3      
sp.forEachRemaining(System.out::println); // affiche 9 puis 12 puis 17
System.out.println("===");

Mettons cela en évidence avec l’exemple suivant (noter l’utilisation de parallel()) :

Optional<Integer> result = Stream
       .of(2, 5, 9, 12, 17)
       .parallel()
       .reduce((a, b) -> {
                   System.out.println("reduce " + a + " " + b);
                   return a+b;
               }
       );
System.out.println(result);
//produit, selon l'exécution, le résultat suivant :
//reduce 12 17
//reduce 9 29
//reduce 2 5
//reduce 7 38
//Optional[45]

Le même exemple avec une valeur initiale est un peu différent et élimine le Optional :

Integer result = Stream
       .of(2, 5, 9, 12, 17)
       .parallel()
       .reduce(0,
               (a, b) -> {
                   System.out.println("reduce " + a + " " + b);
                   return a+b;
               }
       );
System.out.println(result);
//produit, selon l'exécution, le résultat suivant :
//reduce 0 9
//reduce 0 2
//reduce 0 17
//reduce 0 12
//reduce 0 5
//reduce 12 17
//reduce 2 5
//reduce 9 29
//reduce 7 38
//45

La situation est un peu différente avec une réduction mutable car l’accumulation se fait alors dans un objet qui va donc changer d’état.

List<Integer> resultList = Stream.of(2, 5, 9, 12, 17)
       .collect(
               ArrayList::new,     // Supplier de l'état mutable
               ArrayList::add,     // BiConsumer d'accumulation
               ArrayList::addAll); // BiConsumer de combinaison
// produit la liste [2, 5, 9, 12, 17]


String resultString = Stream.of(2, 5, 9, 12, 17)
       .collect(
               StringBuilder::new,
               StringBuilder::append,
               StringBuilder::append)
       .toString();
// produit la chaîne "2591217"

Décomposons avec notre propre classe d’état mutable et quelques traces :

class EtatMutable {
   private int somme;


   EtatMutable() {
       System.out.println("création d'un EtatMutable");
   }


   void accumuler(int valeur) {
       System.out.println("accumule " + valeur);
       this.somme += valeur;
   }


   void combiner(EtatMutable autreEtat) {
       System.out.println("combine " + somme 
                        + " avec " + autreEtat.somme);
       this.somme += autreEtat.somme;
   }


   int somme() {
       return somme;
   }
};


int s = Stream.of(2, 5, 9, 12, 17)
       .collect(EtatMutable::new,      // supplier
               EtatMutable::accumuler, // accumulator
               EtatMutable::combiner)  // combiner
       .somme();                       // finisher
System.out.println(s);
//création d'un EtatMutable
//accumule 2
//accumule 5
//accumule 9
//accumule 12
//accumule 17
//45

Noter que dans ces 3 exemples précédents, l’état mutable n’est pas particulièrement thread-safe, ce qui est cependant supporté par le mécanisme de parallélisation comme indiqué dans la javadoc du Stream.collect() :

Like reduce(Object, BinaryOperator), collect operations can be parallelized without requiring additional synchronization. 

En effet, les traces de l’exécution en mode parallèle montrent la création de nombreuses instances d’EtatMutable, traitées séparément et non en concurrence.

int s = Stream.of(2, 5, 9, 12, 17)
       .parallel()
       .collect(EtatMutable::new,      // supplier
               EtatMutable::accumuler, // accumulator
               EtatMutable::combiner)  // combiner
       .somme();                       // finisher
System.out.println(s);
//création d'un EtatMutable
//création d'un EtatMutable
//création d'un EtatMutable
//création d'un EtatMutable
//création d'un EtatMutable
//accumule 9
//accumule 5
//accumule 17
//accumule 2
//accumule 12
//combine 12 avec 17
//combine 9 avec 29
//combine 2 avec 5
//combine 7 avec 38
//45

En résumé, un Collector est constitué de 4 parties distinctes que l’on retrouve dans l’interface Collector :

  • un supplier d’état mutable
  • un accumulator recevant un état et un élément à accumuler dans cet état
  • un combiner utilisé en cas de parallélisation pour combiner deux états
  • un finisher permettant d’extraire le résultat consolidé

Cette méthode collect() offre ainsi un point d’extension important dans l’API pour effectuer des réductions mutables standards et personnalisées. Voir la classe utilitaire Collectors pour la liste complète des collector standards proposés et l’interface Collector, en particulier ses deux méthodes of().

Quid des Gatherers ? 

La suite de cet article ici

par Jean-François Lefevre
Consultant – Formateur chez OXiane Institut