操作符
通常情况下,我们需要对发布者发出的原始数据进行多个阶段的处理,并最终得到我们需要的数据。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐从原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于发布者,消费者就相当于订阅者,流水线上的一道道工序就相当于一个个操作符。
下面介绍一些常用的操作符。
map
map
操作可以将元素进行转换 / 映射,得到一个新的元素。
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)
上面是 Flux
的 map
操作示意图,上方的箭头是原始序列的时间轴,下方箭头是经过 map
处理后的数据序列时间轴。
map
接受一个 Function
的函数式接口作为参数,这个函数定义了转换操作的策略,可以把它理解为 Java8 流式编程的 map
方法。举例说明:
Flux.range(1, 6) // 1
.map(i -> i * 2) // 2
.subscribe(System.out::println); // 3
输出如下:
2
4
6
8
8
10
12
Flux.range(1, 6)
用于生成从 “1” 开始,自增为 1 的 “6” 个整型数据map
接受 Lambdai -> i * 2
为参数,对每个数据进行乘 2 操作- 订阅数据流并输出每个元素值
flatMap
flatMap
操作可以将每个数据元素转换 / 映射为一个流,然后将这些流合并为一个大的数据流。
流的合并是异步的,先到先得,并非是按照原始序列的顺序(途中绿色和黄色方块是交叉的)。
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
flatMap
也是接收一个 Function
的函数接口为参数,这个函数输入为一个 T 类型的数据值,对于 Flux
来说输出可以是 Flux
和 Mono
,对于 Mono
来说输出只能是 Mono
。
举例说明:
Flux.just("flux", "mono")
.flatMap(word -> Flux.fromArray(word.split("\\s*")) // 1
.delayElements(Duration.ofMillis(100)) // 2
)
.doOnNext(System.out::print) // 3
.subscribe(); // 4
TimeUnit.SECONDS.sleep(1);
- 对于每一个字符串 word, 将其拆分为包含一个字符的字符串流
- 每个元素延迟 100ms
- 对每个元素进行打印 (
doOnNext
是 “偷窥式” 的方法,类似于流式编程中的peek
,不会消费数据流) - 订阅数据流
打印结果为
mfolnuox
,原因在于各个拆分后的字符都是间隔 100ms 发出的,因此会交叉。 如果项目的顺序很重要,可以考虑改用flatMapSequential
运算符。
flatMap
通常用于每个元素又会引入数据流的情况,比如我们有一串用户 id 数据流,需要通过 id 信息查询用户数据,假设响应式的请求方法如下:
Mono<User> getUser(String userId) {...}
而用户 id 数据流为一个 Flux<String> ids
,为了获取所有的用户信息,需要用到 flatMap
ids.flatMap(id -> getUser(id));
其返回内容为 Flux<User>
类型的 User 流。
map 与 flatMap 的区别
map
是同步的,非阻塞,一对一的转换。flatMap
是异步的,非阻塞,一对多的转换。
/**
* Transform the items emitted by this {@link Flux} by applying a synchronous function
* to each item.
*/
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)
/**
* Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
* then flatten these inner publishers into a single {@link Flux} through merging,
* which allow them to interleave.
*/
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
当流被订阅后,映射器对输入流中的元素执行必要的转换(执行上述 mapper
操作)。这些元素中的每一个都可以转换为多个数据,然后用于创建新的流。
那么如何判断什么时候用 map
,什么时候用 flatMap
呢?
- 凡是涉及需要异步处理的方法,都用
flatMap
,比如请求接口,读取数据库数据,读取缓存等等。 - 其他同步方法,比如转换对象,对已有数据做计算等等,都用
map
。
filter
filter
操作可以对数据元素进行筛选
public final Flux<T> filter(Predicate<? super T> p)
public final Mono<T> filter(final Predicate<? super T> tester)
filter
接受一个 Predicate
的函数式接口为参数,这个函数式的作用是进行判断并返回 Boolean。
举例说明:
Flux.range(1, 10) // 1
.filter(i -> i % 3 == 0) // 2
.doOnNext(System.out::println)
.subscribe();
- 生成从 “1” 开始,自增为 1 的 “10” 个整型数据
filter
的 Lambda 参数表示过滤操作保留 3 的倍数
输出:
3
6
9
switchIfEmpty
switchIfEmpty
操作在一个序列元素为空时,替换为另一个元素
举例说明:
@Data
@AllArgsConstructor
@NoArgsConstructor
class User {
private String name;
}
public Mono<User> getUserFromCache() {
return Mono.just(new Random().nextBoolean())
.filter(b -> b)
.map(u -> new User("user from cache"));
}
从缓存中获取用户信息,可能返回为空。
public Mono<User> getUserFromConsole() {
return Mono.just(new Random().nextBoolean())
.filter(b -> b)
.map(u -> new User("user from console"));
}
从控制台中获取用户信息,可能返回为空。
@GetMapping("/user/get")
public Mono<User> getUser() {
return getUserFromCache() // 1
.switchIfEmpty(Mono.defer(() -> getUserFromConsole())) //2
.switchIfEmpty(Mono.defer(() -> Mono.error(new RuntimeException("用户不存在")))); //3
}
- 从缓存中获取用户信息
- 如果缓存中数据为空,则从控制台获取用户信息
- 如果控制台中数据仍为空,则抛出用户不存在异常终止序列
输出:
# 结果 1
{
"name": "user from cache"
}
# 结果 2
{
"name": "user from console"
}
# 结果 3
{
"timestamp": "2022-10-05T04:34:39.274+00:00",
"path": "/reactive/user/get",
"status": 500,
"error": "Internal Server Error",
"requestId": "653d3049-14"
}
defer
defer
把元素加入流中,但是延时加载,直到 subsribe()
时才会加载
举例说明:
Mono<String> dateTime = Mono.just(LocalDateTime.now().toString());
System.out.println("t0: " + dateTime.block());
Thread.sleep(10_000);
System.out.println("t1: " + dateTime.block());
Thread.sleep(5_000);
System.out.println("t2: " + dateTime.block());
输出:
t0: 2022-10-05T13:23:31.482
t1: 2022-10-05T13:23:31.482
t2: 2022-10-05T13:23:31.482
调用 Mono.just(LocalDateTime.now().toString())
时会立即执行 LocalDateTime.now().toString()
并且获取到结果,通过订阅 Mono
时只会将结果发出,所以订阅多次并不会改变原来的值。
Mono<String> dateTime = Mono.defer(() -> Mono.just(LocalDateTime.now().toString()));
System.out.println("t0: " + dateTime.block());
Thread.sleep(10_000);
System.out.println("t1: " + dateTime.block());
Thread.sleep(5_000);
System.out.println("t2: " + dateTime.block());
输出:
t0: 2022-10-05T13:24:40.500
t1: 2022-10-05T13:24:50.515
t2: 2022-10-05T13:24:55.519
defer
操作符会在每次订阅时重新评估 Lambda 表达式中的内容。
⭐Tips
有一个用法是,当需要定义异常 Mono.error()
时,并不需要每次都创建,而是在出现异常时才加载,这样可以节省内存。
Mono.defer(() -> Mono.error(new RuntimeException("异常")));
then
then
操作符会忽略前一个流的结果,并将序列转换为另一个 Mono<V>
或者 Mono<Void>
。
public final <V> Mono<V> then(Mono<V> other)
public final Mono<Void> then()
Mono.just(1)
.then() // 1
.doOnNext(item -> System.out.println(item)) // 2
.subscribe();
- 忽略掉
Mono.just
中的元素,返回Mono<Void>
。 - 输出序列中的值(因为前一个序列已经替换为空,所以此处不会有任何输出)
Mono.just(1)
.then(Mono.just("other item")) // 1
.doOnNext(item -> System.out.println(item))
.subscribe();
- 忽略掉
Mono.just
中的元素,返回新的Mono
序列。 - 输出序列中的值
输出:
other item
使用场景:
public class SaveSessionGatewayFilterFactory extends AbstractGatewayFilterFactory<Object> {
@Override
public GatewayFilter apply(Object config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return exchange.getSession().map(WebSession::save).then(chain.filter(exchange));
}
@Override
public String toString() {
return filterToStringCreator(SaveSessionGatewayFilterFactory.this).toString();
}
};
}
}
参考自保存 Session 的网关过滤器 SaveSessionGatewayFilterFactory
,filter
方法中,map
调用了 save()
,该方法返回值是 Mono<Void>
,该序列会因为没有元素发出而终止。这里使用 then
,不管上游序列的输出是 Mono<Void>
还是其他元素,将其忽略,并向下继续调用其他过滤器的方法。
如果你关心前一个流的结果,可以使用
map
,flatMap
或者其他map
的变种方法,如果你只想要前一个流完成,不关心流的数据,则使用then
。
doOnNext
doOnNext
操作符,当一个可用且存在的数据成功发出时,触发添加的无副作用行为。
public final Mono<T> doOnNext(Consumer<? super T> onNext)
public final Flux<T> doOnNext(Consumer<? super T> onNext)
doOnNext
接收一个 Consumer
函数为参数,仅有入参,没有出参,并不会对原序列的数据类型造成影响。
举例说明:
Mono.just(new User("Alex"))
.doOnNext(u -> System.out.println(u.getName()))
.doOnNext(u -> u.setName("Bob"))
.doOnNext(u -> System.out.println(u.getName()))
.subscribe();
输出:
Alex
Bob
doOnSuccess
doOnSuccess
操作符,在 Mono
序列成功完成时触发,不管结果是 T
还是 null
。
举例说明:
Mono.empty()
.doOnSuccess(i -> System.out.println("on success: " + i))
.subscribe();
Mono.just("hello")
.doOnSuccess(i -> System.out.println("on success: " + i))
.subscribe();
输出:
on success: null
on success: hello
doOnSuccess
仅可作用在 Mono
序列中。
doOnNext 与 doOnSuccess 的区别
doOnNext
是可有用数据是才触发。doOnSuccess
只要上游执行的序列没有报错,都可以判断为成功,不管结果是什么。
doFinally
doFinally
操作符,会添加一个行为,当 Mono
序列因为任何原因终止时触发。
public final Mono<T> doFinally(Consumer<SignalType> onFinally)
public final Flux<T> doFinally(Consumer<SignalType> onFinally)
doFinally
接收一个 Consumer
函数,入参为 SignalType
信号类型,包括取消、完成、错误信号等等。
举例说明:
- 完成信号
Mono.just("hello")
.doOnNext(i -> System.out.println("next: " + i))
.doFinally(signalType -> System.out.println("finally, signalType: " + signalType.name()))
.subscribe();
输出:
next: hello
finally, signalType: ON_COMPLETE
- 异常信号
Mono.just("hello")
.then(Mono.error(new RuntimeException("exception")))
.doOnNext(i -> System.out.println("next: " + i))
.doFinally(signalType -> System.out.println("finally, signalType: " + signalType.name()))
.subscribe();
输出:
23:15:12.203 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: exception
Caused by: java.lang.RuntimeException: exception
at com.opoa.controller.ReactiveController.main(ReactiveController.java:69)
finally, signalType: ON_ERROR
可以看到,在 Mono
序列中抛出了一个异常,终止了 doOnNext
的正常执行,但 doFinally
仍能触发,并且信号类型为错误,所以 doFinally
始终能够执行。
如果有连续的 doFinally
在执行,其顺序是倒序
Flux.just("hello", "world")
.doFinally(f -> System.out.println("finally - 1"))
.doOnNext(w -> System.out.println("on next 1: " + w))
.doFinally(f -> System.out.println("finally - 2"))
.doOnNext(w -> System.out.println("on next 2: " + w))
.doFinally(f -> System.out.println("finally - 3"))
.subscribe();
输出
on next 1: hello
on next 2: hello
on next 1: world
on next 2: world
finally - 3
finally - 2
finally - 1
doOnError
doOnError
操作符,会添加一个行为,当序列因为异常终止时触发。
public final Mono<T> doOnError(Consumer<? super Throwable> onError)
public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType,
final Consumer<? super E> onError)
public final Mono<T> doOnError(Predicate<? super Throwable> predicate,
final Consumer<? super Throwable> onError)
doOnError
有三个重载方法:
- 一个异常类消费者,出现异常时调用。
- 传参为一个异常类的 Class 和一个对应消费者,当出现该类型的异常时才调用。
- 传参为一个断言函数和一个对应消费者,当断言结果为 true 时才调用。
举例说明:
Mono.just(9)
.map(i -> i / 0)
.doOnError(e -> log.error("an error occurred"))
.subscribe();
输出:
22:08:01.282 [main] ERROR com.opoa.controller.ReactiveController - an error occurred
22:08:01.284 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
...(略)
onErrorResume
onErrorResume
,当序列发生错误时,订阅一个备用的发布者 Mono/Flux
。
public final Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends
T>> fallback)
public final <E extends Throwable> Mono<T> onErrorResume(Class<E> type,
Function<? super E, ? extends Mono<? extends T>> fallback)
public final Mono<T> onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Mono<? extends T>> fallback)
onErrorResume
有三个重载方法:
- 单独一个备用函数作为参数
- 传参为一个异常类的 Class 和一个备用函数,当出现该类型的异常时才调用。
- 传参为一个断言函数和一个备用函数,当断言结果为 true 时才调用。
举例说明:
Mono.just(9)
.map(i -> i / 0)
.onErrorResume(e -> {
log.error("an error occurred: {}", e.getMessage());
return Mono.just(5);
})
.subscribe(System.out::println);
输出:
22:08:16.636 [main] ERROR com.opoa.controller.ReactiveController - an error occurred: / by zero
5
repeat
repeat
会在当前序列发送完成信号时,重新订阅。
举例说明:
Mono.just(5)
.doOnNext(s -> System.out.println("next 1"))
.doOnNext(s -> System.out.println("next 2"))
.doOnNext(s -> System.out.println("next 3"))
.repeat(2)
.subscribe();
输出
next 1
next 2
next 3
next 1
next 2
next 3
next 1
next 2
next 3
repeatWhen
repeatWhen
当满足特定条件时,会重复订阅这个序列。
举例说明:
Mono.just(5)
.doOnNext(s -> System.out.println("next 1"))
.doOnNext(s -> System.out.println("next 2"))
.doOnNext(s -> System.out.println("next 3"))
.repeatWhen(Repeat.onlyIf(s -> new Random().nextBoolean()).repeatMax(3))
.subscribe();
这个序列会随机进行重复订阅,当 new Random().nextBoolean()
为 false
或者达到最大重复次数 3 时,停止订阅。
输出:
next 1
next 2
next 3
22:18:09.620 [main] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=1 repeatCompanionValue=1 backoff={0ms}
next 1
next 2
next 3
22:18:09.621 [main] DEBUG reactor.retry.DefaultRepeat - Stopping repeats since predicate returned false, retry context: iteration=2 repeatCompanionValue=1 backoff={0ms}
我们可以通过 repeatWhen 能使用更多高级选项
.repeatWhen(Repeat.times(3).randomBackoff(Duration.ofSeconds(1), Duration.ofSeconds(3)))
重试 3 次,每次重试间隔在 1 秒到 3 秒之间。
输出
next 1
next 2
next 3
22:22:33.025 [main] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=1 repeatCompanionValue=1 backoff={1184ms/3000ms}
next 1
next 2
next 3
22:22:34.227 [parallel-1] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=2 repeatCompanionValue=1 backoff={1939ms/3000ms}
next 1
next 2
next 3
22:22:36.170 [parallel-2] DEBUG reactor.retry.DefaultRepeat - Scheduling repeat attempt, retry context: iteration=3 repeatCompanionValue=1 backoff={1647ms/3000ms}
next 1
next 2
next 3
22:22:37.832 [parallel-3] DEBUG reactor.retry.DefaultRepeat - Repeats exhausted, retry context: iteration=4 repeatCompanionValue=1 backoff={EXHAUSTED}
如果你在返回 Mono
或者 Flux
的方法中使用了 repeat
或 repeatWhen
,该方法会在满足条件时被重复订阅。方法本身只会执行一次,repeat
或 repeatWhen
只是让这个方法被再次订阅。
示例:
public Mono<Boolean> tryLock() {
log.info("尝试获取锁,时间点:" + System.currentTimeMillis());
return Mono.just(new Random().nextBoolean());
}
获取锁的方法,该方法在执行时打印当前时间点,并且随机返回 Boolean。
@GetMapping("/getLock")
public Mono<Boolean> lock() {
AtomicBoolean locked = new AtomicBoolean(false); // 1
return tryLock() // 2
.doOnNext(locked::set) // 3
.doOnNext(l -> System.out.println("是否获取到锁:" + locked.get())) // 4
.repeatWhen(Repeat.onlyIf(r -> !locked.get()).randomBackoff(Duration.ofSeconds(2), Duration.ofSeconds(3))) // 5
.then(Mono.defer(() -> Mono.just(locked.get()))) // 6
.doFinally(f -> System.out.println("释放锁")); // 7
}
模拟一段获取锁的代码,需要获取锁成功时才返回结果。
- 创建原子布尔类型变量。
- 尝试获取锁,并输出时间节点。
- 将获取到的值赋给
locked
对象。 - 输出
locked
当前的变量值。 - 当未获取到锁时,一直进行重试,并且每次重试间隔 2 到 3 秒。
- 返回
locked
的变量。 - 最后,释放锁。
输出:
当第一次获取锁结果为 true
时
当第一次获取锁结果为 false
时
当第一次获取锁失败时,repeatWhen
触发,但 tryLock
只会执行一次,所以这段代码会一直拿不到锁,导致卡死。
为了确保 tryLock
每次重复时都执行,需要把它放到 defer
操作符中。
@GetMapping("/getLock")
public Mono<Boolean> lock() {
AtomicBoolean locked = new AtomicBoolean(false);
return Mono.defer(() -> tryLock())
.doOnNext(locked::set)
.doOnNext(l -> System.out.println("是否获取到锁:" + locked.get()))
.repeatWhen(Repeat.onlyIf(r -> !locked.get()).randomBackoff(Duration.ofSeconds(2), Duration.ofSeconds(3)))
.then(Mono.defer(() -> Mono.just(locked.get())))
.doFinally(f -> System.out.println("释放锁"));
}
输出:
当获取锁失败时,能正确执行 tryLock
,每次都去重新获取。
retry
retry
会在当前序列发送任何异常信号时,重新订阅。如果不指定重试次数时,默认重试次数为 Long
的最大值。
举例:
Flux.range(1, 5)
.doOnNext(i -> System.out.println("emitted: " + i))
.map(i -> {
if (i > 3) {
throw new RuntimeException("can not process > 3");
}
return i;
})
.subscribe(i -> System.out.println("received: " + i),
error -> System.out.println("error: " + error));
该序列会发出数字 1 到 5,但我们的流不能处理大于 3 的数字,否则会抛异常终止。
输出:
emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4
error: java.lang.RuntimeException: can not process > 3
我们添加 retry(1)
,当收到异常信号时,重订阅一次当前序列。
Flux.range(1, 5)
.doOnNext(i -> System.out.println("emitted: " + i))
.map(i -> {
if (i > 3) {
throw new RuntimeException("can not process > 3");
}
return i;
})
.retry(1)
.subscribe(i -> System.out.println("received: " + i),
error -> System.out.println("error: " + error));
输出:
emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4 // will retry
emitted: 1
received: 1
emitted: 2
received: 2
emitted: 3
received: 3
emitted: 4
error: java.lang.RuntimeException: can not process > 3
上面简单介绍了一些常用的操作符,但那也只是冰山一角,如果想要了解更多的操作符,可以通过以下途径:
- 想要实战的话,强烈推荐 Reactor 官方的 lite-rx-api-hands-on
项目,拿到项目后,你需要使用操作符,完成 “Todo” 的代码,让所有的
@Test
绿灯就 Ok 了。完成这些测试后,对常见的操作符就能了然于胸了。 - 在日常开发中,也可以通过 IDEA 进行查询,比如: