Featured image of post 响应式编程 Spring Webflux 详解(二)

响应式编程 Spring Webflux 详解(二)

编写第一个 Reactive 项目,认识 Flux 和 Mono

Hello Reactive World

引入依赖

init-spring-webflux-project

在新建项目的时候引入 Spring Reactive Web,为了方便也引入了 Lombok。

编写 Controller

@RestController
@RequestMapping("/reactive")
public class ReactiveController {

    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("Hello Reactive World");
    }

}

[           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080

启动服务,可以看到程序是运行在 Netty 服务上的。

请求

spring-webflux-hello-world-demo

WebFlux 提供了与之前 WebMVC 相同的一套注解来定义请求的处理,使得 Spring 使用者迁移到响应式开发方式的过程变得异常轻松。

Flux 与 Mono

Reactor 中的发布者 (Publisher) 由 FluxMono 两个类定义,它们都提供了丰富的操作符 (operator)。

  • Flux: 代表一个包含 0-N 个元素的响应式序列。
  • Mono: 代表一个包含 0/1 个元素的响应式序列。 可以暂时简单地把 Mono 理解成单个对象,Flux 理解成 List 列表对象。

作为 “数据流” 的发布者,FluxMono 都可以发出三种 “数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。

下图是一个 Flux 类型的数据流,黑色箭头是时间轴。它连续发出 1 - 6 共 6 个元素值,以及一个完成新信号(图中 6 后边的加粗竖线), 完成信号告知订阅者数据流已经结束。

flux

用代码声明:

Flux.just(1, 2, 3, 4, 5, 6);

下图是一个 Mono 类型的数据流,它发出一个元素值后,又发出一个完成信号。

mono

用代码声明:

Mono.just(1);

FluxMono 提供了多种创建数据流的方法,just 是一种比较直接的声明数据流的方式,其参数就是数据元素。 还可以通过如下方式声明

// 基于数组
Integer[] array = {1, 2, 3, 4, 5, 6};
Flux.fromArray(array);

// 基于集合
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);

// 基于 Stream
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);

不过,这三种信号都不是一定要具备的:

  • 错误信号和完成信号都是终止信号,二者不可能同时存在。
  • 如果没有发出任何一个元素值,而是直接发出完成 / 错误信号,表示这是一个空数据流。
  • 如果没有错误信号和完成信号,那么就是一个无限数据流。

比如,只有完成 / 错误信号的数据流:

// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();

// 只有错误信号的数据流
Flux.error(new RuntimeException("some error"));
Mono.error(new RuntimeException("some error"));

空数据流有什么用呢?举个例子,当我们从响应式 DB 中获取结果的时候,就有可能为空:

Flux<User> findAll();
Mono<User> findById(long id);

无论是结果为空还是发生异常,都需要通过完成 / 错误信号告知订阅者,已经查询完毕,但是没有获取到值。

订阅前什么都不会发生

数据流有了,假设我们想把每个数据元素打印出来

Flux.just(1, 2, 3, 4, 4, 5, 6).subscribe(System.out::println);

输出如下

1
2
3
4
4
5
6

可见,subscribe 方法中的 Lambda 表达式作用在每一个元素上。FluxMono 提供了多个 subscribe 的重载方法

// 订阅并触发数据流
subscribe();

// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);

// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);

// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);
  1. 如果是订阅上边声明的 Flux
Flux.just(1, 2, 3, 4, 4, 5, 6).subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("已完成")
);

输出如下

1
2
3
4
4
5
6
已完成
  1. 举一个有错误信号的例子
Mono.error(new RuntimeException("业务异常")).subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("已完成")
);

输出如下

java.lang.RuntimeException: 业务异常

打印出了错误信号,没有输出已完成,表示没有发出完成信号。

注意,Flux.just(1, 2, 3, 4, 4, 5, 6) 仅仅声明了数据流,此时数据元素并未发出,只有 subcriber() 方法调用时才会触发数据流。所以,订阅前什么都不会发生

使用 Hugo 构建 主题 StackJimmy 设计
发表了 32 篇文章・ 总计 66.22 k 字
本站总访问量 · 总访客数
本博客已稳定运行 🧡