Stream et Gatherers en Java 23 (2/2)

Pour découvrir la première partie de cet article : Stream et Gatherers en Java 23 (1/2)

Oui mais … et les Gatherers ?

C’est ici que les Gatherers prennent toute leur signification. Jusqu’à présent, il n’y avait pas de moyen dans l’API pour effectuer des opérations intermédiaires mutables (stateful) personnalisées.

Les opérations intermédiaires personnalisables telles que filter() et map() et leurs dérivées peuvent être paramétrées par des Predicat ou des Function qui doivent obligatoirement être “stateless” (et “non-interfering”, c’est-à-dire ne provoquant pas d’interférence dans la source du stream pendant l’exécution du pipeline).

Seules les opérations intermédiaires standards comme distinct(), limit()… sont “stateful”.

Par conséquent il est impossible de filtrer ou de mapper selon, par exemple, les éléments déjà traités ou tout simplement selon un état mémorisé (nombre d’éléments traités), ce que nous avons déjà remarqué plus tôt en constatant qu’il est prévu de transformer (map) sous forme 1 -> 1 ou 1 -> n mais pas n -> 1, sans parler de n -> m.

L’évolution en cours consiste donc à remédier à cette injustice en proposant un point d’extension intermédiaire à l’image du point d’extension collect(). L’objectif est ainsi d’offrir la possibilité d’ajouter des étapes intermédiaires personnalisées, mutables ou non, autorisant à filtrer, transformer, réordonner, regrouper les éléments. Il deviendra ainsi possible de combiner les éléments, de les traiter en se souvenant du précédent, de tous les précédents, de les cumuler, de les consolider, avant de propager le ou les résultats dans la suite du pipeline.

L’API proposée actuellement consiste en :

  • une méthode supplémentaire intermédiaire gather(Gatherer),
  • une interface Gatherer
  • une classe utilitaires Gatherers proposant des implémentations standards

Comme pour Collector, un Gatherer est défini par 4 fonctions :

  • un “initializer” (optionnel), fournissant un état mutable
  • un “integrator”, un peu équivalent à l’accumulator du Collector
  • un “combiner” (optionnel) pour combiner deux états intermédiaires dans le cas de la parallélisation
  • un “finisher” (optionnel) déclenché à la fin du traitement

Commençons par tester les Gatherer standards pour avoir un aperçu des possibilités à venir. Noter que pour tester ces exemples, il faudra utiliser un JDK 23 et le mode –enable-preview.

Le Gatherers.windowFixed(int) offre un regroupement des items en lots, ici 2 par 2 :

List<List<Integer>> paires = Stream.of(2, 5, 9, 12, 17)
       .gather(Gatherers.windowFixed(2))
       .toList();
System.out.println(paires);
// [[2, 5], [9, 12], [17]]

Le point intéressant est ici que l’opération gather() est intermédiaire, ce qui permet d’enchainer avec un toList() ou avec n’importe quelle(s) autre(s) opération(s) intermédiaires manipulant les paires ainsi obtenues.

List<Integer> unSurDeux = Stream.of(2, 5, 9, 12, 17)
       .gather(Gatherers.windowFixed(2))
       .map(p -> p.getFirst())
       .toList();
System.out.println(unSurDeux);
// [2, 9, 17]

Le Gatherers.windowsSliding(int) est similaire mais avec une fenêtre glissante :

List<List<Integer>> pairesGlissantes = Stream.of(2, 5, 9, 12, 17)
       .gather(Gatherers.windowSliding(2))
       .toList();
// [[2, 5], [5, 9], [9, 12], [12, 17]]

Le Gatherers.fold(Supplier, BiFunction) opère une forme de réduction ordonnée :

List<String> chaine = Stream.of(2, 5, 9, 12, 17)
       .gather(Gatherers.fold(
               () -> "",
               (str, n) -> (str.isEmpty()
                       ? str + n
                       : str + " -> " + n)))
       .toList();
// [2 -> 5 -> 9 -> 12 -> 17]

Le Gatherers.scan(Supplier, BiFunction) permet un cumul incrémental :

List<Integer> cumul = Stream.of(2, 5, 9, 12, 17)
       .gather(Gatherers.scan(
               () -> 0,
               (sum, n) -> sum + n)) //ou Integer::sum
       .toList();
//[2, 7, 16, 28, 45]

Enfin, le Gatherers.mapConcurrent(int, Function) permet de paralléliser une fonction en maîtrisant le niveau de concurrence :

List<String> concurrent = Stream.of(2, 5, 9, 12, 17)
        .gather(Gatherers.mapConcurrent(2, //2 au plus en parallèle
                n -> {
                    String r = "" + n + "(";
                    try {
                        r += (System.currentTimeMillis() - start);
                        TimeUnit.MILLISECONDS.sleep(500);
                        r += ","
                           + (System.currentTimeMillis() - start);
                    } catch (InterruptedException e) {}
                    return r + ")";
                })
        ).toList();
System.out.println(concurrent);
//[2(12,527), 5(12,527), 9(527,1029), 12(527,1029), 17(1029,1542)]
//  \___en parallèle__/  \______en parallèle_____/  \___à part__/

Essayons d’expérimenter nos propres Gatherer personnalisés afin de mieux cerner les subtilités.

Un premier Gatherer simpliste permet de comprendre la notion d’Integrator, similaire à l’accumulator du Collector, et consiste à faire passe-plat. Il n’y a pas d’état ni de finisher, les éléments sont simplement passés au downstream au fur et à mesure.

Cette fonction d’intégration prend 3 paramètres : un state (ici inutilisé), un élément à intégrer et le downstream (le stream en sortie de l’opération gather()).

List<Integer> result = Stream.of(2, 5, 9, 12, 17)
       .<Integer> gather(Gatherer.of(
               (_, element, downstream) -> downstream.push(element)))
       .toList();
System.out.println(result);
//[2, 5, 9, 12, 17]

Cette fonction passe-plat consiste donc à pousser l’élément reçu vers le downstream. C’est finalement équivalent au mapMulti() réalisé plus tôt (avec la même nécessité de typage explicite) et pousser deux fois chaque élément serait aussi simple que :

List<Integer> result2 = Stream.of(2, 5, 9, 12, 17)
       .<Integer> gather(Gatherer.of(
               (_, element, downstream) -> {
                   downstream.push(element);
                   downstream.push(element);
                   return true;
               }))
       .toList();
System.out.println(result2);
//[2, 2, 5, 5, 9, 9, 12, 12, 17, 17]

La seule subtilité ici est la nécessité de retourner un booléen indiquant si d’autres éléments peuvent suivre. Retourner false opérera un “short-circuit” comme le ferait un findFirst().

Introduisons un état mutable afin de traiter un élément sur deux. Un simple tableau avec un int peut suffire mais une nouvelle subtilité apparaît : l’absence de “combiner” nous impose d’utiliser ofSequential() :

List<Integer> unSurDeux = Stream.of(2, 5, 9, 12, 17)
       .<Integer> gather(Gatherer.ofSequential( // ofSequential !
               () -> new int[1],                // supplier de l'état
               (state, element, downstream) -> {
                   if (state[0]++ % 2 == 0) {   // teste/modifie l'état
                       downstream.push(element);// un élément sur deux
                   }
                   return true;     // d'autres valeurs à suivre
               }))
       .toList();
System.out.println(unSurDeux);
// [2, 9, 17]

Dans le Gatherer personnalisé suivant, on regroupe les éléments en lots, en découpant cette fois selon une condition. L’état est une ArrayList accumulant les items.

List<List<Integer>> lots = Stream.of(2, 5, 9, 12, 17)
       .<List<Integer>> gather(Gatherer.ofSequential(
               ArrayList::new,          // état accumulant les éléments
               (state, element, downstream) -> {
                   state.add(element);     // accumulation des éléments
                   if (element % 3 == 0) { // jusqu'à un multiple de 3
                       List<Integer> copy = new ArrayList(state);
                       downstream.push(copy); // émission du lot
                       state.clear();         // vidange de l'état
                   }
                   return true;         // poursuivre le traitement
               }))
       .toList();
System.out.println(lots);
// [[2, 5, 9], [12]]
// les éléments sont bien regroupés jusqu'à un multiple de 3
// mais il manque 17

On remarque immédiatement que les derniers éléments ne sont pas émis. C’est là l’intérêt du “finisher” qui sera déclenché lorsque le traitement sera terminé.

List<List<Integer>> lotsComplets = Stream.of(2, 5, 9, 12, 17)
       .<List<Integer>> gather(Gatherer.ofSequential(
               ArrayList::new,     // état accumulant les éléments
               (state, element, downstream) -> {
                   state.add(element);     // accumulation des éléments
                   if (element % 3 == 0) { // jusqu'à un multiple de 3
                       List<Integer> copy = new ArrayList(state);
                       downstream.push(copy); // émission du lot
                       state.clear();         // vidange de l'état
                   }
                   return true;        // poursuivre le traitement
               },
               (state, downstream) -> { // finisher
                       if (state.size() > 0) {// pousse le dernier lot
                           List<Integer> copy = new ArrayList(state);
                           downstream.push(copy);
                       }
               }))
       .toList();
System.out.println(lotsComplets);
// [[2, 5, 9], [12], [17]]
// le dernier lot est bien généré

Il nous reste à explorer l’utilisation d’un Gatherer avec un “combiner”, ici encore nécessaire lors de la parallélisation.

Tentons un Gatherer permettant de produire les paires ordonnées des éléments soumis en entrée du stream. Ainsi, 2, 5, 9 devrait produire (2,5), (5,2) puis (2,9), (9,2) et finalement (5, 9), (9,5). Remarquons qu’il n’est pas nécessaire d’attendre d’avoir tous les éléments pour commencer à produire des paires mais il faudra mémoriser les éléments au passage pour être sûr de produire toutes les paires, d’où la nécessité d’un état mutable.

Un premier traitement très proche de l’exemple précédent nous donne :

List<List<Integer>> paires = Stream.of(2, 5, 9, 12, 17)
       .<List<Integer>>gather(Gatherer.ofSequential(
               () -> new ArrayList<Integer>(), // accumule les éléments
               (state, element, downstream) -> {
                   state.forEach(o -> {
                       downstream.push(List.of(element, o));
                       downstream.push(List.of(o, element));
                   });
                   state.add(element);  // mémorise l’élément
                   return true;         // éléments à suivre
               }))
       .toList();
System.out.println(paires);
//[[5, 2], [2, 5], [9, 2], [2, 9], [9, 5], [5, 9], [12, 2], [2, 12], [12, 5], [5, 12], [12, 9],
//[9, 12], [17, 2], [2, 17], [17, 5], [5, 17], [17, 9], [9, 17], [17, 12], [12, 17]]

Essayons maintenant de le transformer pour ajouter un “combiner” et ainsi tenter d’améliorer la parallélisation. Plusieurs subtilités se présentent à nouveau. Le “combiner” ne dispose pas du downstream, il faut donc accumuler les paires à produire pour les générer plus tard, lors du “finisher”. Introduisons une classe MutableState. Elle servira à mémoriser les éléments rencontrés et les paires déjà produites.

class MutableState {
   private List<Integer> elements = new ArrayList<>();
   private List<List<Integer>> paires = new ArrayList<>();
   MutableState() {
       System.out.println("create MutableState()");
   }


   @Override
   public String toString() {
       return "{" + elements + ", " + paires + "}";
   }
}

On peut alors obtenir le traitement suivant (après quelques tâtonnements, il faut bien le dire) :

List<List<Integer>> pairesParalleles = Stream.of(2, 5, 9, 12, 17)
       .parallel()
       .<List<Integer>>gather(Gatherer.of( // of pour combiner
               MutableState::new,          // accumule les éléments
               (state, element, downstream) -> {
                   System.out.println("integrate " + element);
                   state.elements.forEach(o -> {
                       System.out.println("generate paire "
                                               + element + " " + o);
                       downstream.push(List.of(element, o));
                       downstream.push(List.of(o, element));
                   });
                   state.elements.add(element);   // éléments accumulés
                   return true;       // poursuivre le traitement
               },
               (state1, state2) -> {
                   System.out.println("combine" 
                                            + state1 + " " + state2);
                   state1.elements.forEach(o1 -> {
                       state2.elements.forEach(o2 -> {
                           System.out.println("accumulate paires " 
                                                   + o1 + " " + o2);
                           state1.paires.add(List.of(o1, o2));
                           state1.paires.add(List.of(o2, o1));
                       });
                   });
                   state1.elements.addAll(state2.elements);
                   state1.paires.addAll(state2.paires);
                   return state1;
               },
               (state, downstream) -> {
                       System.out.println("finish " + state);
                       state.paires.forEach(p -> {
                           System.out.println(
                                   "produce finishing paire " + p);
                           downstream.push(p);
                       });
               }))
       .toList();
System.out.println(pairesParalleles);
//create MutableState()
//create MutableState()
//create MutableState()
//create MutableState()
//create MutableState()
//integrate 9
//integrate 17
//integrate 5
//integrate 2
//integrate 12
//combine{[2], []} {[5], []}
//combine{[12], []} {[17], []}
//accumulate paires 2 5
//accumulate paires 12 17
//combine{[9], []} {[12, 17], [[12, 17], [17, 12]]}
//accumulate paires 9 12
//accumulate paires 9 17
//combine{[2, 5], [[2, 5], [5, 2]]} {[9, 12, 17], [[9, 12], [12, 9], [9, 17], [17, 9], [12, 17],
//[17, 12]]}
//accumulate paires 2 9
//accumulate paires 2 12
//accumulate paires 2 17
//accumulate paires 5 9
//accumulate paires 5 12
//accumulate paires 5 17
//finish {[2, 5, 9, 12, 17], [[2, 5], [5, 2], [2, 9], [9, 2], [2, 12], [12, 2], [2, 17],
//[17, 2],[5, 9], [9, 5], [5, 12], [12, 5], [5, 17], [17, 5], [9, 12], [12, 9], [9, 17],
//[17, 9], [12, 17], [17, 12]]}
//produce finishing paire [2, 5]
//produce finishing paire [5, 2]
//produce finishing paire [2, 9]
//produce finishing paire [9, 2]
//produce finishing paire [2, 12]
//produce finishing paire [12, 2]
//produce finishing paire [2, 17]
//produce finishing paire [17, 2]
//produce finishing paire [5, 9]
//produce finishing paire [9, 5]
//produce finishing paire [5, 12]
//produce finishing paire [12, 5]
//produce finishing paire [5, 17]
//produce finishing paire [17, 5]
//produce finishing paire [9, 12]
//produce finishing paire [12, 9]
//produce finishing paire [9, 17]
//produce finishing paire [17, 9]
//produce finishing paire [12, 17]
//produce finishing paire [17, 12]
//[[2, 5], [5, 2], [2, 9], [9, 2], [2, 12], [12, 2], [2, 17], [17, 2], [5, 9], [9, 5],
//[5, 12], [12, 5], [5, 17], [17, 5], [9, 12], [12, 9], [9, 17], [17, 9], [12, 17], [17, 12]]

L’exécution est complètement différente selon l’utilisation de parallel() ou non. Sans le parallel(), le “combiner” n’est pas utilisé. Un seul MutableState est créé et les paires sont générées au fur et à mesure comme précédemment. L’utilisation de parallel() produit de nombreux MutableState décomposant le traitement et accumulant les paires. Cette fois les paires sont produites par le “finisher”.

En conclusion

Nous avons organisé les différentes opérations proposées par l’API Stream afin de mieux faire apparaître sa logique filter – map – reduce. La partie map() nous a permis de voir une première motivation pour la notion de Gatherer : l’impossibilité de transformer plusieurs éléments (n -> 1).

En approfondissant, la notion de réduction, nous avons mis en évidence la distinction entre réduction stateless et réduction mutable. Cette dernière, matérialisée par l’opération collect() et la notion de Collector, constitue le point d’extension permettant de créer nos propres réductions mutables personnalisées. Il manquait un point d’extension similaire pour effectuer des filter/map mutables personnalisés et c’est précisément ce point d’extension que constitue l’opération gather() et sa notion de Gatherer.

Il ne reste plus qu’à se poser la question fondamentale, celle qui aurait pu servir d’introduction à cet article : ça veut dire quoi Gatherer ? Une petite recherche web nous indique la traduction “Cueilleur”, à rapprocher du Collector, il devient clair : l’api stream permet de “cueillir” des éléments avant de les envoyer vers un collecteur.

par Jean-François Lefevre
Consultant – Formateur chez OXiane Institut


Contactez-nous pour échanger sur ce sujet !