Press "Enter" to skip to content

Fun with RxJava 2 and Retrofit

RxJava and Retrofit. Two really good java libraries that I recently started playing with. RxJava is a java implementation of Reactive Streams standard. It is a specification of API for processing asynchronous processing of streams. RxJava, now in version 2, is a pretty mature implementation of standard and offers a wide array of tools for use with streams. It supports basic operations like map, reduce, filtering, grouping, joining, sorting, time operations like delay, timeout, interval, window, error handling with retrying and many more. Its API is just huge, it probably has everything you may need.

Retrofit is something different and much more simple. It’s just a HTTP client library, a typed HTTP client library! It lets you easily define a java interface for remote Rest service with just a few annotations. Thanks to this, you can use your rest service like plain, local, java service, just invoke a method, pass parameters and get the results, simple as that. Oh, and it has support for RxJava, so these two play with each other really nice.

RxJava

Regardless of all its features, using RxJava is really simple. It is all based on Observable, it is an object which wraps a stream of data, it can have, one value, many items, or even infinite. You can then process this stream however you want and subscribe to it to be notified of results.

Here are few examples.

Above, we create new Observable with a single value of 5 and subscribe to the stream events with the println method. so that for each event, a new line with events value will be written to console.

Another, similar, example with array input. Here, as a result, we will get one line printed for each value of input array, for a total of 7 lines.

Of course, we can transform values, for example with map operation, that takes the input value and somehow transforms it into another value. Above we have a square operation. As a result, we will receive the square of numbers from 1 to 8, each in a separate line.

If we are interested only in some of the events, we can filter out unneeded, let’s say that we are only interested in even square powers from the example above. We can do by adding a simple, single line inside the chain of calls in our observable.

Now, imagine that instead of static values, as input of stream, you have a remote services that provide constant stream of measurements, for example, temperature of different devices, and we want to receive notification if, in the last 5 minutes, at least 50% of these devices have had temperature above 45 degrees, at the same time. If we would want to do something like that in a conventional way, we would have a really complicated code, with complex logic and data structures, and of course, complex multithreading problems. With RxJava we could probably manage to do it in 10-15 lines of code.

 

Retrofit

Retrofit is just a simple HTTP client, but it differs from a standard client, as you do not specify payload, query parameters, method, headers etc. when you want to invoke remote service. You invoke a service by simply calling a method on a class that is generated as an implementation of an interface with endpoint configuration via annotations. It may sound complicated, but it’s not. Here’s a simple example.

Let’s assume that we have a service that generates numbers from given range with the following url:

/numbers/random/{from}/{to}

As you see, as an input, it requires two path parameters, that specify the range from which value must be generated.

To use this endpoint with Retrofit we must create a new java interface, in which we specify a method for each endpoint, one in this case. Now we just need to annotate this method, for example, we add @GET annotation with relative endpoint url, we also specify path variables from url, as method parameters. As a result, we have something like this:

Now, to use it, we need to somehow instantiate it. Here’s a spring-boot example of how to use it. We declare a new bean in which we create an instance of our client using Retrofit builder. We need to provide a base url and our interface class. We can also add message converters, that will automatically transform, for example, JSON data into java objects. There are many converters available, including Jackson, Gson and more.

With all this, we can just @Autowire our service and call our endpoint.

RxJava + Retrofit

So you now have a basic knowledge of RxJava and Retrofit. But how we use them together? It’s easy 🙂 We can add support for RxJava2 to Retrofit using CallAdapterFactory. With this instead of returning Double value from our interface method, we may return Observable<Double>. And that’s it, that is all you have to do to make those two play together.

Let’s play

Let’s say, that we have two services. The first one that generates random numbers from given range. And we want to use this to generate a pair of numbers, a coordinate, latitude and longitude. With this, we want to fetch an address from google services at that location. But there may be no address there, it may be a middle of a desert, lake or woods. So we should generate some other location. Also, as most of the world area is water, we should limit our search area to, for example, a rough square around Poland.

We will use our number generating service from the example above, and we have to add Retrofit interface, bean for google geocoder service and response class.

I’m using here Jacksons @JsonIgnoreProperties to just ignore all other properties from JSON response, and @JsonProperty for correctly mapping formatted_address property.

Now, what we need, is to generate two numbers, after this, we have to join them together, as google geocoder requires both values as a joined string.

Ok, coordinates are ready, so let’s invoke google service, and get the result address from the response, so let’s chain in some more methods.

But how to check for an empty result and retry? We can do this with first, filtering out empty results, so that data stream will be empty, and when it is empty we can throw an exception. And when an exception is thrown, we can react somehow to it, we can retry few times whole chain.

There is one more important thing. RxJava is lazy by default, as is it doesn’t do anything unless there is at least one subscriber that can consume data. So all this chain above doesn’t do anything yet. To make it work, we need to subscribe as in the first paragraph of this post, or we can try to get it in a blocking way, in current thread with blockingGet() method. Using subscribe is preferred if you want to create true multithreading and responsive application, with this you can easily specify threads in which RxJava will be processing all those requests and in which it will be waiting for a result using Schedulers.

And that’s all really, as you see, it is really easy to use and powerful combination. You can find the whole working example in my GitHub repository at https://github.com/lmonkiewicz/rxjava-http.


Also published on Medium.