RxJava demands a bit of a mental shift if you’re used to imperative programming. Choosing the right combination of operators to express complex logic as a stream only comes with time and experience.
Recently, I ran into an old problem that I’d solved with unnecessarily verbose and stateful code. I want to share the original and revised versions, because the solution is incredibly succinct once RxJava is used to it’s full potential.
Assume the following
Observable makes an HTTP call and emits a single result:
Observable<ServiceResult> fetchResult(). The
ServiceResult may indicate that the output we’re really interested in is still processing, so:
- Poll every 2 seconds
- Return the first time
- Try a maximum of 3 times
It sounds like the requirements can be met with a simple
while loop. This is possible with
Statement.doWhile() from the RxJava Computation Expressions package, but it requires keeping track of polling state and providing a function for the loop condition.
The state can be updated each try in
doOnNext(). The delay between requests is simple to achieve with
Observable.timer() and a
filter() function prevents the pending results from being returned.
Streams over loops
The revised solution is equivalent, but it’s no longer necessary to manage any polling state.
- Start with a stream that emits on a 2 second interval
- Limit the stream to 3 values
- Each tick, fetch a result
- Take the first value that’s ready
The stopping condition that was previously implemented with the
shouldContinue() function is now part of the call chain definition in 2 and 4.
Observable.interval() have versions which accept a scheduler parameter. This is extremely useful for unit tests, since it allows assertions to be made about the values and events emitted as time is advanced with