Feb
5
2019

RxJava is a Java-based implementation of Reactive Programming.

Reactive Programming :

Reactive Programming is a programming paradigm oriented around data flows and the propagation of change i.e. it is all about responding to value changes. For example, let’s say we define x = y+z. When we change the value of y or z, the value of x automatically changes. This can be done by observing the values of y and z.

Reactive Extension is a library that follows Reactive Programming principles to compose asynchronous and event-based programs by using observable sequence.

RxAndroid is specific to Android platform which utilizes some classes on top of the RxJava library.

We must all have heard about the Reactive Programming principles when developing Android applications. While there are multiple resources written on how to get started in RxJava and RxAndroid, I found it difficult to keep track of everything in one place.

RxJava Basics
The building blocks of RxJava are:

Observable: The class that emits a stream of data or events. i.e. a class that can be used to perform some action, and publish the result.

Observable observable = Observable.just(“A”, “B”, “C”, “D”, “E”, “F”);

Observer: The class that receives the events or data and acts upon it. i.e. a class that waits and watches the Observable and reacts whenever the Observable publishes results.

You can create different types of observables.

Flowable T : Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which allows controlling how fast a source emits items.

Observable T : Emits 0 or n items and terminates with a success or an error event.

Single T : Emits either a single item or an error event. The reactive version of a method call.

Maybe T : Succeeds with an item, or no item, or errors. The reactive version of an Optional.

Completable : Either complete with success or with an error event. It never emits items. The reactive version of a Runnable.

The Observer has 4 interface methods to know the different states of the Observable.

onSubscribe() : This method is invoked when the Observer is subscribed to the Observable.
onNext() : This method is called when a new item is emitted from the Observable.
onError() : This method is called when an error occurs and the emission of data is not successfully completed.
onComplete() : This method is called when the Observable has successfully completed emitting all items.

new Observer() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        System.out.println("onNext: " + o);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
};

RxJava provides some general use Schedulers:

Schedulers.computation() : Used for CPU intensive tasks.
Schedulers.io() : Used for IO bound tasks.
Schedulers.from(Executor) : Use with custom ExecutorService.
Schedulers.newThread() : It always creates a new thread when a worker is needed. Since it’s not thread pooled and always creates a new thread instead of reusing one, this schedule is not very useful.

Operators for creating Observables

Create : This operator creates an Observable from scratch by calling observer methods programmatically. An emitter is provided through which we can call the respective interface methods when needed.

Defer : This operator does not create the Observable until the Observer subscribes. The only downside to defer() is that it creates a new Observable each time you get a new Observer. create() can use the same function for each subscriber, so it’s more efficient.

From : This operator creates an Observable from a set of items using an Iterable, which means we can pass a list or an array of items to the Observable and each item is emitted one at a time. Some of the examples of the operators include fromCallable(), fromFuture(), fromIterable(), fromPublisher(), fromArray().

Interval : This operator creates an Observable that emits a sequence of integers spaced by a particular time interval.

Just : This operator takes a list of arguments (maximum 10) and converts the items into Observable items. just() makes only 1 emission. For instance, If an array is passed as a parameter to the just() method, the array is emitted as a single item instead of individual numbers. Note that if you pass null to just(), it will return an Observable that emits null as an item.

Range : This operator creates an Observable that emits a range of sequential integers. The function takes two arguments: the starting number and length.

Repeat : This operator creates an Observable that emits a particular item or sequence of items repeatedly. There is an option to pass the number of repetitions that can take place as well.

Timer : This operator creates an Observable that emits one particular item after a span of time that you specify.

Types of Subjects

PublishSubject : PublishSubject emits all the items at the point of subscription. This is the most basic form of Subject.

BehaviorSubject : BehaviorSubject emits the most recent item at the time of their subscription and all items after that. We will use the sample example we used for PublishSubject.

ReplaySubject : ReplaySubject emits all the items of the Observable, regardless of when the subscriber subscribes. We will use the sample example we used for the previous two subjects.

AsyncSubject : AsyncSubject emits only the last value of the Observable and this only happens after the Observable completes. Again, we will use the same example as above.

UnicastSubject : UnicastSubject allows only a single subscriber and it emits all the items regardless of the time of subscription. In the below example, we have an Observable that emits all integers from 1 to 5.

Example :

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class Example1Activity extends AppCompatActivity {
/**
* Basic Observable, Observer, Subscriber example
* Observable emits list of animal names
*/

private static final String TAG = Example1Activity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example1);

        // observable
        Observable animalsObservable = getAnimalsObservable();

        // observer
        Observer animalsObserver = getAnimalsObserver();

         // observer subscribing to observable
        animalsObservable
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(animalsObserver);
    }

    private Observer getAnimalsObserver() {
     return new Observer() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG, "Name: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "All items are emitted!");
        }

    };
    }

    private Observable getAnimalsObservable() {
        return Observable.just("Ant", "Bee", "Cat", "Dog", "Fox");
    }

    }