Rethinking Java Streams: Gatherer for more control and parallelism
Since version 8, Java has offered an elegant, functional approach to processing data sets with the Streams API. The terminal operation collect(…) represents the bridge from the stream to a targeted aggregation – in the form of lists, maps, strings or more complex data structures. Traditionally, this processing has been regulated by Collector instances, which internally consist of a supplier, an accumulator, a combiner and, optionally, a finisher. This model works well for simple accumulations but has noticeable limitations, particularly for complex, stateful, or conditional aggregations.
- The semantics of gatherers
- Why Gatherers are more than just a “better collector”
- Integration with Streams API
- Sequential Gatherer
- Initialisation: The state of the gatherer
- Accumulation: Processing the input elements
- Emission: The control over the output
- Signature and purpose of the finisher
- Concrete example: chunking with remainder
- Interaction with accumulation
- No return – but effect through push()
- Parallel Gatherer
- Initializer – The creation of the accumulator
- Integrator – The processing of elements
- Combiner – The combination of partial accumulators
- Finisher – The transformation of the accumulator into the final result
- Interaction in parallel gatherers
- An example implementation
- Initialiser:
- Integrator:
- Combiner:
- Inserting entries from state1
- Inserting entries from state2
- Finisher:
With Java 22, the new interface java.util.stream.Gatherer was introduced as a preview feature (finalised in Java 24), significantly expanding the semantics of and control over the accumulation process. While a Collector passively collects data, a Gatherer acts on the incoming elements, comparable to a specialised transducer in functional programming languages. Gatherers are particularly useful where procedural or stateful aggregation is necessary, and they also allow element insertion, filtering, skipping, and explicit termination of the gathering process – all within the framework of a functional, composable architecture.
The semantics of gatherers
A Gatherer<T, A, R> describes the transformation of a Stream<T> into output elements of type R under close control over the accumulation process. In contrast to a Collector, which is in a sense a container for aggregation logic, the gatherer allows rule-based, state-dependent processing of inputs – including the ability to skip elements (drop), to inject additional ones (inject) or to end processing early (finish early).
To make this possible, a gatherer is based on the idea of a sink – represented in the API by Gatherer.Downstream – which is called in the context of the stream processor. This sink receives every input element, can react to it and thus actively influences the flow of processing. The actual processing is orchestrated by a so-called adapter factory, which manages the transitions between the aggregation states.
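To make these degrees of freedom tangible, here is a minimal, purely illustrative sketch (the name headerAndStop, the sample data and the "END" marker are assumptions for this article, not part of the API): it injects a header element that never existed in the input, drops blank lines, and finishes early as soon as the marker appears.
Gatherer<String, ?, String> headerAndStop = Gatherer.ofSequential(
    () -> new boolean[]{false},                  // state: has the header been emitted yet?
    (state, element, downstream) -> {
        if (!state[0]) {
            downstream.push("HEADER");           // inject: emit an element that was never in the input
            state[0] = true;
        }
        if (element.isBlank()) return true;      // drop: simply do not push this element
        if (element.equals("END")) return false; // finish early: stop consuming further elements
        downstream.push(element);
        return true;
    }
);
// Stream.of("a", "", "b", "END", "c").gather(headerAndStop).toList() -> [HEADER, a, b]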
Why Gatherers are more than just a “better collector”
While the conventional Collector serves primarily as a final accumulation tool – i.e. to transfer the elements contained in the stream into a target structure such as a list, a map or an aggregation – a Gatherer goes conceptually far beyond this role. It is not just an optimised or syntactically varied form of the Collector, but rather an independent mechanism that opens up new expressive possibilities for stream processing, both semantically and functionally.
The central difference lies in the expanded scope for action during the transformation: explicitly, a Gatherer can not only accumulate elements but also inject new, previously non-existent elements into the data stream. This opens up the possibility, for example, of introducing initialisation values at the beginning of a stream or placing control markers such as headers and footers specifically at the beginning or end – without artificially expanding the original data stream.
This creative freedom becomes particularly clear when dealing with conditions. Where a Collector usually operates with a simple accumulator whose state leads to a final result, a Gatherer can work in a state-based manner – and allow this state to be influenced across several elements. This opens up new semantic horizons: for example, window operations can be formulated in which temporal or sequential logic is applied – such as aggregating data up to a designated “end” marker, or combining groups of elements that can only be identified by a particular order or content structure.
Even complex decision structures, such as those required in multi-stage parsing processes or when traversing decision trees, can be implemented elegantly and declaratively using stateful Gatherers. The interface remains in the spirit of functional programming: transformation and aggregation can still be described separately, but the Gatherer connects them in a way that was previously only possible through imperative code or difficult-to-maintain stream hacks.
Another advantage is the controlled influence of past elements on current behaviour. A Gatherer can, for example, decide to discard an element because a previous element established a certain context. This context sensitivity is particularly relevant in situations where data streams are structurally “not clean” – such as log files, inconsistent data exports, or natural language analysis.
A Gatherer is not just a “better Collector”, but a fundamentally new tool for data-stream-based modelling of complex transformation logic. It opens up a design space in which state, transformation, context and accumulation can interact in a way that was previously only possible with considerable effort – or outside of the stream model. Anyone who has ever worked with stateful Gatherer constructions will notice how much this expands the expressiveness of functional stream architectures.
A concrete example: grouping with filter logic
Let’s imagine that we want to collect from a stream of strings only those elements that have a particular property and then group them – for example, all words longer than 5 characters, grouped by their first letter. This requirement can be formulated with a Collector, but requires a combination of preprocessing (e.g. filter(…)) and downstream grouping. With a Gatherer, on the other hand, this combined process can be expressed elegantly, comprehensibly and in one step:
Gatherer<String, ?, Map<Character, List<String>>> gatherer = Gatherer.ofSequential(
    () -> new HashMap<Character, List<String>>(),
    (map, element, downstream) -> {
        if (element.length() > 5) {
            char key = element.charAt(0);
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return true;
    },
    // finisher: emit the grouped map once, after the last element
    (map, downstream) -> downstream.push(map)
);
In this example, a decision is made for each element as to whether it will be included in the result. The logic is embedded directly in the Gatherer. The return value true signals that processing should continue. If you returned false at this point instead, the stream would end prematurely – a behaviour that cannot be achieved like this with conventional Collectors.
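For completeness, a brief usage sketch with assumed sample data – since the finisher pushes the grouped map exactly once, the gathered stream contains a single element:
Map<Character, List<String>> grouped = Stream
    .of("banana", "blueberry", "apple", "apricot", "kiwi")
    .gather(gatherer)
    .findFirst()
    .orElse(Map.of());
// grouped: {a=[apricot], b=[banana, blueberry]} – "apple" and "kiwi" are too short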
Integration with Streams API
The interface Gatherer<T, A, R> explicitly distinguishes between sequential and parallel processing. The central distinction arises from the factory methods:
Gatherer.ofSequential(…) // Can only be used sequentially
Gatherer.of(…) // Can also be used in parallel streams (requires a combiner)
A gatherer created with of(…) and equipped with a combiner may be used in parallel streams, but must meet certain requirements: it must be thread-safe or rely on thread-isolated accumulators. This is similar to the logic of parallel collectors, where internal state management allows different elements to be processed simultaneously in independent threads.
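As a rough sketch of the two shapes (the counting logic and all names are purely illustrative): the sequential variant gets by without a combiner, whereas the parallel-capable variant created with Gatherer.of(…) must supply an associative combiner so that partial states from different splits can be merged.
Gatherer<String, long[], Long> countSequential = Gatherer.ofSequential(
    () -> new long[1],                                          // one state per stream run
    (state, element, downstream) -> { state[0]++; return true; },
    (state, downstream) -> downstream.push(state[0])            // emit the final count once
);

Gatherer<String, long[], Long> countParallel = Gatherer.of(
    () -> new long[1],                                          // one state per split
    (state, element, downstream) -> { state[0]++; return true; },
    (left, right) -> { left[0] += right[0]; return left; },     // associative combiner
    (state, downstream) -> downstream.push(state[0])
);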
Sequential Gatherer
Especially in sequential processing – i.e., when there is no parallelisation – the Gatherer develops its full expressiveness while remaining simple, type-safe, and deterministic.
The functionality of a sequential gatherer can be divided into three main phases: initialisation, accumulation and emission. Each of these phases is described in detail below, with particular attention to the specifics of sequential processing.
Initialisation: The state of the gatherer
Each gatherer has an internal state that is recreated per stream execution. This state is defined via a Supplier<S>, where S represents the type of the state. In sequential processing, this state is reserved exclusively for a single thread; therefore, no thread-safety requirements exist. This means that simple objects like ArrayList, StringBuilder, counter arrays, and custom records can be used without problems.
A typical state could, for example, look like this:
record ChunkState<T>(List<T> buffer, int chunkSize) {}
The associated supplier:
() -> new ChunkState<>(new ArrayList<>(chunkSize), chunkSize)
The state lives for the entire stream run and serves as context for all further processing steps. Initialisation lays the foundation for state-based logic, such as buffering, counting, aggregation or tracking previous elements.
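A small sketch of such state-based logic (the name pairwiseDiff and the sample data are illustrative): the state remembers the previous element so that each output can be derived from two consecutive inputs.
Gatherer<Integer, ?, Integer> pairwiseDiff = Gatherer.ofSequential(
    () -> new ArrayList<Integer>(1),                 // state: holds at most the previous element
    (state, element, downstream) -> {
        if (!state.isEmpty()) {
            downstream.push(element - state.get(0)); // emit the difference to the previous element
        }
        state.clear();
        state.add(element);
        return true;
    }
);
// Stream.of(1, 4, 9, 16).gather(pairwiseDiff).toList() -> [3, 5, 7]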
Accumulation: Processing the input elements
The accumulation function is the heart of every gatherer. It is called for each element of the input stream. The signature of this function is:
(state, input, downstream) -> { … }
This is where the actual logic happens: The input element is brought into the state, and – depending on the current state – a decision can be made as to whether (and if necessary, how many) output values are produced. The decision as to whether an item is passed downstream rests entirely with the gatherer.
Example: Every third element of a stream should be emitted.
Gatherer<String, ?, String> everyThird() {
    return Gatherer.ofSequential(
        () -> new int[]{0},
        (state, element, downstream) -> {
            state[0]++;
            if (state[0] % 3 == 0) {
                downstream.push(element);
            }
            return true;
        }
    );
}
In contrast to classic filter or map operations, this logic is conditional and imperative: the gatherer remembers how many elements have already been processed and only emits a result for every third. The accumulation logic is therefore comparable to the accept() method of a custom Consumer, but supplemented by downstream control.
Since there is no threading in sequential processing, all operations can be performed directly without synchronisation. The state can be arbitrarily complex and dynamic as long as it is updated correctly within the stream.
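A short usage sketch with assumed sample data illustrates the counting behaviour:
List<String> picked = Stream.of("a", "b", "c", "d", "e", "f", "g")
    .gather(everyThird())
    .toList();
// picked: ["c", "f"] – only every third element is emitted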
Emission: The control over the output
Elements are output via the downstream object (Gatherer.Downstream) provided to the gatherer with each accumulation call. With its method push(R element), elements can be passed downstream in a targeted and controlled manner. In contrast to map or flatMap, where each input is automatically transformed into one or more outputs, the gatherer itself decides whether, when, and what it emits.
For example, a gatherer can:
- push individual output values,
- push multiple values at once (e.g. with chunking or tokenisation),
- completely suppress the emission (e.g. under preconditions),
- generate values with a delay (e.g., at the stream’s end or after accumulation thresholds).
Example: Combining three consecutive elements into a string:
Gatherer<String, ?, String> triplets() {
    return Gatherer.ofSequential(
        () -> new ArrayList<String>(3),
        (state, element, downstream) -> {
            state.add(element);
            if (state.size() == 3) {
                downstream.push(String.join("-", state));
                state.clear();
            }
            return true;
        }
    );
}
The emission here only occurs when three elements have been collected. These are merged, pushed and the state is then emptied.
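With assumed sample data, the behaviour looks like this – note that without a finisher an incomplete trailing group would be lost:
List<String> joined = Stream.of("a", "b", "c", "d", "e", "f")
    .gather(triplets())
    .toList();
// joined: ["a-b-c", "d-e-f"] – a seventh element would remain stuck in the buffer
// until a finisher takes care of it (see the next section)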
An often overlooked but essential component of a sequential gatherer is the finisher – the closing function invoked after the last input element. This phase is crucial because elements are frequently retained in the state during regular accumulation and could no longer be emitted through regular accumulation alone. The finisher ensures that such remaining elements or aggregated partial results are not lost but are correctly passed downstream.
Signature and purpose of the finisher
The closing function has the signature:
BiConsumer<A, Gatherer.Downstream<? super R>>
It is called by the stream framework after all input values have been processed – exactly once. In this function, the final state can be accessed and, based on this state, a decision can be made as to whether (additional) output values should be produced.
The finisher is particularly suitable for:
- Partially filled buffers, for example in chunking operations when the last block does not reach full size,
- Final aggregations, e.g. averaging, summation, hash calculation or protocol completion (see the sketch after this list),
- Finalisation of state machines, e.g. when an open state still needs to be completed,
- Cleanup or logging, e.g. statistical outputs or final indicators.
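As a minimal sketch of such a final aggregation (the name average and the sample data are illustrative): nothing is emitted during accumulation, and the finisher pushes exactly one result at the end.
Gatherer<Integer, ?, Double> average = Gatherer.ofSequential(
    () -> new long[2],                       // state: [0] = sum, [1] = count
    (state, element, downstream) -> {
        state[0] += element;
        state[1]++;
        return true;                         // nothing is emitted during accumulation
    },
    (state, downstream) -> {
        if (state[1] > 0) {
            downstream.push((double) state[0] / state[1]); // final aggregation in the finisher
        }
    }
);
// Stream.of(2, 4, 9).gather(average).toList() -> [5.0]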
Concrete example: chunking with remainder
Let’s look again at the example of a gatherer that groups elements into groups of three. Without a finisher, the last one or two values would be lost whenever the number of elements is not a multiple of three:
Gatherer<String, ?, String> triplets() {
    return Gatherer.ofSequential(
        () -> new ArrayList<String>(3),
        (state, element, downstream) -> {
            state.add(element);
            if (state.size() == 3) {
                downstream.push(String.join("-", state));
                state.clear();
            }
            return true;
        },
        (state, downstream) -> {
            if (!state.isEmpty()) {
                downstream.push(String.join("-", state));
            }
        }
    );
}
In the closing function (the finisher), it is explicitly checked whether there are still elements in the state – i.e., whether the buffer is incomplete. These residual values are then combined into an aggregate and pushed.
Without the finisher, the gatherer would be functionally incomplete: for input sets whose size is not a multiple of three, the last chunk would simply be discarded – a classic off-by-one error.
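With assumed sample data, the finisher’s effect becomes visible immediately:
List<String> chunks = Stream.of("a", "b", "c", "d", "e")
    .gather(triplets())
    .toList();
// chunks: ["a-b-c", "d-e"] – the finisher emits the incomplete remainder chunk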
Interaction with accumulation
The finisher is semantically separated from the accumulation logic, but accesses the same state. This means that, depending on the specific application, it can use the same auxiliary functions or serialisation routines as the accumulation itself. In practice, it is advisable to define auxiliary methods for common logic such as “combine and empty the list” in order to avoid redundancy.
No return – but effect through push()
The finisher does not return a value; like the accumulation function, it works via the provided downstream. There are therefore no return semantics – instead, processing is completed in a controlled manner via push().
The finisher of a sequential gatherer is the binding conclusion of the processing model. It guarantees that all information remaining in the state is processed and, if necessary, emitted. Especially in data-stream-based applications where incomplete blocks, open ends, or residual states are typical, the finisher is essential to avoid data loss and ensure semantic correctness. In a clean gatherer design, the finisher is therefore not optional but an integral part of a well-defined stream-processing step.
A sequential gatherer combines:
- the state handling of an aggregator,
- the control flow options of a parser,
- the expressiveness of an imperative processor,
- and the clarity of functional APIs.
By foregoing parallelisation logic and concurrency, the sequential variant allows a gatherer to be developed with minimal overhead and maximum expressiveness – a tool that combines the flexibility of imperative logic with the composability of functional programming.
Parallel Gatherer
A parallel gatherer is intended for parallel data-processing pipelines, in which the four phases initialiser, integrator, combiner and finisher can be explicitly separated from one another and controlled individually.
Initializer – The creation of the accumulator
The method initialiser defines how a new accumulator (internal state) is created. This is the first step in processing each substream in sequential and parallel pipelines.
The signature is: Supplier<A> initializer();
In parallel processing, this initialiser is called several times – once per substream, i.e. per thread that takes over a split of the data. This ensures that no synchronisation within the accumulator is necessary: each thread operates in isolation with its own state.
Integrator – The processing of elements
The integrator is the central function for processing stream elements against the accumulator. It is a Gatherer.Integrator<A, T, R>, i.e. a function that, for each element T, updates the accumulator A accordingly and may push results downstream.
The signature reads: Integrator<A, T, R> integrator();
In parallel streams, this integrator is also applied to partial accumulators. What is important here is that this function may only change the accumulator locally and may not influence any global states.
Combiner – The combination of partial accumulators
The combiner’s task is to combine several independently processed accumulators into one overall accumulator. This phase is only relevant in parallel stream executions. The combiner receives two partial results—typically from two threads—and has to combine them into a common result.
The signature is: BinaryOperator<A> combiner();
The correct implementation of the combiner is essential for the correctness of the parallel execution. It must be associative. This is the only way the JVM can freely distribute the operation across multiple threads.
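A small sketch of such an associative combiner (the map-based accumulator type is illustrative): merging partial maps key by key yields the same overall contents regardless of how the framework pairs up the splits.
BinaryOperator<Map<Integer, Long>> combiner = (left, right) -> {
    right.forEach((key, count) -> left.merge(key, count, Long::sum)); // merge key by key
    return left;
};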
Finisher – The transformation of the accumulator into the final result
The finisher function transforms the content of the accumulator A into the desired result R. While A is used internally to work efficiently during aggregation, R is the actual result of the entire operation – such as an immutable collection, a merged string, an Optional, a report object, etc. In the Gatherer model, this result is not returned but pushed downstream.
The signature reads: BiConsumer<A, Gatherer.Downstream<? super R>> finisher();
Unlike the integrator and combiner, the finisher is called exactly once, at the end of the entire processing chain. It therefore serves as a bridge between the internal aggregation mechanism and the external representation.
Interaction in parallel gatherers
In a parallel stream with a gatherer, the following happens:
- An initialiser is called for each partial stream (split) to create a local accumulator.
- The integrator processes all elements in the respective substream one after the other and modifies the local accumulator.
- The combiner phase is called as soon as two partial accumulators need to be combined. This happens either through ForkJoin merging or in the final reduction step.
- After all partial accumulators have been combined, the finisher is called to calculate the final result.
This explicit separation and the ability to control each phase make Gatherer a powerful tool for complex, stateful, or stateless aggregations—especially when performance through parallelisation is crucial or custom aggregation logic is required.
An example implementation
Let’s first define the gatherer in general and put it in a stream.
Gatherer<Integer, ?, ConcurrentMap<Integer, List<Integer>>> concurrentGatherer =
    Gatherer.of(initializer, integrator, combiner, finisher);

IntStream.rangeClosed(0, END_INCLUSIVE)
    .boxed()
    .parallel()
    .gather(concurrentGatherer)
    .forEach(e -> System.out.println("e.size() = " + e.size()));
Now we define the respective subcomponents:
Initialiser:
var initializer = (Supplier<ConcurrentMap<Integer, List<Integer>>>) ConcurrentHashMap::new;
Integrator:
var integrator = new Gatherer.Integrator<
        ConcurrentMap<Integer, List<Integer>>,
        Integer,
        ConcurrentMap<Integer, List<Integer>>>() {
    @Override
    public boolean integrate(
            ConcurrentMap<Integer, List<Integer>> state,
            Integer element,
            Gatherer.Downstream<? super ConcurrentMap<Integer, List<Integer>>> downstream) {
        if (element > END_INCLUSIVE) return false; // processing can be interrupted
        int blockStart = (element / 100) * 100;
        state
            .computeIfAbsent(blockStart, k -> Collections.synchronizedList(new ArrayList<>()))
            .add(element);
        return true;
    }
};
The integrator is responsible for processing the individual stream elements (here: Integer) and inserting them into a shared state (the ConcurrentMap). Each element is sorted according to a specific grouping criterion: all elements that fall into the same block of 100 (e.g. 0–99, 100–199, …) are entered into the same list.
There is a special feature in this implementation:
if (element > END_INCLUSIVE) return false;
This condition serves as an abort signal: as soon as an element exceeds the specified limit (END_INCLUSIVE), processing is cancelled by returning false. This is a special feature of the Gatherer model: the return value false signals that no further elements should be processed – a form of early termination.
state
    .computeIfAbsent(blockStart, k -> Collections.synchronizedList(new ArrayList<>()))
    .add(element);
This line does the actual grouping: if no list exists yet under the key blockStart, a new, synchronised ArrayList is created.
The current item is then added to this list.
Using Collections.synchronizedList(…) ensures that list accesses are thread-safe even within a parallel gatherer context – the ConcurrentMap itself is only responsible for thread-safe map access, not for the values it contains.
The integrator therefore defines the following processing semantics:
- Elements are grouped by blocks of 100 (0–99, 100–199, etc.).
- The assignment is done via a ConcurrentMap, where each block contains a list.
- The lists themselves are synchronised to allow concurrency within the list operations.
- By returning false, processing can be ended early – for example, when a limit value is reached.
Combiner:
var combiner = new BinaryOperator<ConcurrentMap<Integer, List<Integer>>>() {
    @Override
    public ConcurrentMap<Integer, List<Integer>> apply(
            ConcurrentMap<Integer, List<Integer>> state1,
            ConcurrentMap<Integer, List<Integer>> state2) {
        var mergedMap = new ConcurrentHashMap<Integer, List<Integer>>();
        // insert entries from state1
        state1.forEach((k, v) -> {
            mergedMap
                .computeIfAbsent(k, k1 -> new ArrayList<>())
                .addAll(v);
        });
        // insert entries from state2
        state2.forEach((key, value) ->
            mergedMap.merge(key, value, (v1, v2) -> {
                v1.addAll(v2);
                return v1;
            })
        );
        return mergedMap;
    }
};
First, a new, empty, thread-safe map is prepared, into which all entries from state1 and state2 will be inserted. This new map is deliberately fresh because neither state1 nor state2 should be changed – this protects against unwanted side effects and keeps the function referentially transparent.
var mergedMap = new ConcurrentHashMap<Integer, List<Integer>>();
Inserting entries from state1
state1.forEach((k, v) -> {
    mergedMap
        .computeIfAbsent(k, k1 -> new ArrayList<>())
        .addAll(v);
});
The method computeIfAbsent checks whether an entry for the key k already exists in the target map mergedMap. If this is not the case, the lambda k1 -> new ArrayList<>() creates a new entry and inserts it. The method guarantees that a modifiable list is returned afterwards – regardless of whether it was just created or already existed.
The method addAll(…) appends all elements of the list v from state1 to the corresponding list in mergedMap. This expands the aggregate state for this key in the target map.
Inserting entries from state2
The same process is then repeated for state2:
state2.forEach((key, value) ->
    mergedMap.merge(key, value, (v1, v2) -> {
        v1.addAll(v2);
        return v1;
    })
);
Here, every entry from state2 is transferred into mergedMap. If the key does not yet exist, the value (a List<Integer>) is taken over directly. If the key already exists in mergedMap, merge(…) applies a custom merge strategy: the list contents of both maps are combined via v1.addAll(v2). What is important here is that v1 is the existing list and v2 the newly added one.
In the end, the newly created, combined map is returned—it represents the complete aggregate state, which contains the contents of both partial states.
Finisher:
var finisher = new BiConsumer<
        ConcurrentMap<Integer, List<Integer>>,
        Gatherer.Downstream<? super ConcurrentMap<Integer, List<Integer>>>>() {
    @Override
    public void accept(
            ConcurrentMap<Integer, List<Integer>> state,
            Gatherer.Downstream<? super ConcurrentMap<Integer, List<Integer>>> downstream) {
        downstream.push(state);
    }
};
This implementation of the finisher is minimalistic but functionally correct: it takes the final state (state) of the accumulator – a ConcurrentMap<Integer, List<Integer>> – and hands it directly to the downstream, i.e. the next stage of the processing chain in the stream.
The parameter state is the aggregated result of the previous steps (initialiser, integrator, combiner). In this case it is a map that maps blocks of 100 (Integer keys) to a list of the associated values (List<Integer>).
The parameter downstream is a push receiver that consumes the end result. It abstracts the next processing stage, such as a downstream map, flatMap, or terminal collection step.
The method push(…) of the downstream object explicitly forwards the finished result to the next processing stage. This is fundamentally different from the conventional Collector concept, where the end result is simply returned.
This type of handover makes it possible, particularly in a stateful context defined within the Gatherer, to deliver multiple or even conditional results – for example:
- Streaming of intermediate results (e.g. after a specific batch)
- Quitting early after the first valid result
- Multiple emissions during partitioning
In this specific case, however, exactly one result is passed on – the fully aggregated map. This classic “push-forward finisher” simply emits the state as the result.
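A brief usage sketch (assuming END_INCLUSIVE is 999): because the gatherer emits exactly one map, the gathered stream contains a single element that can be taken out via findFirst().
Map<Integer, List<Integer>> blocks = IntStream.rangeClosed(0, 999)
    .boxed()
    .parallel()
    .gather(concurrentGatherer)
    .findFirst()
    .orElse(new ConcurrentHashMap<>());
// blocks has ten entries: blocks.get(0) contains the values 0..99, blocks.get(100)
// the values 100..199, and so on – the order inside each list is not deterministic
// in a parallel run.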
We have now examined the Gatherer in detail and pointed out the differences between sequential and parallel processing. With that, everything should be in place for your first experiments.
Happy Coding
Sven