- Hands-On Cloud-Native Microservices with Jakarta EE
- Luigi Fugaro Mauro Vocale
- 2021-07-02 13:47:10
RxJava
RxJava is a Java framework that implements Reactive Extensions (ReactiveX, http://reactivex.io/), a set of APIs for asynchronous programming with observable streams. With RxJava, developers can build event-based, asynchronous applications by composing observable sequences.
This framework builds on the observer pattern to create sequences of events, and it exposes classes and methods that allow developers to compose these sequences. It also takes care of concerns that are hard to implement but essential in reactive systems, such as threading, synchronization, thread safety, and concurrent data structures.
In this section, we will use version 2 of the framework, since version 1 reached its end of life on March 31, 2018. RxJava is open source and published under the Apache 2.0 license.
The main elements of RxJava are as follows:
- Observables: They are the sources of the data and begin to provide data once a subscriber listens to them. An observable source may emit nothing or any number of items, and it can finish successfully or with an error. A source may also never terminate and produce a potentially infinite stream of events. In this respect it resembles the java.util.stream.Stream API introduced in Java 8, which can also represent infinite sequences; the difference is that an observable pushes items to its subscribers, whereas a Stream is pulled by its consumer.
- Subscribers: An observable can have multiple subscribers. Every time the observable publishes a new item, the onNext() method is called on each subscriber until the observable finishes its data flow. If the flow ends successfully, the onComplete() method is called on each subscriber; otherwise, the onError() method is called on each subscriber. In every case, the flow must be processed and an action must be taken to react to the stimuli that the system receives in the form of the data stream.
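The signal contract described in the list above (any number of onNext() calls, followed by either onComplete() or onError()) can be sketched in plain Java. This is only an illustration of the observer pattern: SimpleObserver, SimpleObservable, and ObserverContractDemo are hypothetical names introduced here, not RxJava classes, and the sketch is synchronous and ignores threading and disposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, hand-rolled types for illustration only; these are NOT RxJava classes.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

final class SimpleObservable<T> {
    private final Iterable<T> source;

    SimpleObservable(Iterable<T> source) {
        this.source = source;
    }

    // Emission starts only when an observer subscribes.
    void subscribe(SimpleObserver<? super T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);      // zero or more items
            }
        } catch (RuntimeException e) {
            observer.onError(e);            // terminal signal: failure
            return;
        }
        observer.onComplete();              // terminal signal: success
    }
}

final class ObserverContractDemo {
    // Records every signal the subscriber receives so that the order is visible.
    static List<String> run(Iterable<String> source) {
        List<String> signals = new ArrayList<>();
        new SimpleObservable<>(source).subscribe(new SimpleObserver<String>() {
            @Override public void onNext(String item) { signals.add("onNext:" + item); }
            @Override public void onError(Throwable error) { signals.add("onError:" + error.getMessage()); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        // prints [onNext:a, onNext:b, onComplete]
        System.out.println(run(List.of("a", "b")));
    }
}
```

If the source fails while it is being iterated, the subscriber sees the items emitted so far followed by a single onError signal, and onComplete is never called.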
To use the RxJava2 framework, you must add it to the dependencies of your project. If you're using Maven, add the following snippet to your pom.xml:
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.14</version>
</dependency>
Now, we are ready to write our first simple class:
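package com.reactive.examples;

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;

public class RxJavaSimpleTest {

    public static void main(String... args) {
        // just() emits the given item to the subscriber and then completes
        Disposable subscription = Flowable.just("Welcome on RxJava2 World")
                .subscribe(System.out::println);
        // Release the subscription; the flow has already completed at this point
        subscription.dispose();
    }
}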
In this class I used the Flowable class that, as described in the Javadoc (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html), implements the Reactive Streams pattern. It exposes intermediate operators, factory methods, and the features that let your application consume reactive dataflows.
The Flowable object creates a data flow based on the string "Welcome on RxJava2 World". It then signals the given String to the subscriber (in my example, the System.out::println method) and, finally, it completes. After running the class, you will see the following output on the console: "Welcome on RxJava2 World".
RxJava2 provides base classes that enable you to process different types of data streams and perform different operations against them.
The following is another simple example that uses the RxJava2 base classes:
package com.reactive.examples;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;

public class RxJavaBaseClassesTest {

    public static void main(String... args) {
        // A flow of exactly one item or an error
        Single.just(1)
                .map(i -> i * 5)
                .map(Object::toString)
                .subscribe(System.out::println);

        // A flow of zero or one item, or an error; here it emits one item
        Maybe.just("May be I will do something...").subscribe(System.out::println);

        // Never sends any item or notification to a MaybeObserver, so nothing is printed
        Maybe.never().subscribe(o -> System.out.println("Something is here...never happened"));

        // A flow without items but only a completion or error signal
        Completable.complete().subscribe(() -> System.out.println("Completed"));

        // Example of simple data flow processing
        Flowable.just("mauro", "luigi", "marco")
                .filter(s -> s.startsWith("m"))
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }
}
If you execute this class, you should see the following output:
5
May be I will do something...
Completed
MAURO
MARCO
The output matches the behavior that I described in the comments of the Java class. I obtained the following:
- The result of the computation on the first line: 1 * 5
- The item emitted by the first Maybe
- No output for Maybe.never(), which never signals its observer
- The output that confirms that the Completable finished
- The result of a simple filter and map operation
RxJava is also supported by a number of modules and libraries that enable interaction with other important frameworks used to implement the main patterns of microservice architectures (MSA), as follows:
- Hystrix: A latency and fault tolerance library that's used to prevent cascading failures caused by outages of third-party remote systems
- Camel-rx: A library that simplifies the use of Camel components with Reactive Extensions
- Vert.x RxJava: A module that provides easy integration with the Vert.x toolkit, which we will examine later
- RxJava-jdbc: A library that changes the way you interact with the database, which is usually synchronous and blocking, by using the stream API to process result sets and compose statements functionally
The impact of this framework on the Java SE environment has been significant. This experience gave birth to the Reactive Streams initiative, which aims to define a standard protocol for asynchronous streams running on the Java Virtual Machine (JVM).
The RxJava team was part of this initiative, which led to the definition, in Java SE 9, of a set of interfaces, java.util.concurrent.Flow.*, dedicated to implementing the concepts defined by Reactive Streams. In fact, the interfaces available in JDK 9 are equivalent to their respective Reactive Streams counterparts.
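To make the java.util.concurrent.Flow interfaces more concrete, here is a minimal sketch that uses only the JDK (9 or later), without RxJava. It relies on SubmissionPublisher, the Flow.Publisher implementation that ships with the JDK; the class names FlowApiDemo and CollectingSubscriber, the five-second timeout, and the sample items are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowApiDemo {

    // Collects the items it receives and requests them one at a time.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);        // ask for the first item
        }
        @Override public void onNext(String item) {
            received.add(item);
            subscription.request(1);        // ask for the next item
        }
        @Override public void onError(Throwable error) {
            received.add("error: " + error);
            done.countDown();
        }
        @Override public void onComplete() {
            done.countDown();
        }
    }

    static List<String> publish(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // SubmissionPublisher delivers items to its subscribers asynchronously.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }   // close() signals onComplete after the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        // prints [mauro, luigi, marco]
        System.out.println(publish(List.of("mauro", "luigi", "marco")));
    }
}
```

The calls to request(1) are the part that the plain observer pattern lacks: the subscriber tells the publisher how many items it is ready to receive, which is how Reactive Streams expresses backpressure.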
Although the examples presented here are very simple, they still make it clear how different the design of applications is in reactive architectures. The classic request/reply model, in which the caller blocks while waiting for the reply, is replaced by a completely asynchronous, non-blocking model, such as the publish/subscribe model. For Java EE developers, this is a known and widely used model from the Java Message Service (JMS) specification.
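The difference between blocking on a reply and reacting to it when it arrives can be sketched with CompletableFuture from the JDK. This is neither RxJava nor a full publish/subscribe system, since it handles a single reply rather than a stream; slowReply is a hypothetical stand-in for a remote call, and all names in the sketch are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class BlockingVsNonBlockingDemo {

    // Hypothetical remote service, simulated locally for illustration.
    static String slowReply(String request) {
        return "reply to " + request;
    }

    // Request/reply: the calling thread waits in join() until the reply exists.
    static String blockingCall(String request) {
        return CompletableFuture.supplyAsync(() -> slowReply(request)).join();
    }

    // Non-blocking: the caller registers a callback and gets control back at once;
    // the callback runs when the reply becomes available.
    static CompletableFuture<Void> nonBlockingCall(String request, Consumer<String> onReply) {
        return CompletableFuture.supplyAsync(() -> slowReply(request)).thenAccept(onReply);
    }

    public static void main(String[] args) {
        System.out.println(blockingCall("ping"));
        // join() here only keeps the demo alive until the callback has run.
        nonBlockingCall("ping", System.out::println).join();
    }
}
```

In the second method the caller is free to do other work between issuing the request and receiving the reply, which is the property that reactive frameworks generalize from one reply to a whole stream of events.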
Frameworks such as RxJava extend these concepts so that they can also be applied when exposing and consuming APIs, which is typical of MSAs. JavaScript developers, and more generally those used to working client-side with HTML5-oriented technologies, know these paradigms very well, because their implementation model is based on them.
You can find more details about RxJava by visiting the GitHub repository of the library: https://github.com/ReactiveX/RxJava/.