Hello Reactive World
引入依赖
在新建项目的时候引入 Spring Reactive Web,为了方便也引入了 Lombok。
编写 Controller
@RestController
@RequestMapping("/reactive")
public class ReactiveController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Hello Reactive World");
}
}
[ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
启动服务,可以看到程序是运行在 Netty 服务上的。
请求
WebFlux 提供了与之前 WebMVC 相同的一套注解来定义请求的处理,使得 Spring 使用者迁移到响应式开发方式的过程变得异常轻松。
Flux 与 Mono
Reactor 中的发布者 (Publisher) 由 Flux
和 Mono
两个类定义,它们都提供了丰富的操作符 (operator)。
- Flux: 代表一个包含 0-N 个元素的响应式序列。
- Mono: 代表一个包含 0/1 个元素的响应式序列。
可以暂时简单地把
Mono
理解成单个对象,Flux
理解成 List 列表对象。
作为 “数据流” 的发布者,Flux
和 Mono
都可以发出三种 “数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
下图是一个 Flux
类型的数据流,黑色箭头是时间轴。它连续发出 1 - 6 共 6 个元素值,以及一个完成新信号(图中 6 后边的加粗竖线), 完成信号告知订阅者数据流已经结束。
用代码声明:
Flux.just(1, 2, 3, 4, 5, 6);
下图是一个 Mono
类型的数据流,它发出一个元素值后,又发出一个完成信号。
用代码声明:
Mono.just(1);
Flux
和 Mono
提供了多种创建数据流的方法,just
是一种比较直接的声明数据流的方式,其参数就是数据元素。
还可以通过如下方式声明
// 基于数组
Integer[] array = {1, 2, 3, 4, 5, 6};
Flux.fromArray(array);
// 基于集合
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
// 基于 Stream
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
不过,这三种信号都不是一定要具备的:
- 错误信号和完成信号都是终止信号,二者不可能同时存在。
- 如果没有发出任何一个元素值,而是直接发出完成 / 错误信号,表示这是一个空数据流。
- 如果没有错误信号和完成信号,那么就是一个无限数据流。
比如,只有完成 / 错误信号的数据流:
// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();
// 只有错误信号的数据流
Flux.error(new RuntimeException("some error"));
Mono.error(new RuntimeException("some error"));
空数据流有什么用呢?举个例子,当我们从响应式 DB 中获取结果的时候,就有可能为空:
Flux<User> findAll();
Mono<User> findById(long id);
无论是结果为空还是发生异常,都需要通过完成 / 错误信号告知订阅者,已经查询完毕,但是没有获取到值。
订阅前什么都不会发生
数据流有了,假设我们想把每个数据元素打印出来
Flux.just(1, 2, 3, 4, 4, 5, 6).subscribe(System.out::println);
输出如下
1
2
3
4
4
5
6
可见,subscribe
方法中的 Lambda 表达式作用在每一个元素上。Flux
和 Mono
提供了多个 subscribe 的重载方法
// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
- 如果是订阅上边声明的 Flux
Flux.just(1, 2, 3, 4, 4, 5, 6).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("已完成")
);
输出如下
1
2
3
4
4
5
6
已完成
- 举一个有错误信号的例子
Mono.error(new RuntimeException("业务异常")).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("已完成")
);
输出如下
java.lang.RuntimeException: 业务异常
打印出了错误信号,没有输出已完成,表示没有发出完成信号。
注意,Flux.just(1, 2, 3, 4, 4, 5, 6)
仅仅声明了数据流,此时数据元素并未发出,只有 subcriber()
方法调用时才会触发数据流。所以,订阅前什么都不会发生。