购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

6.3 Reactor

Reactor是第四代Reactive库,基于Reactive Streams规范在JVM上构建非阻塞应用程序。Reactor侧重于服务器端响应式编程,是一个基于Java 8实现的响应式流规范(Reactive Streams specification)响应式库。

作为Reactive Engine/SPI,Reactor Core和IO模块都为重点使用场景提供了响应流构造,最终与Spring、RxJava、Akka Streams和Ratpack等框架结合使用,作为Reactive API,Reactor框架模块提供了丰富的消费功能,如组合和发布订阅事件。

本节对Reactor的介绍以基本的概念和简单使用为主,更多Reactor高级特性可参考Reactor官网:http://projectreactor.io/。

6.3.1 Flux与Mono

在Reactor中,数据流发布者(Publisher)由Flux和Mono两个类表示,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0个或多个(0..N)元素的响应式序列,而一个Mono对象代表一个包含0或一个(0..1)元素的结果。

作为数据流发布者,Flux和Mono都可以发出三种数据信号,元素值、错误信号和完成信号。错误信号和完成信号都是终止信号。完成信号用来告知下游订阅者,数据流是正常结束的。错误信号在终止数据流的同时将错误信息传递给下游订阅者。这三种信号不是一定要完全具备的。

图6-1所示是一个Flux类型的数据流,横坐标是时间轴,⑥后的黑色竖线是完成信号。连续发出1~6共6个元素值,以及一个完成信号,完成信号告知订阅者数据流已经结束。

图6-1 Flux类型的数据流图

图6-2是一个Mono类型的数据流,其发出一个元素值后,立刻发出一个完成信号。

图6-2 Mono类型的数据流图

下面通过案例分析Reactor的使用。

首先创建一个maven项目,然后在pom.xml中加入对maven的依赖。可以到maven仓库https://mvnrepository.com/查询最新版本的Reactor。截止本书出版,Reactor最新的版本是3.2.0.RELEASE。

为了方便测试,还需要添加对reactor-test的依赖和Junit的依赖。

下面就可以开始用Reactor进行编码了。

首先使用代码声明图6-1和图6-2中的Flux和Mono,代码如下:

     Flux.just(1, 2, 3, 4, 5, 6);
     Mono.just(1);

Flux和Mono提供了多种创建数据流的方法,just是一种比较直接的声明数据流的方式,其参数就是数据元素。

对于图6-1中的场景,还可以使用如下多种声明方式。

基于数组的声明方式:

     Integer[] array = new Integer[]{1,2,3,4,5,6};
     Flux.fromArray(array);

基于集合的声明方式:

     List<Integer> list = Arrays.asList(array);
     Flux.fromIterable(list);

基于Stream的声明方式:

     Stream<Integer> stream = list.stream();
     Flux.fromStream(stream);

上文中提到元素值、错误信号和完成信号三者并不是要完全具备的,下面就给出几种情况:

     // 只有完成信号的空数据流
     Flux.just();
     Flux.empty();
     Mono.empty();
     Mono.justOrEmpty(Optional.empty());
     // 只有错误信号的数据流
     Flux.error(new Exception("some error"));
     Mono.error(new Exception("some error"));

6.3.2 subscribe()

subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,下面介绍几种常见的subscribe()方法:

下面通过一个案例验证Flux、Mono和几种常见的subscribe()方法的使用。案例代码如下:

     /**
     * @Author zhouguanya
     * @Date 2018/10/22
     * @Description 第一个Reactor程序
     */
     public class FirstReactorDemo {
     public static void main(String[] args) {
     // 测试Flux
     Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
     System.out.println("\n----------------------------");
     // 测试Mono
     Mono.just(1).subscribe(System.out::println);
     System.out.println("----------------------------");
     // 测试两个参数的subscribe方法
     Flux.just(1, 2, 3, 4, 5, 6)
     .subscribe(System.out::print, System.err::println);
     System.out.println("\n----------------------------");
     // 测试三个参数的subscribe方法
     Flux.just(1, 2, 3, 4, 5, 6)
     .subscribe(System.out::print, System.err::println,
     () -> System.out.println("\ncomplete"));
     System.out.println("----------------------------");
     // 测试四个参数的subscribe方法
     Flux.just(1, 2, 3, 4, 5, 6)
     .subscribe(System.out::print, System.err::println,
     () -> System.out.println("\ncomplete"), subscription -> {
     System.out.println("订阅发生了");
     subscription.request(10);
     });
     }
     }

运行案例代码,得到如下运行结果:

     123456
     ----------------------------
     1
     ----------------------------
     123456
     ----------------------------
     123456
     complete
     ----------------------------
     订阅发生了
     123456
     complete

在命令式或同步式编程世界中,调试通常都是非常直观的——直接看stack trace就可以找到问题出现的位置以及异常信息等。

当切换到响应式的异步代码,事情就变得复杂多了。先了解一个基本的单元测试工具——StepVerifier。当测试关注点是每个数据元素的时候,就与StepVerifier的使用场景非常贴切。例如期望的数据或信号是什么,是否使用Flux发出某个特殊值,接下来100ms做什么,这些场景都可以使用StepVerifier API表示。

下面分别使用StepVerifier测试Flux和Mono,测试代码如下:

     /**
     * @Author zhouguanya
     * @Date 2018/10/25
     * @Description StepVerifier测试案例
     */
     public class StepVerifierDemo {
     public static void main(String[] args) {
     Flux flux = Flux.just(1, 2, 3, 4, 5, 6);
     // 使用StepVerifier测试Flux,应该正常
     StepVerifier.create(flux)
     //测试下一个期望的数据元素
     .expectNext(1, 2, 3, 4, 5, 6)
     //测试下一个元素是否为完成信号
     .expectComplete()
     .verify();
     Mono mono = Mono.error(new Exception("some error"));
     // 使用StepVerifier测试Mono,应该会出现异常
     StepVerifier.create(mono)
     //测试下一个元素是否为完成信号
     .expectComplete()
     .verify();
     }
     }

运行测试代码,测试结果如下:

6.3.3 操作符(Operator)

本节介绍Reactor一些常用的操作符。

1. map

map可以将数据元素转换成映射表,得到一个新的元素。map操作符示意图如图6-3所示。

图中上方的箭头是原始序列的时间轴,下方的箭头是经过map处理后的数据序列时间轴。map接受一个Function函数式接口,该接口用于定义转换操作的策略:

     public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
     public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)

图6-3 map操作示意图

下面使用案例阐述map操作符的用法。案例代码如下:

执行案例代码发现控制台无异常输出。如果修改立方后的数据为expectNext(10, 8, 27, 64, 125,216)将会出现如下异常:

2. flatMap

flatMap操作可以将每个数据元素转换/映射为各个流,然后将每个流合并为一个大的数据流。flatMap操作符示意图如图6-4所示。

图6-4 flatMap操作示意图

flatMap接收一个Function函数式接口为参数,这个函数式的输入为一个T类型数据值,输出可以是Flux或Mono:

下面使用案例阐述flatMap操作符的用法。案例代码如下:

执行案例代码,得到类似如下结果:

     fmlounxo

多次执行案例代码,会得到不同的输出结果。由此可以看出,流的合并是异步的,先来先到,并非是严格按照原始序列的顺序。

3. filter

filter操作可以对数据元素过滤,得到剩余的元素。filter操作符示意图如图6-5所示。

图6-5 filter操作示意图

filter接受一个Predicate的函数式接口为参数,这个函数式接口的作用是进行判断并返回boolean值:

     public final Flux<T> filter(Predicate<? super T> tester)
     public final Mono<T> filter(Predicate<? super T> tester)

下面使用案例阐述filter操作符的用法。案例代码如下:

执行案例代码发现控制台无异常输出。如果修改立方后的数据为expectNext(1, 127, 125)将会出现如下异常:

4. zip

zip能够将多个流一对一的合并起来。zip有多个方法变体,这里只介绍一个最常见的二合一的场景。zip操作符示意图如图6-6所示。

图6-6 zip操作示意图

zip可以从两个Flux/Mono流中,每次各取一个元素,组成一个二元组:

下面使用案例阐述zip操作符的用法。案例代码如下:

执行案例代码,得到如下结果:

     [I,0][am,1][Reactor,2]
5. 更多

(1)除了以上几个常见的操作符意外,Reactor中提供了非常丰富的操作符。

(2)用于编程方式自定义生成数据流的create和generate等及其变体方法。

(3)用于“无副作用的peek”场景的doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等及其变体方法。

(4)用于数据流转换的when、and/or、merge、concat、collect、count、repeat等及其变体方法。

(5)用于过滤/拣选的take、first、last、sample、skip、limitRequest等及其变体方法。

(6)用于错误处理的timeout、onErrorReturn、onErrorResume、doFinally、retryWhen等及其变体方法。

(7)用于分批的window、buffer、group等及其变体方法。

(8)用于线程调度的publishOn和subscribeOn方法。

更多操作请见官方文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

6.3.4 线程模型

JDK提供的多线程工具类Executors提供了多种线程池,使开发人员可以方便地定义线程池进行多线程开发。Reactor使多线程编程更加容易,Schedulers类提供的静态方法可以更快创建以下几种多线程环境。

· 获取当前线程环境Schedulers.immediate()。

· 获取可重用的单线程环境Schedulers.single()。

· 获取弹性线程池环境Schedulers.elastic()。

· 获取固定大小线程池环境Schedulers.parallel()。

· 获取自定义线程池环境Schedulers.fromExecutorService(ExecutorService) 。

下面通过案例对比单线程同步阻塞和使用Schedulers异步非阻塞的场景。案例中分别有两个方法,hello()方法同步阻塞2s后,返回字符串“Hello, Reactor!”,helloAsync()方法使用Schedulers改进为异步非阻塞方式。案例代码如下:

执行案例代码,得到如下执行结果:

     Hello, Reactor!
     --------同步阻塞场景执行结束--------
     -------异步非阻塞场景执行结束-------
     Hello, Reactor!

观察执行结果,可以发现hello()方法是同步阻塞输出“Hello, Reactor!”,helloAsync()方法是异步非阻塞输出“Hello, Reactor!”的。 TShWMJlgkuriwZsL70HG5u6JlJcz2NDlc4KNUuSBK2v8/Zf2OCfLx+LyCPYtUB6J

点击中间区域
呼出菜单
上一章
目录
下一章
×