0%

JAVA-从异步编程到响应式编程

JAVA-从异步编程到响应式编程

Q:响应式编程带来了什么好处?
A:当并发连接增加,很多时候我们是通过多线程、线程池的方式来提升性能,然而线程数增加后系统开销也增加了,而且很多场景下线程大部分是执行IO操作,其大部分时间是处于idle状态的,资源利用率不高。响应式编程通过引入非阻塞non-blocking的特性,实现了线程资源的充分利用。

这是一幅盗来的图^1,展示了响应式编程的发展历程,尤其是在JVM或者说JAVA领域。这里面比较核心的就是Reactive Streams规范、ReacticeX项目及其包含的RxJava、Project Reactor项目、JAVA方面的CompletableFuture API和Flow API。

CompletableFuture API and Reactive Programming Timeline

一、CompletableFuture

首先,CompletableFuture是Future,意味着它支持任务异步执行。在此基础上,它还支持异步任务之间的依赖关系、异步任务的回调等。对于日常业务逻辑,使用CompletableFuture能够通过将业务逻辑异步化,来提升cpu利用率、缩短响应时间,达到提升性能的目的。

那么CompletableFuture和响应式编程的区别是什么呢?

关键区别在于Stream这个概念上,CompletableFuture描述的是单次执行的结果,尽管可以通过异步任务之间的依赖关系构建一串异步任务组成的执行图,本质上面向的是单次操作,例如一次rpc请求的处理^1

Java CompletableFuture API Example Invoice Path

响应式编程则面向的是Stream,或者说叫“流”,也即是数据或事件的序列,有点类似JAVA的Stream API。

当然用CompletableFuture也能够实现对数据/事件流的处理,但是有一个关键的特性是响应式编程具备而CompletableFuture则需要重新实现的,并且实现起来很复杂,那就是back pressure。back pressure就是在下游无法承载上游的压力时,采取一些措施。

至于Java中提供的CallBack、Future机制在实现响应式编程中的问题和缺点,例如“callback hell”,不支持lazy computation等,可以从https://projectreactor.io/docs/core/release/reference/#getting-started中找到答案。

所以,我们需要一套框架或者说类库来更好的实现响应式编程,需要具备以下特性:

  • 支持异步任务的封装及组装,需要API来对异步任务进行包装,并且需要很多种算子来对异步任务进行组装,包括过滤、组装、异常处理等算子
  • 减少异步任务的嵌套,减少代码的复杂性,增加可读性,避免“callback hell”等复杂度很高的代码
  • 支持背压back pressure,也就是下游和上游之间能协商数据流的速度

二、响应式编程-从ReactiveX到RxJava

项目主页:http://reactivex.io/

github:https://github.com/ReactiveX?page=2

官网定义如下:

ReactiveX is a collection of open source projects. The content of this page is licensed under Creative Commons Attribution 3.0 License, and code samples are licensed under the BSD License.

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

ReactiveX基于观察者模式、迭代者模式和函数式编程的组合,为构建异步的基于事件的应用提供了一套类库,并且它是一个多语言的项目,例如,JAVA中对应的就是RxJava类库。

2.1 核心概念

2.1.1 Observable^2

img

如上图所示,一个Observable其实就是一个产生数据流的对象,它能够转换为另一个Observable。

Observable可以被Observer订阅(subscribe),一个完整的Observer订阅Observable的例子的伪代码如下:

1
def myOnNext     = { item -> /* do something useful with item */ };
2
def myError      = { throwable -> /* react sensibly to a failed call */ };
3
def myComplete   = { /* clean up after the final response */ };
4
def myObservable = someMethod(itsParameters);
5
myObservable.subscribe(myOnNext, myError, myComplete);
6
// go on about my business

其中onNext, onError, onComplete三个方法是Observer需要实现的。分别用于对数据流的正常处理,异常处理,完成操作。

2.1.2 Observable的各种算子(Operator)

算子可用于连接Observable,或者改变Observable的行为。Operator能够将Observable串联起来,形成一条chain。

ReactiveX支持的算子数量较多,这里就不展开了,可以到 http://reactivex.io/documentation/operators.html 这里查看。

2.1.3 Single

Single是RxJava中引进的类似Observable,但只产生一个数据的对象,因此订阅它的Observer只需要实现onSuccess,onError方法即可。

Single也有很多对应的算子,可用于创建Single、对Single进行转换、组装、连接、延时等,具体可查看:http://reactivex.io/documentation/single.html

2.1.4 Subject

http://reactivex.io/documentation/subject.html

一个Subject既是Observable,又是Observer,表明它既可以订阅一个或多个Observable,又可以作为Observable将其订阅到的数据流重新发出,因此它类似一种桥梁或者说代理。

2.1.5 Scheduler

http://reactivex.io/documentation/scheduler.html

Rx默认是单线程的,整条链路上的调用都在Subscribe方法被调用的线程中执行。不过一些算子接收Scheduler作为参数,控制算子在哪个Scheduler上执行,这里可以认为Scheduler是某种线程池,提供了多线程资源。

SubscribeOn算子指定Observable开始执行的线程,而SubscribeOn算子在链路的什么地方被调用并不重要。

ObserveOn算子指定了从该调用开始往后的算子所在的执行线程,如果要改变线程,则需要重新调用ObserveOn,指定一个新的Scheduler。

2.2 RxJava

github:https://github.com/ReactiveX/RxJava

github wiki:https://github.com/ReactiveX/RxJava/wiki

RxJava是ReactiveX项目的一部分,它也遵守Reactive Streams规范。不要以为RxJava是只针对Java语言的,RxJava本身也是支持多语言的,支持一堆基于JVM的语言。下面是RxJava的最最最入门级demo,包括Observable的创建,组装,订阅:

1
public class RxJavaDemo {
2
    
3
    public static void main(String[] args) {
4
        RxJavaDemo demo = new RxJavaDemo();
5
        demo.hello("pcl", "j");
6
        demo.customObservableBlocking().subscribe(s -> System.out.println(s));
7
        demo.customObservableNonBlocking().subscribe(s -> System.out.println(s));
8
        demo.simpleComposition();
9
    }
10
    /**
11
     * hello world
12
     *
13
     * @param args
14
     */
15
    private void hello(String... args) {
16
        Flowable.fromArray(args).subscribe(s -> System.out.println("Hello " + s + "!"));
17
    }
18
    /**
19
     * Creating an Observable from an Existing Data Structure
20
     */
21
    private void createObservables() {
22
        Observable<String> o1 = Observable.fromArray(new String[]{"a", "b", "c"});
23
24
        Integer[] list = {5, 6, 7, 8};
25
        Observable<Integer> o2 = Observable.fromArray(list);
26
27
        Observable<String> o3 = Observable.just("one object");
28
    }
29
    /**
30
     * This example shows a custom Observable that blocks
31
     * when subscribed to (does not spawn an extra thread).
32
     *
33
     * @return
34
     */
35
    private Observable<String> customObservableBlocking() {
36
        return Observable.<String>create(emitter -> {
37
            for (int i = 0; i < 50; i++) {
38
                emitter.onNext("count_" + i);
39
            }
40
            emitter.onComplete();
41
        });
42
    }
43
    /**
44
     * 创建一个非阻塞的Observable
45
     *
46
     * @return
47
     */
48
    private Observable<String> customObservableNonBlocking() {
49
        return Observable.<String>create(emitter -> {
50
            new Thread() {
51
                @Override
52
                public void run() {
53
                    for (int i = 0; i < 50; i++) {
54
                        emitter.onNext("count_" + i);
55
                    }
56
                    emitter.onComplete();
57
                }
58
            }.start();
59
        });
60
    }
61
		/**
62
     * 用算子对Observable进行组装,或者转变
63
     */
64
    private void simpleComposition() {
65
        customObservableNonBlocking().skip(10).take(5)
66
                .map(stringValue -> {return stringValue + "_xform";})
67
                .subscribe(s -> System.out.println("onNext => " + s));
68
    }
69
}

其中simpleComposition()方法的marble diagram如下图所示^3,在reactive编程模型中,常常用marble diagram表示数据流的流转。

img

2.2.1 异常处理

Observable一般不抛出异常,而是发出一个onError通知,不过有些情况下,例如调用onError()方法本身失败的话,会抛出RuntimeException、OnErrorFailedException、OnErrorNotImplementedException异常。

可以使用异常处理算子来处理onError()方法通知的异常。subscribe的时候可以传入第二个方法处理异常。

2.2.1 RxJava的算子

只能说RxJava支持的算子很多,就不多说了,当然你也可以自己写新的算子,这就属于比较难的操作了。

2.2.2 back pressure

其实从上面一路写下来,有没有发现RxJava和Stream API非常相似,我是这么觉得的。但是这里就要说一说RxJava所具备的背压这一特性,是响应式编程的非常重要的一个特性。

back pressure就是在下游无法承载上游的压力时,采取一些措施,通知上游放慢速度。

1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source
4
    .observeOn(Schedulers.computation())
5
    .subscribe(v -> compute(v), Throwable::printStackTrace);
6
7
    for (int i = 0; i < 1_000_000; i++) {
8
        source.onNext(i);
9
    }
10
11
    Thread.sleep(10_000);

这段代码会抛出MissingBackpressureException异常,因为PublishProcessor不支持背压,而observeOn算子内部的buffer是有边界的,当PublishProcessor产生数据的速度超过计算的速度时,数据会存在observeOn的内部buffer中,当溢出时就抛出MissingBackpressureException异常。

如果改成下面的代码则可以正常运行,因为Flowable.range支持背压,range可以和observeOn之间通过类似协程的机制,协商应该以什么样的速度产生数据。具体的机制是,range通过调用observeOn的onSubscribe方法发送一个callback方法(org.reactivestreams.Subscription接口的实现)给订阅者,observeOn回调Subscription.request(n)方法告诉range产生n个数据。

1
Flowable.range(1, 1_000_000)
2
.observeOn(Schedulers.computation())
3
.subscribe(v -> compute(v), Throwable::printStackTrace);
4
5
Thread.sleep(10_000);

下面通过一个更显式的例子来说明,先调用onStart方法要求range发送一个数据,然后异步调用onNext方法进行数据计算,并再发一个request要求一个数据,注意onStart中request执行完后,range就会发送数据,会异步触发onNext,若此时onStart中初始化操作尚未完成,则可能产生异常

1
Flowable.range(1, 1_000_000)
2
                .subscribe(new DisposableSubscriber<Integer>() {
3
                    @Override
4
                    public void onStart() {
5
                        request(1);
6
                    }
7
                    public void onNext(Integer v) {
8
                        v = v * 2;
9
                        System.out.println(v);
10
                        request(1);
11
                    }
12
                    @Override
13
                    public void onError(Throwable ex) {
14
                        ex.printStackTrace();
15
                    }
16
                    @Override
17
                    public void onComplete() {
18
                        System.out.println("Done!");
19
                    }
20
                });
2.2.2.1 解决背压问题-增加buffer size
1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source.observeOn(Schedulers.computation(), 1024 * 1024)
4
          .subscribe(e -> { }, Throwable::printStackTrace);
5
6
    for (int i = 0; i < 1_000_000; i++) {
7
        source.onNext(i);
8
    }

仍然有机会产生MissingBackpressureException

2.2.2.2 解决背压问题-批量算子/采样算子
1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source
4
          .buffer(1024)
5
          .observeOn(Schedulers.computation(), 1024)
6
          .subscribe(list -> { 
7
              list.parallelStream().map(e -> e * e).first();
8
          }, Throwable::printStackTrace);
9
10
    for (int i = 0; i < 1_000_000; i++) {
11
        source.onNext(i);
12
    }
13
    
14
PublishProcessor<Integer> source = PublishProcessor.create();
15
16
    source
17
          .sample(1, TimeUnit.MILLISECONDS)
18
          .observeOn(Schedulers.computation(), 1024)
19
          .subscribe(v -> compute(v), Throwable::printStackTrace);
20
21
    for (int i = 0; i < 1_000_000; i++) {
22
        source.onNext(i);
23
    }

仍然有机会产生MissingBackpressureException

2.2.2.3 解决背压问题-onBackpressureXXX 算子

onBackpressureBuffer()
onBackpressureBuffer(int capacity)
onBackpressureBuffer(int capacity, Action onOverflow)
onBackpressureBuffer(int capacity, Action onOverflow, BackpressureOverflowStrategy strategy)
onBackpressureDrop()
onBackpressureLatest()

还有一个办法是创建支持背压的数据源。

三、响应式编程-从Reactive Streams到Project Reactor

上文提到RxJava属于ReactiveX项目,也遵守Reactive Streams规范,这里就介绍一下Reactive Streams规范,以及在其基础上的Project Reactor。

3.1 Reactive Streams

主页:https://www.reactive-streams.org/

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Reactive Streams是一套规范,为异步流处理及非阻塞的背压提供了标准。

对于Java程序员,Reactive Streams是一个API。Reactive Streams为我们提供了Java中的Reactive Programming的通用API。Reactive Streams API中仅仅包含了如下四个接口^4

1
//发布者
2
public  interface  Publisher < T > {
3
	public  void  subscribe(Subscriber <?super  T >  s);
4
}
5
6
//订阅者
7
public  interface  Subscriber < T > {
8
	public  void  onSubscribe(Subscription  s);
9
	public  void  onNext(T  t);
10
	public  void  onError(Throwable  t);
11
	public  void  onComplete();
12
}
13
14
//表示Subscriber消费Publisher发布的一个消息的生命周期
15
public interface Subscription {
16
	public void request(long n);
17
	public void cancel();
18
}
19
20
//处理器,表示一个处理阶段,它既是订阅者也是发布者,并且遵守两者的契约
21
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
22
	
23
}

https://github.com/reactive-streams/reactive-streams-jvm 定义了Reactive Streams的JVM规范。

Java9中Flow类下的内容与Reactive Streams完全一致,这部分留到后面研究。

其他还有AKKA Streams支持Reactive Streams。

Publisher-Subscriber接口既有观察者模式,又包含了迭代器模式。Subscriber通过订阅的方式实现了观察者模式。由Publisher通过push元素实现对元素的迭代,而相应的Java中的Iterator是通过pull的方式实现迭代。push的方式是响应式编程的关键,Publisher通过调用Subscriber的onNext方法通知Subscriber下一个元素,通过onError通知异常,通过onComplete通知完成。

3.2 Project Reactor

主页:https://projectreactor.io/

Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM

Reactor文档:

https://projectreactor.io/docs/core/release/reference/

Reactor Core运行需要Java8+环境,可以直接和Java8的函数式API交互,包括CompletableFuture,Stream和Duration。它实现了Reactive Streams规范,并提供Flux和Mono两个异步序列API,Flux是支持N个元素的序列,Mono是支持0个或1个的序列(类似RxJava中的Single)。reactor-netty项目支持非阻塞的IPC(inter-process communication),它还提供了支持HTTP(包括WebSockets)、TCP、UDP协议的带有背压特性的网络引擎,支持响应式的边界码,适合微服务框架。

Reactor实现了Reactive Streams规范,因此实现了Publisher, Subscriber接口。需要注意的一点是Publisher创建后,并不开始产生数据,各种算子对Publisher的包装也不会导致数据开始产生,只有当调用链由Subscriber订阅后,内部调用request方法才会触发数据开始流动。

Reactor背压的实现机制和RxJava类似。也是通过上面提到的request机制来进行协商。

3.2.1 Flux与Mono

Flux与Mono是Publisher的实现,其创建代码示例如下:

1
private void createFluxMono() {
2
        Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
3
4
        List<String> iterable = Arrays.asList("foo", "bar", "foobar");
5
        Flux<String> seq2 = Flux.fromIterable(iterable);
6
7
        Mono<String> noData = Mono.empty();
8
9
        Mono<String> data = Mono.just("foo");
10
11
        Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
12
    }

其subscribe代码示例如下:

1
private void subscribe() {
2
  			//最后一个函数表示对每一个元素执行的操作
3
        Flux<Integer> ints1 = Flux.range(1, 3);
4
        ints1.subscribe(i -> System.out.println(i));
5
				//最后一个函数表示异常时执行的操作
6
        Flux<Integer> ints2 = Flux.range(1, 4)
7
                .map(i -> {
8
                    if (i <= 3) return i;
9
                    throw new RuntimeException("Got to 4");
10
                });
11
        ints2.subscribe(i -> System.out.println(i),
12
                error -> System.err.println("Error: " + error));
13
				//最后一个函数表示完成时执行的操作
14
        Flux<Integer> ints3 = Flux.range(1, 4);
15
        ints3.subscribe(i -> System.out.println(i),
16
                error -> System.err.println("Error " + error),
17
                () -> System.out.println("Done"));
18
  			//最后一个函数表示订阅时执行的操作
19
  			Flux<Integer> ints4 = Flux.range(1, 4);
20
        ints4.subscribe(i -> System.out.println(i),
21
                error -> System.err.println("Error " + error),
22
                () -> System.out.println("Done"),
23
                sub -> sub.request(2));
24
    }

以上是Reactor的一些基操,其他部分的内容包括算子的使用、背压的实现、异步的实现、异常处理、测试、Debug以及一些高级特性等,建议还是看官方文档,不是一篇文章能讲完的。

总的来说,Reactor的实现和RxJava在很多方面是相似的。

3.3 Spring WebFlux

Spring Reactive diagram

在Project Reactor提供的类库的基础上,Spring构建了自己的reactive技术栈。Spring WebFlux可以运行在Netty, Undertow服务器上或者Servlet 3.1+ 的web容器中。

再一次推荐官方文档:

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux

参考文献