Featured image of post 响应式编程 Spring Webflux 详解(三)

响应式编程 Spring Webflux 详解(三)

操作符详解

操作符

通常情况下,我们需要对发布者发出的原始数据进行多个阶段的处理,并最终得到我们需要的数据。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐从原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于发布者,消费者就相当于订阅者,流水线上的一道道工序就相当于一个个操作符。

下面介绍一些常用的操作符。

map

map 操作可以将元素进行转换 / 映射,得到一个新的元素。

operator-map

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)

上面是 Fluxmap 操作示意图,上方的箭头是原始序列的时间轴,下方箭头是经过 map 处理后的数据序列时间轴。 map 接受一个 Function 的函数式接口作为参数,这个函数定义了转换操作的策略,可以把它理解为 Java8 流式编程的 map 方法。举例说明:

Flux.range(1, 6) // 1
        .map(i -> i * 2) // 2
        .subscribe(System.out::println); // 3

输出如下:

2
4
6
8
8
10
12
  1. Flux.range(1, 6) 用于生成从 “1” 开始,自增为 1 的 “6” 个整型数据
  2. map 接受 Lambda i -> i * 2 为参数,对每个数据进行乘 2 操作
  3. 订阅数据流并输出每个元素值

flatMap

flatMap 操作可以将每个数据元素转换 / 映射为一个流,然后将这些流合并为一个大的数据流。

operator-flatmap

流的合并是异步的,先到先得,并非是按照原始序列的顺序(途中绿色和黄色方块是交叉的)。

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

flatMap 也是接收一个 Function 的函数接口为参数,这个函数输入为一个 T 类型的数据值,对于 Flux 来说输出可以是 FluxMono,对于 Mono 来说输出只能是 Mono。 举例说明:

Flux.just("flux", "mono")
        .flatMap(word -> Flux.fromArray(word.split("\\s*")) // 1
                .delayElements(Duration.ofMillis(100)) // 2
        )
        .doOnNext(System.out::print) // 3
        .subscribe(); // 4

TimeUnit.SECONDS.sleep(1);
  1. 对于每一个字符串 word, 将其拆分为包含一个字符的字符串流
  2. 每个元素延迟 100ms
  3. 对每个元素进行打印 (doOnNext 是 “偷窥式” 的方法,类似于流式编程中的 peek,不会消费数据流)
  4. 订阅数据流 打印结果为 mfolnuox,原因在于各个拆分后的字符都是间隔 100ms 发出的,因此会交叉。 如果项目的顺序很重要,可以考虑改用 flatMapSequential 运算符。

flatMap 通常用于每个元素又会引入数据流的情况,比如我们有一串用户 id 数据流,需要通过 id 信息查询用户数据,假设响应式的请求方法如下:

Mono<User> getUser(String userId) {...}

而用户 id 数据流为一个 Flux<String> ids,为了获取所有的用户信息,需要用到 flatMap

ids.flatMap(id -> getUser(id));

其返回内容为 Flux<User> 类型的 User 流。

map 与 flatMap 的区别

  • map 是同步的,非阻塞,一对一的转换。
  • flatMap 是异步的,非阻塞,一对多的转换。
/**
 * Transform the items emitted by this {@link Flux} by applying a synchronous function
 * to each item.
 */
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)

/**
 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux} through merging,
 * which allow them to interleave.
 */
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

当流被订阅后,映射器对输入流中的元素执行必要的转换(执行上述 mapper 操作)。这些元素中的每一个都可以转换为多个数据,然后用于创建新的流。

那么如何判断什么时候用 map,什么时候用 flatMap 呢?

  1. 凡是涉及需要异步处理的方法,都用 flatMap,比如请求接口,读取数据库数据,读取缓存等等。
  2. 其他同步方法,比如转换对象,对已有数据做计算等等,都用 map

filter

filter 操作可以对数据元素进行筛选

operator-filter

public final Flux<T> filter(Predicate<? super T> p)
public final Mono<T> filter(final Predicate<? super T> tester)

filter 接受一个 Predicate 的函数式接口为参数,这个函数式的作用是进行判断并返回 Boolean。 举例说明:

Flux.range(1, 10) // 1
        .filter(i -> i % 3 == 0) // 2
        .doOnNext(System.out::println)
        .subscribe();
  1. 生成从 “1” 开始,自增为 1 的 “10” 个整型数据
  2. filter 的 Lambda 参数表示过滤操作保留 3 的倍数

输出:

3
6
9

switchIfEmpty

operator-switchifempty

switchIfEmpty 操作在一个序列元素为空时,替换为另一个元素

举例说明:

@Data
@AllArgsConstructor
@NoArgsConstructor
class User {
    private String name;
}

public Mono<User> getUserFromCache() {
    return Mono.just(new Random().nextBoolean())
            .filter(b -> b)
            .map(u -> new User("user from cache"));
}

从缓存中获取用户信息,可能返回为空。

public Mono<User> getUserFromConsole() {
    return Mono.just(new Random().nextBoolean())
            .filter(b -> b)
            .map(u -> new User("user from console"));
}

从控制台中获取用户信息,可能返回为空。

@GetMapping("/user/get")
public Mono<User> getUser() {
    return getUserFromCache() // 1
            .switchIfEmpty(Mono.defer(() -> getUserFromConsole())) //2
            .switchIfEmpty(Mono.defer(() -> Mono.error(new RuntimeException("用户不存在")))); //3
}
  1. 从缓存中获取用户信息
  2. 如果缓存中数据为空,则从控制台获取用户信息
  3. 如果控制台中数据仍为空,则抛出用户不存在异常终止序列

输出:

# 结果 1
{
    "name": "user from cache"
}

# 结果 2
{
    "name": "user from console"
}

# 结果 3
{
    "timestamp": "2022-10-05T04:34:39.274+00:00",
    "path": "/reactive/user/get",
    "status": 500,
    "error": "Internal Server Error",
    "requestId": "653d3049-14"
}

defer

defer 把元素加入流中,但是延时加载,直到 subsribe() 时才会加载

operator-defer

举例说明:

Mono<String> dateTime = Mono.just(LocalDateTime.now().toString());
System.out.println("t0: " + dateTime.block());

Thread.sleep(10_000);
System.out.println("t1: " + dateTime.block());

Thread.sleep(5_000);
System.out.println("t2: " + dateTime.block());

输出:

t0: 2022-10-05T13:23:31.482
t1: 2022-10-05T13:23:31.482
t2: 2022-10-05T13:23:31.482

调用 Mono.just(LocalDateTime.now().toString()) 时会立即执行 LocalDateTime.now().toString() 并且获取到结果,通过订阅 Mono 时只会将结果发出,所以订阅多次并不会改变原来的值。

Mono<String> dateTime = Mono.defer(() -> Mono.just(LocalDateTime.now().toString()));
System.out.println("t0: " + dateTime.block());

Thread.sleep(10_000);
System.out.println("t1: " + dateTime.block());

Thread.sleep(5_000);
System.out.println("t2: " + dateTime.block());

输出:

t0: 2022-10-05T13:24:40.500
t1: 2022-10-05T13:24:50.515
t2: 2022-10-05T13:24:55.519

defer 操作符会在每次订阅时重新评估 Lambda 表达式中的内容。

⭐Tips

有一个用法是,当需要定义异常 Mono.error() 时,并不需要每次都创建,而是在出现异常时才加载,这样可以节省内存。

Mono.defer(() -> Mono.error(new RuntimeException("异常")));

then

then 操作符会忽略前一个流的结果,并将序列转换为另一个 Mono<V> 或者 Mono<Void>

operator-then

public final <V> Mono<V> then(Mono<V> other)
public final Mono<Void> then()

Mono.just(1)
        .then() // 1
        .doOnNext(item -> System.out.println(item)) // 2
        .subscribe();
  1. 忽略掉 Mono.just 中的元素,返回 Mono<Void>
  2. 输出序列中的值(因为前一个序列已经替换为空,所以此处不会有任何输出)
Mono.just(1)
        .then(Mono.just("other item")) // 1
        .doOnNext(item -> System.out.println(item))
        .subscribe();
  1. 忽略掉 Mono.just 中的元素,返回新的 Mono 序列。
  2. 输出序列中的值

输出:

other item

使用场景:

public class SaveSessionGatewayFilterFactory extends AbstractGatewayFilterFactory<Object> {

    @Override
    public GatewayFilter apply(Object config) {
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return exchange.getSession().map(WebSession::save).then(chain.filter(exchange));
            }

            @Override
            public String toString() {
                return filterToStringCreator(SaveSessionGatewayFilterFactory.this).toString();
            }
        };
    }

}

then-example

参考自保存 Session 的网关过滤器 SaveSessionGatewayFilterFactoryfilter 方法中,map 调用了 save(),该方法返回值是 Mono<Void>,该序列会因为没有元素发出而终止。这里使用 then,不管上游序列的输出是 Mono<Void> 还是其他元素,将其忽略,并向下继续调用其他过滤器的方法。

如果你关心前一个流的结果,可以使用 map, flatMap 或者其他 map 的变种方法,如果你只想要前一个流完成,不关心流的数据,则使用 then

doOnNext

operator-doonnext

doOnNext 操作符,当一个可用且存在的数据成功发出时,触发添加的无副作用行为。

public final Mono<T> doOnNext(Consumer<? super T> onNext)
public final Flux<T> doOnNext(Consumer<? super T> onNext)

doOnNext 接收一个 Consumer 函数为参数,仅有入参,没有出参,并不会对原序列的数据类型造成影响。

举例说明:

Mono.just(new User("Alex"))
        .doOnNext(u -> System.out.println(u.getName()))
        .doOnNext(u -> u.setName("Bob"))
        .doOnNext(u -> System.out.println(u.getName()))
        .subscribe();

输出:

Alex
Bob

doOnSuccess

doOnSuccess 操作符,在 Mono 序列成功完成时触发,不管结果是 T 还是 null

operator-doonsuccess

举例说明:

Mono.empty()
        .doOnSuccess(i -> System.out.println("on success: " + i))
        .subscribe();

Mono.just("hello")
        .doOnSuccess(i -> System.out.println("on success: " + i))
        .subscribe();

输出:

on success: null
on success: hello

doOnSuccess 仅可作用在 Mono 序列中。

doOnNext 与 doOnSuccess 的区别

  • doOnNext 是可有用数据是才触发。
  • doOnSuccess 只要上游执行的序列没有报错,都可以判断为成功,不管结果是什么。

doFinally

doFinally 操作符,会添加一个行为,当 Mono 序列因为任何原因终止时触发。

operator-dofinally

public final Mono<T> doFinally(Consumer<SignalType> onFinally)
public final Flux<T> doFinally(Consumer<SignalType> onFinally)

doFinally 接收一个 Consumer 函数,入参为 SignalType 信号类型,包括取消、完成、错误信号等等。

举例说明:

  • 完成信号
Mono.just("hello")
        .doOnNext(i -> System.out.println("next: " + i))
        .doFinally(signalType -> System.out.println("finally, signalType: " + signalType.name()))
        .subscribe();

输出:

next: hello
finally, signalType: ON_COMPLETE
  • 异常信号
Mono.just("hello")
        .then(Mono.error(new RuntimeException("exception")))
        .doOnNext(i -> System.out.println("next: " + i))
        .doFinally(signalType -> System.out.println("finally, signalType: " + signalType.name()))
        .subscribe();

输出:

23:15:12.203 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: exception
Caused by: java.lang.RuntimeException: exception
    at com.opoa.controller.ReactiveController.main(ReactiveController.java:69)
finally, signalType: ON_ERROR

可以看到,在 Mono 序列中抛出了一个异常,终止了 doOnNext 的正常执行,但 doFinally 仍能触发,并且信号类型为错误,所以 doFinally 始终能够执行。

如果有连续的 doFinally 在执行,其顺序是倒序

Flux.just("hello", "world")
        .doFinally(f -> System.out.println("finally - 1"))
        .doOnNext(w -> System.out.println("on next 1: " + w))
        .doFinally(f -> System.out.println("finally - 2"))
        .doOnNext(w -> System.out.println("on next 2: " + w))
        .doFinally(f -> System.out.println("finally - 3"))
        .subscribe();

输出

on next 1: hello
on next 2: hello
on next 1: world
on next 2: world
finally - 3
finally - 2
finally - 1

doOnError

doOnError 操作符,会添加一个行为,当序列因为异常终止时触发。

operator-doonerror

public final Mono<T> doOnError(Consumer<? super Throwable> onError)
public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType,
            final Consumer<? super E> onError)
public final Mono<T> doOnError(Predicate<? super Throwable> predicate,
            final Consumer<? super Throwable> onError)

doOnError 有三个重载方法:

  1. 一个异常类消费者,出现异常时调用。
  2. 传参为一个异常类的 Class 和一个对应消费者,当出现该类型的异常时才调用。
  3. 传参为一个断言函数和一个对应消费者,当断言结果为 true 时才调用。

举例说明:

Mono.just(9)
        .map(i -> i / 0)
        .doOnError(e -> log.error("an error occurred"))
        .subscribe();

输出:

22:08:01.282 [main] ERROR com.opoa.controller.ReactiveController - an error occurred
22:08:01.284 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
    ...()

onErrorResume

onErrorResume,当序列发生错误时,订阅一个备用的发布者 Mono/Flux

operator-onerrorresume

public final Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends
            T>> fallback)
public final <E extends Throwable> Mono<T> onErrorResume(Class<E> type,
            Function<? super E, ? extends Mono<? extends T>> fallback)
public final Mono<T> onErrorResume(Predicate<? super Throwable> predicate,
            Function<? super Throwable, ? extends Mono<? extends T>> fallback)

onErrorResume 有三个重载方法:

  1. 单独一个备用函数作为参数
  2. 传参为一个异常类的 Class 和一个备用函数,当出现该类型的异常时才调用。
  3. 传参为一个断言函数和一个备用函数,当断言结果为 true 时才调用。

举例说明:

Mono.just(9)
        .map(i -> i / 0)
        .onErrorResume(e -> {
            log.error("an error occurred: {}", e.getMessage());
            return Mono.just(5);
        })
        .subscribe(System.out::println);

输出:

22:08:16.636 [main] ERROR com.opoa.controller.ReactiveController - an error occurred: / by zero
5

repeat

repeat 会在当前序列发送完成信号时,重新订阅。

operator-repeat

举例说明:

Mono.just(5)
    .doOnNext(s -> System.out.println("next 1"))
    .doOnNext(s -> System.out.println("next 2"))
    .doOnNext(s -> System.out.println("next 3"))
    .repeat(2)
    .subscribe();

输出

next 1
next 2
next 3
next 1
next 2
next 3
next 1
next 2
next 3

repeatWhen

repeatWhen 当满足特定条件时,会重复订阅这个序列。

operator-repeatwhen

举例说明:

Mono.just(5)
    .doOnNext(s -> System.out.println("next 1"))
    .doOnNext(s -> System.out.println("next 2"))
    .doOnNext(s -> System.out.println("next 3"))
    .repeatWhen(Repeat.onlyIf(s -> new Random().nextBoolean()).repeatMax(3))
    .subscribe();

这个序列会随机进行重复订阅,当 new Random().nextBoolean()false 或者达到最大重复次数 3 时,停止订阅。

输出:

next 1
next 2
next 3
22:18:09.620 [main] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=1 repeatCompanionValue=1 backoff={0ms}
next 1
next 2
next 3
22:18:09.621 [main] DEBUG reactor.retry.DefaultRepeat - Stopping repeats since predicate returned false, retry context: iteration=2 repeatCompanionValue=1 backoff={0ms}

我们可以通过 repeatWhen 能使用更多高级选项

.repeatWhen(Repeat.times(3).randomBackoff(Duration.ofSeconds(1), Duration.ofSeconds(3)))

重试 3 次,每次重试间隔在 1 秒到 3 秒之间。

输出

next 1
next 2
next 3
22:22:33.025 [main] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=1 repeatCompanionValue=1 backoff={1184ms/3000ms}
next 1
next 2
next 3
22:22:34.227 [parallel-1] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=2 repeatCompanionValue=1 backoff={1939ms/3000ms}
next 1
next 2
next 3
22:22:36.170 [parallel-2] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=3 repeatCompanionValue=1 backoff={1647ms/3000ms}
next 1
next 2
next 3
22:22:37.832 [parallel-3] DEBUG reactor.retry.DefaultRepeat - Repeats exhausted, retry context: iteration=4 repeatCompanionValue=1 backoff={EXHAUSTED}

如果你在返回 Mono 或者 Flux 的方法中使用了 repeatrepeatWhen,该方法会在满足条件时被重复订阅。方法本身只会执行一次,repeatrepeatWhen 只是让这个方法被再次订阅。

示例:

public Mono<Boolean> tryLock() {
    log.info("尝试获取锁,时间点:" + System.currentTimeMillis());
    return Mono.just(new Random().nextBoolean());
}

获取锁的方法,该方法在执行时打印当前时间点,并且随机返回 Boolean。

@GetMapping("/getLock")
public Mono<Boolean> lock() {
    AtomicBoolean locked = new AtomicBoolean(false); // 1
    return tryLock() // 2
            .doOnNext(locked::set) // 3
            .doOnNext(l -> System.out.println("是否获取到锁:" + locked.get())) // 4
            .repeatWhen(Repeat.onlyIf(r -> !locked.get()).randomBackoff(Duration.ofSeconds(2), Duration.ofSeconds(3))) // 5
            .then(Mono.defer(() -> Mono.just(locked.get()))) // 6
            .doFinally(f -> System.out.println("释放锁")); // 7
}

模拟一段获取锁的代码,需要获取锁成功时才返回结果。

  1. 创建原子布尔类型变量。
  2. 尝试获取锁,并输出时间节点。
  3. 将获取到的值赋给 locked 对象。
  4. 输出 locked 当前的变量值。
  5. 当未获取到锁时,一直进行重试,并且每次重试间隔 2 到 3 秒。
  6. 返回 locked 的变量。
  7. 最后,释放锁。

输出: 当第一次获取锁结果为 true

operator-repeatwhen-demo1

当第一次获取锁结果为 false

operator-repeatwhen-demo2

当第一次获取锁失败时,repeatWhen 触发,但 tryLock 只会执行一次,所以这段代码会一直拿不到锁,导致卡死。

为了确保 tryLock 每次重复时都执行,需要把它放到 defer 操作符中。

@GetMapping("/getLock")
public Mono<Boolean> lock() {
    AtomicBoolean locked = new AtomicBoolean(false);
    return Mono.defer(() -> tryLock())
            .doOnNext(locked::set)
            .doOnNext(l -> System.out.println("是否获取到锁:" + locked.get()))
            .repeatWhen(Repeat.onlyIf(r -> !locked.get()).randomBackoff(Duration.ofSeconds(2), Duration.ofSeconds(3)))
            .then(Mono.defer(() -> Mono.just(locked.get())))
            .doFinally(f -> System.out.println("释放锁"));
}

输出:

operator-repeatwhen-demo3

当获取锁失败时,能正确执行 tryLock,每次都去重新获取。

retry

retry 会在当前序列发送任何异常信号时,重新订阅。如果不指定重试次数时,默认重试次数为 Long 的最大值。

operator-retry

举例:

Flux.range(1, 5)
        .doOnNext(i -> System.out.println("emitted: " + i))
        .map(i -> {
            if (i > 3) {
                throw new RuntimeException("can not process > 3");
            }
            return i;
        })
        .subscribe(i -> System.out.println("received: " + i),
                error -> System.out.println("error: " + error));

该序列会发出数字 1 到 5,但我们的流不能处理大于 3 的数字,否则会抛异常终止。

输出:

emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4
error: java.lang.RuntimeException: can not process > 3

我们添加 retry(1),当收到异常信号时,重订阅一次当前序列。

Flux.range(1, 5)
        .doOnNext(i -> System.out.println("emitted: " + i))
        .map(i -> {
            if (i > 3) {
                throw new RuntimeException("can not process > 3");
            }
            return i;
        })
        .retry(1)
        .subscribe(i -> System.out.println("received: " + i),
                error -> System.out.println("error: " + error));

输出:

emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4 // will retry
emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4
error: java.lang.RuntimeException: can not process > 3

上面简单介绍了一些常用的操作符,但那也只是冰山一角,如果想要了解更多的操作符,可以通过以下途径:

  1. 想要实战的话,强烈推荐 Reactor 官方的 lite-rx-api-hands-on 项目,拿到项目后,你需要使用操作符,完成 “Todo” 的代码,让所有的 @Test 绿灯就 Ok 了。完成这些测试后,对常见的操作符就能了然于胸了。
  2. 在日常开发中,也可以通过 IDEA 进行查询,比如:

idea-hint

使用 Hugo 构建 主题 StackJimmy 设计
发表了 33 篇文章・ 总计 66.74 k 字
本站总访问量 · 总访客数
本博客已稳定运行 🧡