Monthly Archives: February 2020

Reactive streams: a play in multiple acts

One of the most important things to understand when working with reactive streams is that things happen at different moments.

To start, let’s consider the following code excerpt, which uses Project Reactor’s reactive streams implementation.

    @Test
    fun `a play in multiple acts`() {
        val done = CountDownLatch(2)

        log.info("Creating the mono, i.e., the processing pipeline")
        val mono: Mono<Int> = Mono.create<Int> {
            log.info("Mono.create callback called, emitting value")
            it.success(42)
        }
            .map {
                log.info("map: multiplying value by 2")
                it * 2
            }
            .delayElement(Duration.ofMillis(1000))
            .map {
                log.info("map: dividing value by 2")
                it / 2
            }


        log.info("Mono created, resting for a bit...")
        Thread.sleep(1000)
        log.info("Now going to subscribe to the mono")


        log.info("subscribing to mono on main thread")
        mono.subscribe {
            log.info("mono.subscribe called with {}", it)
            done.countDown()
        }
        log.info("subscription done")


        thread(name = "my-thread") {
            log.info("subscribing to mono on a different thread")
            mono.subscribe {
                log.info("mono.subscribe called with {}", it)
                done.countDown()
            }
            log.info("subscription done")
        }


        // waiting for subscribers to receive value before ending the test
        done.await()
    }

Act 1: creation

The first thing this example does is create a Mono<Int> and store it in the mono value (a Mono<T> is a special kind of publisher that emits at most one element: it completes after emitting a single element, completes empty, or terminates with an error). This is done by:

  • First, creating a Mono<Int> using the create static function.
  • Using the map operator to obtain a second mono that will emit the value from the first one, multiplied by 2.
  • Using the delayElement operator to obtain a third mono that will emit the element after a 1000 millisecond delay.
  • Finally, using the map operator again to create a final mono that will emit the value divided by 2.

The creation act uses reactive operators (e.g. map, delayElement) to build a data processing pipeline, such as the one shown in the following figure.

Note that during this phase no data is being transformed or emitted; only the pipeline is being built. For instance, the call to the map method does not immediately perform a data transform. Instead, it just returns a new mono that will emit the transformed value sometime in the future. In other words, nothing happens until there is a subscription to the mono (at least for cold publishers, but that is the subject of another post).
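The laziness described above can be checked with a small counter. This is only a minimal sketch, assuming reactor-core is on the classpath; the function name callbackRunsBeforeAndAfterSubscribe is hypothetical and is not part of the original example.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns how many times the create callback
// had run before and after the subscription.
fun callbackRunsBeforeAndAfterSubscribe(): Pair<Int, Int> {
    val callbackRuns = AtomicInteger(0)

    // Act 1: building the pipeline. Neither the create callback
    // nor the map lambda runs here.
    val mono: Mono<Int> = Mono.create<Int> { sink ->
        callbackRuns.incrementAndGet()
        sink.success(42)
    }.map { it * 2 }

    val beforeSubscribe = callbackRuns.get()

    // Act 2: the subscription is what triggers the callback.
    mono.subscribe()
    val afterSubscribe = callbackRuns.get()

    return beforeSubscribe to afterSubscribe
}

fun main() {
    val (before, after) = callbackRunsBeforeAndAfterSubscribe()
    println("callback runs before subscribe: $before, after subscribe: $after")
}
```

The counter stays at zero while the pipeline is being assembled and only changes once subscribe is called.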

Act 2: subscription

After the processing pipeline is built, we can move on to the second act: the subscription. In this phase, components that want access to the data emitted by the mono call the subscribe method, passing in a callback function.

In our example, we perform the subscription on the main thread and on a newly created thread. The end goal is to illustrate that:

  • The same mono can be subscribed to multiple times.
  • The subscription can happen on a thread different from the one where the mono was created.
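A stripped-down version of these two points, without logging or delays, might look like the sketch below. It assumes reactor-core and the Kotlin standard library; subscribeTwice is a hypothetical name introduced only for illustration.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: subscribes to the same mono twice, once on the
// calling thread and once on "my-thread", and returns the number of
// create-callback runs together with the values the subscribers received.
fun subscribeTwice(): Pair<Int, List<Int>> {
    val callbackRuns = AtomicInteger(0)
    val received = CopyOnWriteArrayList<Int>()

    val mono = Mono.create<Int> { sink ->
        callbackRuns.incrementAndGet()
        sink.success(42)
    }

    // First subscription, on the thread that created the mono.
    mono.subscribe { received.add(it) }

    // Second subscription, on a different thread; join() waits for it to finish.
    thread(name = "my-thread") {
        mono.subscribe { received.add(it) }
    }.join()

    return callbackRuns.get() to received.toList()
}

fun main() {
    val (runs, values) = subscribeTwice()
    println("create callback runs: $runs, received values: $values")
}
```

Because this pipeline has no asynchronous operator, both subscribers have already received their value by the time the function returns.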

The following log output shows some interesting results.

15:29:39.616 [      main] INFO  - Creating the mono, i.e., the processing pipeline
15:29:39.692 [      main] INFO  - Mono created, resting for a bit...
15:29:40.692 [      main] INFO  - Now going to subscribe to the mono

15:29:40.693 [      main] INFO  - subscribing to mono on main thread
15:29:40.700 [      main] INFO  - Mono.create callback called, emitting value
15:29:40.701 [      main] INFO  - map: multiplying value by 2
15:29:40.703 [      main] INFO  - subscription done

15:29:40.705 [ my-thread] INFO  - subscribing to mono on a different thread
15:29:40.707 [ my-thread] INFO  - Mono.create callback called, emitting value
15:29:40.707 [ my-thread] INFO  - map: multiplying value by 2
15:29:40.707 [ my-thread] INFO  - subscription done

15:29:41.704 [parallel-1] INFO  - map: dividing value by 2
15:29:41.707 [parallel-2] INFO  - map: dividing value by 2
15:29:41.707 [parallel-1] INFO  - mono.subscribe called with 42
15:29:41.708 [parallel-2] INFO  - mono.subscribe called with 42

Two things stand out in this log:

  • The callback function passed to Mono.create is only called when there is a subscription. In addition, it is called once per subscription, on the same thread where the subscription is performed and before the subscribe method even returns.
  • Since the callback immediately emits a value (it.success(42)) when called, the next processing phase in the pipeline (i.e. the one defined by the first map) is also called immediately (see map: multiplying value by 2), again before the subscribe method returns.
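The ordering visible in the log can also be captured programmatically. The following is a sketch under the assumption that reactor-core is available; eventOrder and the event labels are hypothetical, and the delay is shortened to 200 milliseconds to keep the run brief.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch

// Hypothetical helper: records the order in which the pipeline phases run
// relative to the return of the subscribe call.
fun eventOrder(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)

    val mono = Mono.create<Int> { sink ->
        events.add("create callback")
        sink.success(42)
    }
        .map { events.add("first map"); it * 2 }
        .delayElement(Duration.ofMillis(200))
        .map { events.add("second map"); it / 2 }

    mono.subscribe {
        events.add("subscriber callback")
        done.countDown()
    }
    // The create callback and the first map have already run at this point.
    events.add("subscribe returned")

    // Wait for the delayed part of the pipeline before returning.
    done.await()
    return events.toList()
}

fun main() {
    eventOrder().forEach(::println)
}
```

The first two entries are recorded before "subscribe returned", while the second map and the subscriber callback only show up after the delay.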

Act 3: data flow

As we saw in the previous log, the flow of data through the pipeline is only started when there is a subscription. Namely, both the create callback and the first processing phase happened synchronously with the subscription (i.e. at the same time and in the same thread).
However, the second map processing phase (see map: dividing value by 2) and the subscription callback are performed on a different thread, after approximately one second. That happens because between the first and the second map there is a delayElement: the value is only emitted into the second map one second after being produced by the first map.

This is an example of an asynchronous processing phase. Since a fundamental characteristic of reactive streams is that there should be no blocking, the thread that performs the first map is released, and the second map is performed on a thread managed by Reactor only when the delay period elapses. The parallel-1 and parallel-2 threads in the log belong to Reactor’s parallel scheduler, which delayElement uses by default.
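The thread hop caused by delayElement can be observed directly by recording the thread name in each map. This is a rough sketch, assuming reactor-core; threadsOfEachMap is a hypothetical name, and block() is used here only so that the function can wait for the result, which is not something to do inside reactive code.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: returns the names of the threads that ran
// the first and the second map.
fun threadsOfEachMap(): Pair<String, String> {
    val firstMapThread = AtomicReference("")
    val secondMapThread = AtomicReference("")

    Mono.just(42)
        .map { firstMapThread.set(Thread.currentThread().name); it * 2 }
        .delayElement(Duration.ofMillis(100))
        .map { secondMapThread.set(Thread.currentThread().name); it / 2 }
        .block() // demo only: subscribes and waits for the value

    return firstMapThread.get() to secondMapThread.get()
}

fun main() {
    val (first, second) = threadsOfEachMap()
    println("first map ran on '$first', second map ran on '$second'")
}
```

The first name is the thread that subscribed (here, the caller of block()), while the second one starts with "parallel-".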

The flow of data through the pipeline starts when there is a subscription, but it doesn’t end there. Namely, if there are asynchronous processing phases in the pipeline, the flow can pause (without blocking) and resume when some asynchronous event happens (e.g. a time period elapses, a message from a remote system is received). That is, the flow can be divided into multiple segments, with non-blocking pauses between them.
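A timer is not the only event that can resume the flow. As an illustration, the sketch below stands in for a remote system with a CompletableFuture that another thread completes later; it assumes reactor-core, and resumeOnExternalEvent and the "external-system" thread are hypothetical names that do not appear in the original example.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical helper: returns whether a value had arrived before the
// external event happened, and the value received after it.
fun resumeOnExternalEvent(): Pair<Boolean, Int?> {
    val externalEvent = CompletableFuture<Int>() // stands in for a remote reply
    val received = AtomicReference<Int?>(null)
    val done = CountDownLatch(1)

    Mono.fromFuture(externalEvent)
        .map { it / 2 }
        .subscribe {
            received.set(it)
            done.countDown()
        }

    // subscribe has returned, but the flow is paused: no value yet.
    val receivedBeforeEvent = received.get() != null

    // The "remote system" answers on its own thread, which resumes the flow.
    thread(name = "external-system") { externalEvent.complete(84) }

    done.await()
    return receivedBeforeEvent to received.get()
}

fun main() {
    val (early, value) = resumeOnExternalEvent()
    println("value before event: $early, value after event: $value")
}
```

No thread is blocked between the subscription and the completion of the future; the second segment of the flow simply runs when the event arrives.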

Summary

  • There are at least three different moments (the acts) associated with reactive streams:
    • The processing pipeline construction.
    • The subscription.
    • The data flow.
  • Nothing happens until there is a subscription.
  • Data flow can happen partially or totally during subscription.
  • Data flow can be divided into multiple segments, with non-blocking pauses between them (e.g. time delay, waiting for external system event).
  • On a cold publisher, there is a flow of data for each subscription. In the case of a mono, that flow will consist of at most a single element.

The source code is available on the webflux-with-kotlin-by-example repository.

HTH