Introduction to reactive programming

How do you process the infinite data asynchronously? Here you can go for it

When it comes to huge volumes of data or multi-users, we often need asynchronous processing to make our systems fast and responsive. In Java, a representative of old object-oriented programming, asynchronicity can become really troublesome and make the code hard to understand and maintain. So, reactive programming is especially beneficial for this ‘purely’ object-oriented environment as it simplifies dealing with asynchronous flows.

The basic building blocks of reactive code are Observables and Subscribers. An Observable emits items; a Subscriber consumes those items. The concept of reactive programming very similar for the people who are aware of observable and subscriber design patterns but with a difference – Observables often don’t start emitting items until someone explicitly subscribes to them.

Reactive Programming is a paradigm that enables to build of applications in a non-blocking asynchronous way. We had also discussed two different data types named Flux & Mono that come from the Reactor Core Java 8 library. Let’s discuss a few of the functionalities of Flux & Mono.

RxJava was the first Reactive Extension API specific for the Java platform. It works with Java 6 and provides an opportunity to write asynchronous, event-based programs for both Java and Android Java, which is very convenient.

The benefit of reactive programming is that It best suits when it requires to handles huge volumes of data in a quicker manner hence it increases the performance of the application and user responsiveness.

Spring Reactor is another framework for Java from Spring developers. It is quite similar to RxJava but has simpler abstraction. The framework has managed to win popularity due to the possibility to leverage the benefits of Java 8.

Flux is a data type that can emit 0…N items & then completes. Below are a few examples of how different types of Flux can be created –

To create an empty Flux –Flux.empty();

Below is an example of a flux that emits one item –

Flux flux = Flux.just(“Sample String”);

Flux.subscribe(System.out::println); //prints Sample String

Flux can also be created from Streams using Flux.fromStream() or from iterable (list, map etc.) using Flux.fromIterable().

You can also throw an error from a flux like – Flux flux = Flux.error(new RuntimeException("Error Thrown"));

Flux also provides the option to create more complex functional operations. Below is an example that provides the square of the even numbers from a Flux of numbers –

Flux flux = Flux .just(1,23,22,35,44) .filter(integer -> integer % 2 == 0) .map(integer -> (Math.pow(integer, 2)));

Mono is one more data type, but unlike Flux, it can emit 0..1 item. Mono also provides two similar methods — Mono.just() & Mono.error().

For example, if you have one source of data (producer) and one target for data (consumer); then after connecting the consumer to subscriber – a reactive programming framework is responsible for pushing the data, generated by the producer, to the consumer. Please note that an observable can have any number of subscribers.

Let us look at the simple plain reactive example

 public class RxJava2Example 

 {
  public static void main(String[] args) 
  {     

        //producer
      Observable<String> observable = Observable.just("how", "to", "do", "in", "java");


        //consumer
        Consumer<? super String> consumer = System.out::println;

        //Attaching producer to consumer
        observable.subscribe(consumer);
      }
   }

In above example, "how", "to", "do", "in", "java" can be considered as stream of events. An observable is created for these events. Then we create a consumer which can act on these words – in this case it is just printing them out to console. This consumer is nothing but the subscriber.

Lastly, we connect the subscriber to the consumer using subscribe(). As soon as, we connect both, words/events start flowing and subscribers start printing them in the console.

Internally in code, when a new word is emitted from the observable, the onNext() method is called on each subscriber. When the observable finishes all of words either successful or with an error, the onComplete() or the onError() method is called on the subscriber.

Conclusion

The way observable and subscriber are connected loosely, it brings great advantage to developers. They don’t need to think about whole concurrency paradigm, which is already a daunting task for many of us. You just connect the producer and subscriber – and everything just works – flawlessly.

Also, you don’t need to think about both observable and subscriber at same time. You can develop them independently with their best design choices, and then connect them using the transformation concept. Great !!

Happy Learning !!