一文彻底搞懂WebFlux中的publishOn 和 subscribeOn怎么使用
1. 前言
在学习project reactor的时候,对publishOn 和 subscribeOn 使用时总是让我觉得有点迷惑,因为经常我期望它并行执行的时候,却又是串行。这篇文章会以多种不同的例子区分两者,彻底搞清楚应该怎么使用。
2. 在开始之前
希望你已经知道了publishOn和subscribeOn 的基本概念。
publishOn用于改变后续操作符执行的线程,它会影响到其后的操作符在哪个线程上执行。
比如说如下代码:
Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.publishOn(Schedulers.newParallel("NEW"))
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribe();
在第一个map中输出的线程名是main,而在第二个map中输出的是NEW

subscribeOn的作用是在这个开始执行的时候,指定使用的线程调度器。
具体来说,subscribeOn 会影响到整个数据流的起始部分,即从订阅操作符开始,直到数据流的起始发布者(Publisher)的部分。它将决定订阅操作在哪个线程上执行,从而影响整个数据流的执行上下文。
以代码为例子:
Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribeOn(Schedulers.newParallel("NEW"))
.subscribe();
虽然我把subscribeOn的操作放在最后,但是在执行的时候,会导致整个流从最开始就在NEW的线程上执行

这是最基本你需要知道的,接下来让我们看点不一样的。
2.1 Mock Server
我们需要一些准备工作,新建一个SpringBoot项目,搭建一个简单的Mock Server 用来模拟我们工作时,请求的第三方服务。
代码只写一个controller,并延迟一秒后进行返回
@SpringBootApplication
@RequestMapping("/")
@Controller
public class TempHttpServerApplication {
public static void main(String[] args) {
SpringApplication.run(TempHttpServerApplication.class, args);
}
@GetMapping("")
@ResponseBody
public String hello() throws InterruptedException {
Thread.sleep(1000);
return "hello SpringBoot";
}
}
3. 从最简单的开始
在下面的代码中,输出从1到10,这是再简单不过的例子了
public static void main(String[] args) {
Flux.range(1, 10)
.subscribe(it -> System.out.println(it));
}
接着我们增加一个executeRequest 方法用来请求我们刚刚的Mock Server,在流执行时我们进行调用,因为我们是在map方法中执行的,所以看输出的日期,显而易见整个流是串行执行的。
public static void main(String[] args) {
Flux.range(1, 10)
.map(it -> executeRequest())
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
2024-02-24T02:59:38.612704Z hello SpringBoot
2024-02-24T02:59:39.626442Z hello SpringBoot
2024-02-24T02:59:40.635633Z hello SpringBoot
2024-02-24T02:59:41.643354Z hello SpringBoot
2024-02-24T02:59:42.646554Z hello SpringBoot
2024-02-24T02:59:43.654952Z hello SpringBoot
2024-02-24T02:59:44.658078Z hello SpringBoot
2024-02-24T02:59:45.662179Z hello SpringBoot
2024-02-24T02:59:46.666218Z hello SpringBoot
2024-02-24T02:59:47.671938Z hello SpringBoot
3.1 .map → .flatMap
在上面的那个例子中,如果我们.map改为.flatMap 时,因为flatMap中默认是并行执行的,但是实际在运行时,如果.flatMap中如果为同步操作,指:Mono.just(executeRequest()),会导致整个流变为同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest()), 8)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.2 增加publishOn
为了让flatMap 中的执行改为并行,这时候有同学就要说了,使用publishOn !
可惜的是,在下面这个例子中,虽然使用了publishOn, 让下游线程执行在新的线程组中执行,但是因为flatMap中存在同步操作,指:Mono.just(executeRequest()),所以整个流仍然为同步执行。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("REQ-T", 2))
.flatMap(it - > Mono.just(executeRequest()), 2)
.subscribe(it - > {
System.out.println(Instant.now() + " " + it);
});
}
private static String executeRequest() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.3 如果使用subscribeOn 呢
订阅内部流的执行在其他线程, 且设置flatMap的并行度为2,但是整个流还是同步执行,因为
Mono.just 为意味着热数据,每次会同步的执行去获取,因此执行的时候仍然为同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("REQ-T", 2))
.log()
, 2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
日志:
11:03:44.820 [main] INFO reactor.Mono.SubscribeOnValue.1 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:44.822 [main] INFO reactor.Mono.SubscribeOnValue.1 -- request(32)
11:03:44.824 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onNext(hello SpringBoot)
2024-02-24T03:03:44.824966Z hello SpringBoot
11:03:44.827 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onComplete()
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- request(32)
11:03:45.829 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onNext(hello SpringBoot)
2024-02-24T03:03:45.830078Z hello SpringBoot
11:03:45.830 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onComplete()
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- request(32)
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onNext(hello SpringBoot)
2024-02-24T03:03:46.838210Z hello SpringBoot
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onComplete()
11:03:47.848 [main] INFO reactor.Mono.SubscribeOnValue.4 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:47.849 [main] INFO reactor.Mono.SubscribeOnValue.4 -- request(32)
11:03:47.849 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onNext(hello SpringBoot)
2024-02-24T03:03:47.850130Z hello SpringBoot
11:03:47.850 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onComplete()
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- request(32)
11:03:48.861 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onNext(hello SpringBoot)
2024-02-24T03:03:48.861965Z hello SpringBoot
11:03:48.862 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onComplete()
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- request(32)
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onNext(hello SpringBoot)
2024-02-24T03:03:49.871636Z hello SpringBoot
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onComplete()
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- request(32)
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onNext(hello SpringBoot)
2024-02-24T03:03:50.880186Z hello SpringBoot
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onComplete()
11:03:51.884 [main] INFO reactor.Mono.SubscribeOnValue.8 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:51.885 [main] INFO reactor.Mono.SubscribeOnValue.8 -- request(32)
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onNext(hello SpringBoot)
2024-02-24T03:03:51.885495Z hello SpringBoot
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onComplete()
11:03:52.895 [main] INFO reactor.Mono.SubscribeOnValue.9 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:52.896 [main] INFO reactor.Mono.SubscribeOnValue.9 -- request(32)
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onNext(hello SpringBoot)
2024-02-24T03:03:52.896721Z hello SpringBoot
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onComplete()
11:03:53.903 [main] INFO reactor.Mono.SubscribeOnValue.10 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:53.904 [main] INFO reactor.Mono.SubscribeOnValue.10 -- request(32)
11:03:53.904 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onNext(hello SpringBoot)
2024-02-24T03:03:53.904910Z hello SpringBoot
11:03:53.905 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onComplete()
3.4 publishOn+subscribeOn
我们都知道publishOn 针对的是后面流所执行的操作,因为executeRequest为同步,且使用的是Mono.just,所以整个流还是同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("publishOn-T", 2))
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
,2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}