Streams are one of the niceties introduced in JDK 8, and they give the user a great deal of flexibility. They make it easy to transform Collections, either serially or in parallel.

One of the most awesome things about Streams is how extensible they are. The example we're going to use today is collecting a Stream into a Jackson ArrayNode. For those of you who don't know, Jackson is a JSON library for Java, and ArrayNode is its tree-model representation of a JSON array, so it behaves much like a List of JsonNode.

Even if you don't care about Jackson, the below will show you how to implement your own collectors (or at least get you started).

Implementing Collector

The first step is to create a class implementing the Collector interface, providing the types of the objects you expect to work with. In my case, I'm going to be receiving a JsonNode and outputting an ArrayNode. Because I can accumulate straight into the ArrayNode, I can specify it as the accumulation type too:

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> { }

Just to clarify what those three type parameters are, from left to right:

  1. The type being passed to the collector, i.e. the type being returned by the previous Stream phase.
  2. Your mutable accumulation type. My case is quite simple, as I can just use the same type as the final output. But suppose you want to count occurrences of String input and then output a Set of the five most frequent values; in that case, you might use a Map as your accumulation type.
  3. The final output type of your Collector.
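
To make the three type parameters concrete, here is a minimal sketch of the "top five" idea from point 2, built with Collector.of rather than a full class. The class and method names (TopWords, topFive) are my own for illustration, and ties between equally frequent words are broken arbitrarily.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, only here to illustrate the three type parameters.
class TopWords {

    // Input: String, accumulation type: Map<String, Long>, output: Set<String>.
    static Collector<String, Map<String, Long>, Set<String>> topFive() {
        return Collector.of(
            // supplier: a fresh counting map
            HashMap::new,
            // accumulator: bump the count for this word
            (counts, word) -> counts.merge(word, 1L, Long::sum),
            // combiner: fold the right-hand counts into the left-hand map
            (left, right) -> {
                right.forEach((word, n) -> left.merge(word, n, Long::sum));
                return left;
            },
            // finisher: keep the five words with the highest counts
            counts -> counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(5)
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(LinkedHashSet::new)));
    }

    public static void main(String[] args) {
        Set<String> top = Stream.of("a", "b", "a", "c", "a", "b").collect(topFive());
        System.out.println(top);
    }
}
```

Here the accumulation type (a Map) and the output type (a Set) differ, so the finisher has real work to do.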

Class Structure

OK, so now your IDE is going to yell at you about the unimplemented interface methods, so let's go ahead and add them as shells. For learning purposes I've added comments to each shell describing what the method does. Note that I have omitted the return statements below, so this won't compile at this point.

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    @Override
    public Supplier<ArrayNode> supplier() {
        // This provides a Supplier which creates
        // a new instance of the accumulation type.
        // In my case, it has to return a function which
        // creates an ArrayNode.
    }

    @Override
    public BiConsumer<ArrayNode, JsonNode> accumulator() {
        // This is pretty simple, it dictates how you
        // wish to accumulate values. It returns a
        // BiConsumer taking the container and the next element.
    }

    @Override
    public BinaryOperator<ArrayNode> combiner() {
        // This guy is slightly more complicated; because
        // Streams can run in parallel, the partial results
        // have to be merged at some point - this is where
        // that merge takes place.
    }

    @Override
    public Function<ArrayNode, ArrayNode> finisher() {
        // As mentioned, you can convert your accumulation type
        // to your final return type - that is what this Function
        // does.
    }

    @Override
    public Set<Characteristics> characteristics() {
        // This method returns a Set of characteristics associated
        // with the Collector. For example, a thread-safe Collector
        // may return Characteristics.CONCURRENT.
    }

}
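
To see how the first four methods fit together, the following sketch drives a Collector by hand over two chunks of input, roughly the way a Stream would if it split the work in two. The helper name collectInTwoChunks is made up for this post, and the real Stream machinery is more involved than this.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical walkthrough class; a simplification of what a Stream does internally.
class CollectorWalkthrough {

    static <T, A, R> R collectInTwoChunks(List<T> first, List<T> second,
                                          Collector<? super T, A, R> collector) {
        // supplier: one fresh container per chunk
        A left = collector.supplier().get();
        A right = collector.supplier().get();

        // accumulator: feed each element into its chunk's container
        first.forEach(item -> collector.accumulator().accept(left, item));
        second.forEach(item -> collector.accumulator().accept(right, item));

        // combiner: merge the two partial results
        A merged = collector.combiner().apply(left, right);

        // finisher: convert the accumulation type into the final result
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<Integer> result = collectInTwoChunks(
            Arrays.asList(1, 2), Arrays.asList(3, 4), Collectors.toList());
        System.out.println(result);
    }
}
```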

Implementation

Using the layout above, it becomes pretty clear that any accumulation into a List is actually going to be pretty simple. Let's go through each step one-by-one for the example of an ArrayNode:

supplier()

We want to return a way to construct an ArrayNode, but unfortunately this gets a little messy with JsonNodeFactory. Lucky for us, we can just place a static ObjectMapper into the mix. Then we just have to return a method reference to createArrayNode. For things such as custom List implementations, you can naturally use MyList::new.

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    private static final ObjectMapper mapper
            = new ObjectMapper();

    @Override
    public Supplier<ArrayNode> supplier() {
        return mapper::createArrayNode;
    }

}

accumulator()

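Seeing as an ArrayNode is basically a glorified List, the accumulator method is super simple. All we have to do is return a reference to ArrayNode#add. That method is overloaded, and the BiConsumer<ArrayNode, JsonNode> target type selects the add(JsonNode) overload; its return value is simply ignored. Were we doing any complicated processing, this is where it would take place.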

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    @Override
    public BiConsumer<ArrayNode, JsonNode> accumulator() {
        return ArrayNode::add;
    }

}
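
As an example of an accumulator that does slightly more than add, here is a sketch of a collector that silently drops null elements. It uses a plain ArrayList as a stand-in for ArrayNode so it runs with the JDK alone, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical example; ArrayList stands in for ArrayNode.
class AccumulatorDemo {

    static <T> Collector<T, List<T>, List<T>> toListSkippingNulls() {
        return Collector.of(
            ArrayList::new,
            // accumulator: any per-element processing belongs here
            (list, item) -> {
                if (item != null) {
                    list.add(item);
                }
            },
            (x, y) -> {
                x.addAll(y);
                return x;
            });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("a", null, "b").collect(toListSkippingNulls());
        System.out.println(result);
    }
}
```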

combiner()

Again, this is pretty easy because we have access to ArrayNode#add. We could just loop over the right ArrayNode and add all the elements. However, we also have the beauty of ArrayNode#addAll, so let's take full advantage of that. Please note that x and y are both partial accumulation containers, so make sure to merge them accordingly (i.e. be careful of overwriting values in one side). In my case, I just add all the elements of y into x.

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    @Override
    public BinaryOperator<ArrayNode> combiner() {
        return (x, y) -> {
            x.addAll(y);
            return x;
        };
    }

}
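
If you want to see when the combiner actually runs, a small counting experiment helps. This is a sketch using an ArrayList stand-in and a hypothetical countingToList helper. In the OpenJDK implementations I have tried, a sequential stream never calls the combiner, while a parallel one calls it a varying number of times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical experiment; ArrayList stands in for ArrayNode.
class CombinerDemo {

    // The counter is atomic because a parallel stream may combine on several threads.
    static Collector<Integer, List<Integer>, List<Integer>> countingToList(AtomicInteger combinerCalls) {
        return Collector.of(
            ArrayList::new,
            (list, item) -> list.add(item),
            (x, y) -> {
                combinerCalls.incrementAndGet();
                x.addAll(y); // fold the right-hand partial result into the left
                return x;
            });
    }

    public static void main(String[] args) {
        AtomicInteger sequentialCalls = new AtomicInteger();
        List<Integer> a = IntStream.range(0, 1000).boxed()
            .collect(countingToList(sequentialCalls));

        AtomicInteger parallelCalls = new AtomicInteger();
        List<Integer> b = IntStream.range(0, 1000).boxed().parallel()
            .collect(countingToList(parallelCalls));

        System.out.println("sequential combiner calls: " + sequentialCalls.get());
        System.out.println("parallel combiner calls:   " + parallelCalls.get());
        System.out.println("same result: " + a.equals(b));
    }
}
```

Because this collector does not declare UNORDERED and the source is ordered, both runs should produce the elements in the same order.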

finisher()

Almost there now.

I previously mentioned that the finisher is pretty cheap in this example, as my return type is the same as my accumulation type. This means that I can return a Function which literally just returns its input (Function.identity() would do the same job).

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    @Override
    public Function<ArrayNode, ArrayNode> finisher() {
        return accumulator -> accumulator;
    }

}
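
For contrast, here is a sketch that puts an identity finisher next to one that does real work, again with ArrayList standing in for ArrayNode. Both method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical example; ArrayList stands in for ArrayNode.
class FinisherDemo {

    // Identity finisher: the container itself is handed back to the caller.
    static <T> Collector<T, List<T>, List<T>> toMutableList() {
        return Collector.of(
            ArrayList::new,
            (list, item) -> list.add(item),
            (x, y) -> { x.addAll(y); return x; },
            Function.identity());
    }

    // Non-trivial finisher: wrap the container in a read-only view before returning it.
    static <T> Collector<T, List<T>, List<T>> toReadOnlyList() {
        return Collector.of(
            ArrayList::new,
            (list, item) -> list.add(item),
            (x, y) -> { x.addAll(y); return x; },
            Collections::unmodifiableList);
    }

    public static void main(String[] args) {
        List<String> mutable = Stream.of("a").collect(toMutableList());
        mutable.add("b"); // fine, it is still the ArrayList
        List<String> readOnly = Stream.of("a").collect(toReadOnlyList());
        System.out.println(mutable + " " + readOnly);
    }
}
```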

characteristics()

I'm not going to make any assertions about the ArrayNode class, so I'm not going to announce to the world that my Collector is thread-safe. Instead I'm just going to tell the Stream that this Collector does not commit to preserving encounter order, so you shouldn't expect an ordered result, especially if you're working with a parallel Stream (Collection#parallelStream or Stream#parallel).

public class ArrayNodeCollector implements Collector<JsonNode, ArrayNode, ArrayNode> {

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.UNORDERED);
    }

}
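
To get a feel for what the built-in Collectors declare, you can simply inspect their characteristics. The sketch below uses two small helper methods of my own (isUnordered and isConcurrent). If you do care about element order, note that Collectors.toList() leaves UNORDERED out.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical helpers for inspecting what a Collector declares about itself.
class CharacteristicsDemo {

    static boolean isUnordered(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.UNORDERED);
    }

    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        Collector<String, ?, ConcurrentMap<String, Integer>> byLength =
            Collectors.toConcurrentMap(s -> s, String::length);

        System.out.println("toList unordered?           " + isUnordered(Collectors.toList()));
        System.out.println("toSet unordered?            " + isUnordered(Collectors.toSet()));
        System.out.println("toConcurrentMap concurrent? " + isConcurrent(byLength));
    }
}
```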

Full Example

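The entire class is simply the five methods above combined into a single ArrayNodeCollector, together with the static ObjectMapper field used by supplier().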

Usage

So now that we have put it all together, let's take a look at how we use it:

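// mapToObj (not map) turns each int into an object, giving a Stream
// whose collect method accepts a Collector.
IntStream
    .range(0, 10)
    .mapToObj(num -> JsonNodeFactory.instance.numberNode(num))
    .collect(new ArrayNodeCollector());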

And boom, you have an ArrayNode containing some NumericNodes.

Tidying Up

The above usage works fine, but it doesn't read very fluently. I prefer to make it read a little better, like Collectors.toList(). This is a pattern adopted by the JDK, so you shouldn't feel too bad about using it.

Basically, create a small class containing all custom Collectors:

public class MyCollectors {

    public static ArrayNodeCollector toArrayNode() {
        return new ArrayNodeCollector();
    }

}

You then get a much more readable flow (you can even use a static import if you like).

IntStream
    .range(0, 10)
    .mapToObj(num -> JsonNodeFactory.instance.numberNode(num))
    .collect(MyCollectors.toArrayNode());
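
The same factory pattern also works without a dedicated Collector class. As a rough sketch, here is a JDK-only equivalent that collects into an ArrayDeque via Collector.of; DequeCollectors and toArrayDeque are names I made up for this example. Like the JDK's own factories, it returns the Collector interface with the accumulation type hidden behind a wildcard.

```java
import java.util.ArrayDeque;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical holder class for custom Collector factories.
final class DequeCollectors {

    private DequeCollectors() {
        // static factories only
    }

    static <T> Collector<T, ?, ArrayDeque<T>> toArrayDeque() {
        return Collector.of(
            ArrayDeque::new,
            (deque, item) -> deque.addLast(item),
            (x, y) -> {
                x.addAll(y);
                return x;
            });
    }

    public static void main(String[] args) {
        ArrayDeque<Integer> deque = IntStream.range(0, 5).boxed().collect(toArrayDeque());
        System.out.println(deque);
    }
}
```

With a static import of DequeCollectors.toArrayDeque, the call site shrinks to .collect(toArrayDeque()).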

Thanks for reading!

I hope this provided some insight into how to create a custom Collector for the JDK 8 Stream feature. Feel free to comment or tweet me if you have any questions!