Wednesday, October 28, 2015

Comparison of Reactive-Streams implementations (part 1)

Introduction

The Reactive-Streams initiative is becoming more and more widely known in concurrency/parallelism circles, and several implementations of the specification have appeared, most notably Akka-Streams, Project Reactor and RxJava 2.0.

In this blog post, I'm going to look at how one can use each library to build up a couple of simple flows of values and, while I'm at it, benchmark them with JMH. For comparison and sanity checking, I'll also include the results of RxJava 1.0.14, plain Java loops and java.util.stream.Stream.

In this part, I'm going to compare the synchronous behavior of the 4 libraries through the following tasks:

  1. Observe a range of integers from 1 to (1, 1000, 1,000,000) directly.
  2. Apply flatMap to the range of integers (1) and transform each value into a single value sequence.
  3. Apply flatMap to the range of integers (1) and transform each value into a range of two elements.
The runtime environment:
  • Gradle 2.8
  • JMH 1.11.1
    • Threads: 1
    • Forks: 1
    • Mode: Throughput
    • Unit: ops/s
    • Warmup: 5, 1s each
    • Iterations: 5, 2s each
  • Intel Core i7-4790 CPU @ 3.5 GHz (stock settings)
  • 16 GB DDR3 RAM @ 1600 MHz (stock settings)
  • Windows 7 x64
  • Java 8 update 66 x64

RxJava

Let's start with the implementation of the tasks in RxJava. First, one has to include the library within the build.gradle file. For RxJava 1.x:

compile 'io.reactivex:rxjava:1.0.14'

For RxJava 2.x:

repositories {
    mavenCentral()

    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

compile 'io.reactivex:rxjava:2.0.0-DP0-SNAPSHOT'

Unfortunately, one can't really have multiple versions of the same artifact on the classpath, so we either swap the compile dependency back and forth or switch to my RxJava 2.x backport, which is published under a different artifact and package name:

compile 'com.github.akarnokd:rxjava2-backport:2.0.0-RC1'

Once the libs are set up, let's see the flows:


@Param({"1", "1000", "1000000"})
int times;
//...

Observable<Integer> range = Observable.range(1, times);

Observable<Integer> rangeFlatMapJust = range
    .flatMap(Observable::just);

Observable<Integer> rangeFlatMapRange = range
    .flatMap(v -> Observable.range(v, 2));

The code looks the same for both versions, only the imports have to be changed. Nothing complicated.
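For reference, the swap mostly comes down to the import (a sketch; the 2.x backport lives under its own package name, which I won't spell out here):

// RxJava 1.x
import rx.Observable;

// RxJava 2.x
import io.reactivex.Observable;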

Observation of the streams will generally be performed via a LatchedObserver instance, which implements Observer and will be reused for the other libraries as well:


import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.infra.Blackhole;

public class LatchedObserver<T> implements Observer<T> {
    public final CountDownLatch latch = new CountDownLatch(1);
    private final Blackhole bh;
    public LatchedObserver(Blackhole bh) {
        this.bh = bh;
    }
    @Override
    public void onComplete() {
        latch.countDown();
    }
    @Override
    public void onError(Throwable e) {
        latch.countDown();
    }
    @Override
    public void onNext(T t) {
        bh.consume(t);
    }
}

Since these flows are synchronous, we won't utilize the latch itself but simply subscribe to the flows:


@Benchmark
public void range(Blackhole bh) {
    range.subscribe(new LatchedObserver<Integer>(bh));
}
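The two flatMap flows are measured the same way; a sketch, assuming the field names defined earlier:

@Benchmark
public void rangeFlatMapJust(Blackhole bh) {
    rangeFlatMapJust.subscribe(new LatchedObserver<Integer>(bh));
}

@Benchmark
public void rangeFlatMapRange(Blackhole bh) {
    rangeFlatMapRange.subscribe(new LatchedObserver<Integer>(bh));
}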

Let's run it for both 1.x and 2.x and see the benchmark results:


This is a screenshot of my JMH comparison tool; it displays a colored comparison of throughput values: green is better than the baseline, red is worse. A lighter color means at least a +/- 3% difference, a stronger color at least +/- 10%.

In this and all subsequent images, a larger number is better. You may want to multiply the times parameter by the measured ops/s value to get the number of events transmitted per second; for example, Range with times = 1000000 at ~253 ops/s means ~253 million numbers were emitted per second.

It appears RxJava 2.x posts considerably better numbers, except in the two RangeFlatMapJust cases. What's going on? Let me explain.

The improvements come from the fact that RxJava 2.x has generally less subscribe() overhead than 1.x. In 1.x, when one creates a Subscriber, it gets wrapped into a SafeSubscriber instance, and when the Producer is set on it, a small arbitration happens inside setProducer(). As far as I can tell, the JIT in 1.x does its best to remove the allocation and the synchronization, but the arbitration won't be removed, which means more instructions for the CPU to execute. In contrast, in 2.x there is no wrapping and no arbitration at all.

The lower performance of RangeFlatMapJust comes from a single operator: just(). In 1.x, just() immediately emits its value without bothering with Producers and requests, which means it doesn't support or respect backpressure. In 2.x, however, just() has to consider backpressure requests, which involves a mandatory atomic CAS (~10 ns or 35 cycles uncontended (!)). This is the cost of correctness.

Edit: the explanation above turned out to be wrong; here is the correct one.

The lower performance actually comes from the serialization approaches the two versions use: 1.x uses the synchronized-based emitter-loop and 2.x the atomics-based queue-drain approach. The former can be elided by the JIT whereas the latter can't, so there is always a ~17 ns overhead per value. I'm planning a performance overhaul for 2.x anyway, so this won't remain the case for too long.
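To make the difference concrete, here is a minimal conceptual sketch of the two approaches (illustrative only, not the actual code of either version):

// 1.x-style emitter-loop: plain synchronized sections that the JIT can
// elide in single-threaded use.
class EmitterLoopSketch<T> {
    boolean emitting;
    boolean missed;

    void emit(T value) {
        synchronized (this) {
            if (emitting) { missed = true; return; }
            emitting = true;
        }
        // ... emit 'value' downstream ...
        for (;;) {
            synchronized (this) {
                if (!missed) { emitting = false; return; }
                missed = false;
            }
            // ... drain whatever was missed ...
        }
    }
}

// 2.x-style queue-drain: a concurrent queue plus an atomic work-in-progress
// counter; the atomic increment/decrement per value can't be elided.
class QueueDrainSketch<T> {
    final java.util.Queue<T> queue = new java.util.concurrent.ConcurrentLinkedQueue<>();
    final java.util.concurrent.atomic.AtomicInteger wip = new java.util.concurrent.atomic.AtomicInteger();

    void emit(T value) {
        queue.offer(value);
        if (wip.getAndIncrement() == 0) {
            do {
                T v = queue.poll();
                // ... emit 'v' downstream ...
            } while (wip.decrementAndGet() != 0);
        }
    }
}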

In conclusion, I think RxJava does a good job both in terms of usability and performance. Why am I mentioning usability? Read on.


Project Reactor

Project Reactor is another library that supports the Reactive-Streams specification and provides a similar fluent API as RxJava.

I've briefly benchmarked an earlier version of it (2.0.5.RELEASE) and posted a picture of the results, but here I'm going to use the latest snapshot. For this, we need to adjust our build.gradle file:

repositories {
    mavenCentral()

    maven { url 'http://repo.spring.io/libs-snapshot' }
}

compile 'io.projectreactor:reactor-stream:2.1.0.BUILD-SNAPSHOT'

This should make sure I'm using a version with the most performance enhancements possible.

The source code for the flows looks quite similar:


Stream<Integer> range = Streams.range(1, times);

Stream<Integer> rangeFlatMapJust = range
    .flatMap(Streams::just);

Stream<Integer> rangeFlatMapRange = range
    .flatMap(v -> Streams.range(v, 2));

A small note on Streams.range() here: it appears the API has changed between 2.0.5 and the snapshot. In 2.0.5, the operator's parameters were start and end (both inclusive); this has been changed to start and count, matching RxJava's range().
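For example (illustrative, based on the semantics just described):

// 2.0.5:    Streams.range(2, 5) emits 2, 3, 4, 5       (start..end, inclusive)
// snapshot: Streams.range(2, 5) emits 2, 3, 4, 5, 6    (start + count, like RxJava)
Stream<Integer> example = Streams.range(2, 5);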

The same LatchedObserver can be used here so let's see the run results:



Here, reactor2 stands for the 2.1.0 snapshot and reactor1 for the 2.0.5 release. Clearly, Reactor has improved its performance by reducing the overhead in the operators (by a factor of ~10).

There is, however, a curious result with RangeFlatMapJust, similar to RxJava's: both RxJava 1.x and Reactor 2.1.0 outperform RxJava 2.x, and by roughly the same amount! What's happening there?

I know that flatMap in RxJava 1.x is faster in single-threaded use because it uses the emitter-loop approach (based on synchronized), which can be nicely elided by the JIT compiler so the overhead is removed. In 2.x, the code currently uses queue-drain with 2 unavoidable atomic operations per value on the fast path.

So let's find out what Reactor does. Its flatMap is implemented in the FlatMapOperator class and what do I see? It's almost the same as RxJava 2.x flatMap! Even the bugs are the same!

Just kidding about the bugs. There are a few differences, so let's look at the same fast path and see why it can do 4-8 million more values per second.

The doNext() looks functionally identical: if the source is a Supplier, it gets the held value directly without subscribing, then tries to emit it via tryEmit().

Potential bug: If this path crashes and goes into reportError(), the execution falls through and the Publisher gets subscribed to.

Potential bug: In RxJava 2.0, we always wrap user-supplied functions into try-catches so an exception from them is handled in-place. In Reactor's implementation, this is missing from doNext (but may be present somewhere else up in the call chain).

The tryEmit() is almost the same as well, with a crucial difference: it batches up requests instead of requesting one by one. Interesting!


if (maxConcurrency != Integer.MAX_VALUE && !cancelled
       && ++lastRequest == limit) {
    lastRequest = 0;
    subscription.request(limit);
}

The same re-batching happens with the inner subscribers in both implementations (although this doesn't come into play in the given flow example). Nice work Project Reactor!

In the RangeFlatMapRange case, which doesn't exercise this fast path, Reactor is slower even though it uses the same flatMap logic. The answer is a few lines up in the results: Reactor's range produces 100 million fewer values per second.

Following the references along, there are a bunch of wrappers and generalizations, but those only apply once per Subscriber so they can't be the cause for the times = 1000000 case.

The reason appears to be that range() is implemented like RxJava 2.x's generator (i.e., SyncOnSubscribe). The ForEachBiConsumer looks tidy enough but I can spot a few potential deficiencies:


  • An atomic read-and-increment is involved, which forces the JIT'd code to re-read the instance fields from memory instead of keeping them in registers. The requestConsumer could be read into a local variable before the loop.
  • Use == or != as much as possible because the other kinds of comparison appear to be slower on x86.
  • The atomic decrement is an expensive operation (~10 ns) but can be delayed quite a bit: once the currently known requested amount runs out, one should re-read the requested amount first to see whether more requests were issued in the meantime. If so, keep emitting; otherwise, subtract everything that has been emitted from the request count in one go (see the sketch below).
RxJava's range doesn't do this last optimization at the moment; HotSpot's register allocator seems to be hectic at times: with too many local variables, performance drops because of register spills (even on x64!). Implementing the optimization involves more local variables and thus the risk of making things worse.
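Here is a rough sketch of that last idea (illustrative only, with made-up names; not Reactor's or RxJava's actual code): emit against the locally known requested amount, re-read the request counter when that runs out, and only then perform a single atomic subtraction for the whole batch.

import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;

final class DeferredRequestAccountingSketch {
    // A real operator would also remember 'index' and resume from it
    // when request() is called again; this sketch simply returns.
    static void emitLoop(AtomicLong requested, long start, long end,
            Subscriber<? super Long> actual) {
        long index = start;
        long emitted = 0L;
        long r = requested.get();          // locally known requested amount
        for (;;) {
            while (emitted != r && index != end) {
                actual.onNext(index++);
                emitted++;
            }
            if (index == end) {
                actual.onComplete();
                return;
            }
            r = requested.get();           // were more requests issued meanwhile?
            if (emitted == r) {
                // one atomic subtraction for the whole batch
                // instead of one decrement per value
                r = requested.addAndGet(-emitted);
                if (r == 0L) {
                    return;                // out of requests, wait for more
                }
                emitted = 0L;
            }
        }
    }
}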

In conclusion, Project Reactor gets better and better with each release, especially when it adopts RxJava 2.x structures and algorithms ;)

Akka-Streams

I believe Akka-Streams was the most advertised library from the list. With a company behind it and a port from Scala, what could go wrong?

So let's include it in the build.gradle:

compile 'com.typesafe.akka:akka-stream-experimental_2.11:1.0'

So far so good, but where do I start? Looking around the web, I came across a ton of examples, all in Scala. Unfortunately, I don't know Scala well enough, so it was difficult to figure out what to use. It also doesn't help that in Eclipse the library's source code is hard to navigate because it's in Scala (and I don't want to install the Scala plugin). Okay, we won't look at the source code.

It turns out Akka-Streams doesn't have a range operator, therefore I have to prepopulate a List with the values and use it as a source:

List<Integer> values = rx2Range
    .toList().toBlocking().first();

Source.from(values).???

A good thing RxJava is around. Akka-Streams uses the Source class as a factory for creating sources. However, Source does not implement Publisher at all!

One does not simply observe a Source.

After digging a bit, I found an example showing that one has to use runWith with a Sink.publisher() parameter. Let's apply that:


Publisher<Integer> range = Source
    .from(values).runWith(Sink.publisher());

Doesn't work; the example was out of date and one needs a Materializer in runWith. Looking at the hierarchy, ActorMaterializer does implement it so let's get one.


ActorMaterializer materializer = ActorMaterializer
    .create(???);

Publisher<Integer> range = Source.from(values)
    .runWith(Sink.publisher(), materializer);

Hmm, it requires an ActorRefFactory. But hey, I remember the examples creating an ActorSystem, so let's do that.


ActorSystem actorSystem = ActorSystem.create("sys");

ActorMaterializer materializer = ActorMaterializer
    .create(actorSystem);

Publisher<Integer> range = Source.from(values)
    .runWith(Sink.publisher(), materializer);

Finally, no more dependencies. Let's run it!

Doesn't work; it crashes with a missing configuration for akka.stream. Huh? After spending some time figuring things out, it appears Akka defaults to a reference.conf file in the classpath root. But both jars of the library contain such a reference.conf!

As it turns out, when the Gradle-JMH plugin packages up the benchmark jar, both reference.conf files end up in it under the same name, and Akka then picks up the wrong one.

The solution: pull the one from the streams jar out and put it under a different name into the Gradle sources/resources.

Side note: this is still not enough because, by default, Gradle ignores non-Java files under src/main/java. I had to add the following to build.gradle to make it work:

processResources {
  from ('src/main/java') {
    include '**/*.conf'
  }
}

With all these set up, let's finish the preparation:


Config cfg = ConfigFactory.parseResources(
    ReactiveStreamsImpls.class, "/akka-streams.conf");

ActorSystem actorSystem = ActorSystem.create("sys", cfg);

ActorMaterializer materializer = ActorMaterializer
    .create(actorSystem);

List<Integer> values = rx2Range
    .toList().toBlocking().first();

Publisher<Integer> range = Source.from(values)
    .runWith(Sink.publisher(), materializer);


Compiles? Yes! Benchmark jar contains everything? Yes! The setup runs? Yes! Benchmark method works? No?!

After one iteration, it throws an error because the range Publisher can't be subscribed to more than once. I've asked for solutions on StackOverflow to no avail; whatever I got back either didn't compile or didn't run. At this point, I just gave up and used a trick to make it work multiple times: defer(). I have to defer the creation of the whole Publisher so I get a fresh one every time:


Publisher<Integer> range = s -> Source.from(values)
            .runWith(Sink.publisher(), materializer).subscribe(s);


In addition, as I suspected, there is no way to run Akka-Streams synchronously, therefore its benchmarks can't be directly compared with the synchronous libraries. Plus, I have to use the CountDownLatch to await termination:


@Benchmark
public void akRange(Blackhole bh) throws InterruptedException {
    LatchedObserver<Integer> lo = new LatchedObserver<>(bh);
    akRange.subscribe(lo);
    
    if (times == 1) {
        while (lo.latch.getCount() != 0);
    } else {
        lo.latch.await();
    }
}

Note: I have to spin on the latch for times == 1 because Windows' timer resolution means a blocking wakeup can occasionally take several milliseconds; without spinning, the benchmark produces 35% lower throughput.

Almost ready; we still need the RangeFlatMapJust and RangeFlatMapRange equivalents. Unfortunately, Akka-Streams doesn't have flatMap, but it has a flatten method on Source. No problem (by now):


Publisher<Integer> rangeFlatMapJust = s ->
    Source.from(values)
        .map(v -> Source.single(v))
        .flatten(FlattenStrategy.merge())
        .runWith(Sink.publisher(), materializer)
        .subscribe(s);

Nope. Doesn't work because there is no FlattenStrategy.merge(), despite all the examples. But there is a FlattenStrategy.concat(). It'll have to do.

Nope, still doesn't compile because of type inference problems. Have to introduce a local variable:

FlattenStrategy<Source<Integer, BoxedUnit>> flatten = 
    FlattenStrategy.concat();

Works in Eclipse; javac fails with an ambiguity error. As it turns out, javadsl.FlattenStrategy extends scaladsl.FlattenStrategy and both have a concat() factory method, but with a different number of type arguments. This isn't the first time javac can't disambiguate something Eclipse can!

We don't give up: let's use reflection to call the proper method:


Method m = akka.stream.javadsl.FlattenStrategy
    .class.getMethod("concat");

@SuppressWarnings({ "rawtypes", "unchecked" })
FlattenStrategy<Source<Integer, BoxedUnit>, Integer> flatten = 
    (FlattenStrategy)m.invoke(null);

Publisher<Integer> rangeFlatMapJust = s ->
    Source.from(values)
        .map(v -> Source.single(v))
        .flatten(flatten)
        .runWith(Sink.publisher(), materializer)
        .subscribe(s);

Finally, Akka-Streams works. Let's see the benchmark results:



Remember, since Akka can't run synchronously and we had to apply a bunch of workarounds, we should expect the numbers to be lower by a factor of 5-10.

I don't know what's going on here. Some numbers are 100x lower. Akka certainly isn't throwing an exception somewhere, because then the latch would be released early and we'd see ~5M ops/s in those cases regardless of times.

In conclusion, I'm disappointed with Akka-Streams; it takes quite a hassle to get a simple sequence running, and reaching reasonable performance apparently requires more work.

Plain Java and java.util.stream.Stream

Just for reference, let's see how the same tasks look and perform with plain Java for loops and java.util.stream.Stream.

For plain Java, the benchmarks look simple:


@Benchmark
public void javaRange(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        bh.consume(i);
    }
}

@Benchmark
public void javaRangeFlatMapJust(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        for (int j = i; j < i + 1; j++) {
            bh.consume(j);
        }
    }
}

@Benchmark
public void javaRangeFlatMapRange(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        for (int j = i; j < i + 2; j++) {
            bh.consume(j);
        }
    }
}

The Stream implementation is a bit more involved because a java.util.stream.Stream is not reusable and has to be recreated every time one wants to consume it:


@Benchmark
public void streamRange(Blackhole bh) {
    values.stream().forEach(bh::consume);
}

@Benchmark
public void streamRangeFlatMapJust(Blackhole bh) {
    values.stream()
    .flatMap(v -> Collections.singletonList(v).stream())
    .forEach(bh::consume);
}

@Benchmark
public void streamRangeFlatMapRange(Blackhole bh) {
    values.stream()
    .flatMap(v -> Arrays.asList(v, v + 1).stream())
    .forEach(bh::consume);
}
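As a quick illustration of the non-reusability (outside of the benchmarks), a second terminal operation on the same Stream instance fails:

java.util.stream.Stream<Integer> s = values.stream();
s.forEach(System.out::println);   // first terminal operation: fine
// s.count();                     // a second terminal operation on the same
//                                // instance throws IllegalStateException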

Finally, just for fun, let's do a parallel version of the stream benchmarks:


@Benchmark
public void pstreamRange(Blackhole bh) {
    values.parallelStream().forEach(bh::consume);
}

@Benchmark
public void pstreamRangeFlatMapJust(Blackhole bh) {
    values.parallelStream()
    .flatMap(v -> Collections.singletonList(v).stream())
    .forEach(bh::consume);
}

@Benchmark
public void pstreamRangeFlatMapRange(Blackhole bh) {
    values.parallelStream()
    .flatMap(v -> Arrays.asList(v, v + 1).stream())
    .forEach(bh::consume);
}

Great! Let's see the results:



Impressive, except for some parallel cases where, I presume, forEach synchronizes all the parallel operations back onto a single thread, negating all the benefits.

In conclusion, if you have a synchronous task, try plain Java first.

Conclusion


In this blog post, I've compared three Reactive-Streams libraries for usability and performance in the case of synchronous flows. Both RxJava and Reactor did quite well relative to plain Java, but Akka-Streams was quite complicated to set up and didn't perform adequately out of the box.



However, there might be some remedy for Akka-Streams in the next part where I compare the libraries in asynchronous mode.

