Thinking in streams

RxJava demands a bit of a mental shift if you’re used to imperative programming. Choosing the right combination of operators to express complex logic as a stream only comes with time and experience.

Recently, I ran into an old problem that I’d solved with unnecessarily verbose and stateful code. I want to share the original and revised versions, because the solution is incredibly succinct once RxJava is used to it’s full potential.

Problem

Assume the following Observable makes an HTTP call and emits a single result: Observable<ServiceResult> fetchResult(). The ServiceResult may indicate that the output we’re really interested in is still processing, so:

  • Poll every 2 seconds
  • Return the first time serviceResult.isProcessing() is false
  • Try a maximum of 3 times

First attempt

It sounds like the requirements can be met with a simple while loop. This is possible with Statement.doWhile() from the RxJava Computation Expressions package, but it requires keeping track of polling state and providing a function for the loop condition.

The state can be updated each try in doOnNext(). The delay between requests is simple to achieve with Observable.timer() and a filter() function prevents the pending results from being returned.

Streams over loops

The revised solution is equivalent, but it’s no longer necessary to manage any polling state.

  1. Start with a stream that emits on a 2 second interval
  2. Limit the stream to 3 values
  3. Each tick, fetch a result
  4. Take the first value that’s ready

The stopping condition that was previously implemented with the shouldContinue() function is now part of the call chain definition in 2 and 4.

 Pro tip

Observable.timer() and Observable.interval() have versions which accept a scheduler parameter. This is extremely useful for unit tests, since it allows assertions to be made about the values and events emitted as time is advanced with TestScheduler.