In project reactor, meeting with scenarios demanding the fusion of data from multiple publishers are frequent. Today, we delve into the methods of accomplishing this through the usage of zip(), merge(), and concat() as well as the differences among them.

Table of content

  1. Zip
    1. Zip()
    2. ZipWith()
    3. Mono ZipWhen
  2. Concat
    1. concat()
    2. concatWith()
  3. Merge

Zip

The zip() method is used when one wish to combine two or more publishers into a single entity. The output is a tuple where each element is a publisher. Zip will wait for each publisher emit one element to combine them.

An image featuring two lines representing distinct publisher’s data which are going to be combined with. The third row illustrates the data gathering process, while the last row shows the outcome of this operation.

Zip()

Zip method is available for both Mono and Flux. This is a static method. The main point of this method is that their parameters can takes up to 8 publishers.

public static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2) 

There are several versions of this method. The simplest version is the one that takes only one publisher, but there are versions that take a combinator and/or prefetch.

Combinator: It is a Java’s 8 BiFunction which takes 2 publishers parameters as input. As name itself suggests, in this operation, the two publishers are combined in a way that it returnes a publisher. The combinator resembles a map().

Prefetch: It is a number that represents how much will be fetched from the publisher for the first request. If unspecified, most of these operators start with a demand of 32. The subsequent requests will be triggered when 75% of the prefetch amount has been emitted but just to fill 25%.

ZipWith()

ZipWith is available for both Mono and Flux. In the non-static version of the method, it can only combine with a single publisher.

<T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2) {}

<T2, V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T, ? super T2, ? extends V> combinator) {}

<T2, V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T, ? super T2, ? extends V> combinator) {}

<T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2, int prefetch) {}
Flux<String> f1 = Flux.just("One", "Three", "Five");
Flux<String> f2 = Flux.just("Two", "Four", "Six");

f1.zipWith(f2).subscribe(element -> System.out.println(element));

Mono ZipWhen

This function is used when we already have a data (like A1) and want to combine it with another one that we will specify using a Java’s 8 Function (like B1).

<T2> Mono<Tuple2<T, T2>> zipWhen(Function<T, Mono<? extends T2>> rightGenerator)

It takes a function in which the single method parameter is a mono while the method output is a tuple of two elements. The first element (T1) in the tuple is the mono provided as input to the function and the second (T2) represents the final outcome.

Concat

The difference between concat and merge is that concat, the first publisher is terminated and the other stream is just concatenated at the end of it. In other words, concat reads one publisher completely and then appends the second one at the end of the flow. The first important point is that the sequence order is respected. Another point is that the operation is layz, which means that the second publisher will only subscribe after the conclusion of the first stream.

There are two common versions of concat operator.

concat()

In this version of the method, there is no restriction on the quantity of publishers. It’s important to mention that this is a static method.

Flux<Integer> f1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> f2 = Flux.just(6, 7, 8, 9, 10);

Flux.concat(f1, f2).log().subscribe(System.out::println);

concatWith()

Unlike the concat method, this is an instance method.

Flux<Integer> f1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> f2 = Flux.just(6, 7, 8, 9, 10);

f1.concatWith(f2).log().subscribe(System.out::println);

Merge

Unlike concat, merge is considered an eager operation where all publishers subscribe simultaneously. The resulting order is not guaranteed, as it simply collects and emits data as soon as it becomes available from any publisher.

To understand how merge works, let’s start with a simple example where we are calling the merge between the publishers, like we did in the previous exemples.

Flux<String> f1 = Flux.just("One", "Three", "Five");
Flux<String> f2 = Flux.just("Two", "Four", "Six");

f1.mergeWith(f2)
        .doOnNext(item -> System.out.println("Item " + item))
        .subscribe();

In the above example, we’ve created two publishers. The first one, denotated as f1, has three elements (One, Three and five), while the second one, ‘f2’, contains three additional elements (Two, Four, Six). In the subsequent lines, we merge ‘f1’ with ‘f2’ and observe the results by printing the elements using doOnNext method. Since the publisher operates only after a subscription, we proceed to call the subscribe method in the following line.

When we run the code above, we can notice that there is no difference compared to concat; The second publisher are used only when the first publisher completes. But why? In fact, this is not a problem. Merge is working as expected!

Unlike concat, merge doesn’t wait for a publisher to emit all its data before starting to collects values from others publishers. In this case, we have two publishers, and they have their data collected independently, regardless of their order. If we want to observe merge in action, we can modify our code to simulate a delayed processing operation, as follows:

Flux<String> f1 = Flux.just("One", "Three", "Five");
Flux<String> f2 = Flux.just("Two", "Four", "Six");

Random random = new Random();
f1.delayElements(Duration.ofMillis(random.nextInt(0, 100)))
        .mergeWith(f2.delayElements(Duration.ofMillis(random.nextInt(0, 100))))
        .doOnNext(item -> System.out.println("Item " + item))
        .subscribe();

Now we can observe a significant change in the result. Using the ‘delayElements’ method, we can apply a random delay in milliseconds from 0 to 100, which allows us to slow down a publisher. In order to notice a variation in the elements collected by the merge, we apply a delay to both subscribers, causing them to experience sluggishness.

Leave a comment