我们需要额外的背压控制吗?
背景
在探索全链路WebFlux响应式编程与gRPC调用时,我试图增加给生产者增加背压控制这个东西,寄希望于:消费端能通过一个gRPC接口控制生产端的生产速率。
但是后来我发现完全错了,这样的背压控制给生产端带来的逻辑复杂度不是一星半点。
所以本文主要探讨的问题就是:为什么我们不需要背压控制,以及带来了什么样的复杂度。
全链路WebFlux响应式编程与gRPC调用
当然开始之前,我们需要一个实际场景来说明问题:
试想一个场景:你需要通过gRPC从一个服务读取整个数据表到另一个服务,不用担心分页、内存溢出等问题。更神奇的是,当处理时间过长时,消费端会自动停止生产端的数据流,生产端甚至可以停止其响应式数据库驱动程序(如R2DBC),完全停止从数据库读取数据。
你觉得需要写多少代码才能实现这样健壮的功能?答案可能会让你惊讶:不到200行代码。
是的,这对于webflux来说是完全可以做到的。
开发的步骤甚至很简单:
- 生产端/消费端都引入gRPC框架
- 定义 data_stream.proto 文件
这里不做具体代码的展开了,网上的教程其实挺多的。
然后一对接,你就拥有了上述的功能,甚至不用分页就支持亿级数据传输。gRPC带来的性能提升也非常可观,吞吐量上升、延迟最低降低至不到1ms,这在REST API中是不可能的。
然后我开始想🤔,能不能消费端能通过一个gRPC接口控制生产端的生产速率呢?
结果是不行,甚至是错误的做法。
我曾试图增加一个额外的背压控制服务,代码类似如下:
根据requestedItems 请求的数据项数量,来限制上游对这个数据流的生产速率,但是实际操作后发现:
除非下游通过接口把积压情况传给上游,那么上游永远无法得知下游真实的积压情况。
而且如果 每个上游都使用复杂逻辑去控制生产速率,那么对于维护性来说是不可接受的。这种速率控制逻辑强耦合在数据生产的前后,即便是使用AOP切面也是难以处理的。
package org.example.sourceservice.service;
/**
* 背压控制服务
* 管理数据生产速率和消费者请求
*/
@Service
public class BackpressureControlService {
private static final Logger logger = LoggerFactory.getLogger(BackpressureControlService.class);
// 默认生产间隔(毫秒)
private final AtomicLong productionInterval = new AtomicLong(1000);
// private final AtomicLong productionInterval = new AtomicLong(0);
// 消费者请求映射
private final ConcurrentHashMap<Long, AtomicInteger> consumerRequests = new ConcurrentHashMap<>();
// 背压信号发射器
private final Sinks.Many<Long> backpressureSink = Sinks.many().multicast().onBackpressureBuffer();
/**
* 设置生产间隔
* @param intervalMillis 间隔毫秒数
*/
public void setProductionInterval(long intervalMillis) {
long oldInterval = this.productionInterval.getAndSet(intervalMillis);
logger.info("Production interval changed from {}ms to {}ms", oldInterval, intervalMillis);
// 发送背压信号
backpressureSink.tryEmitNext(intervalMillis);
}
/**
* 获取当前生产间隔
* @return 间隔毫秒数
*/
public long getProductionInterval() {
return productionInterval.get();
}
/**
* 注册消费者请求
* @param consumerId 消费者ID
* @param requestedItems 请求的数据项数量
*/
public void registerConsumerRequest(long consumerId, int requestedItems) {
consumerRequests.computeIfAbsent(consumerId, k -> new AtomicInteger(0))
.addAndGet(requestedItems);
logger.info("Consumer {} requested {} items, total pending: {}",
consumerId, requestedItems, consumerRequests.get(consumerId).get());
// 根据请求量动态调整生产速率
adjustProductionRate();
}
/**
* 消费数据项
* @param consumerId 消费者ID
* @return 是否成功消费
*/
public boolean consumeItem(long consumerId) {
//每消费一个就减少一个
AtomicInteger pending = consumerRequests.get(consumerId);
if (pending != null && pending.get() > 0) {
pending.decrementAndGet();
return true;
}
return false;
}
/**
* 获取消费者待处理请求数
* @param consumerId 消费者ID
* @return 待处理请求数
*/
public int getPendingRequests(long consumerId) {
AtomicInteger pending = consumerRequests.get(consumerId);
return pending != null ? pending.get() : 0;
}
/**
* 获取背压信号流
* @return 背压信号流
*/
public Sinks.Many<Long> getBackpressureSink() {
return backpressureSink;
}
/**
* 根据消费者请求动态调整生产速率
*/
private void adjustProductionRate() {
int totalPendingRequests = consumerRequests.values().stream()
.mapToInt(AtomicInteger::get)
.sum();
long newInterval;
if (totalPendingRequests > 100) {
// 请求积压过多,说明消费者的处理能力已经跟不上生产者的生产速度。减慢生产速率
newInterval = Math.min(5000, productionInterval.get() * 2);
} else if (totalPendingRequests < 10) {
// 请求较少,加快生产速率
newInterval = Math.max(0, productionInterval.get() / 2);
} else {
// 保持当前速率
return;
}
if (newInterval != productionInterval.get()) {
setProductionInterval(newInterval);
}
}
/**
* 移除消费者
* @param consumerId 消费者ID
*/
public void removeConsumer(long consumerId) {
consumerRequests.remove(consumerId);
logger.info("Consumer {} removed", consumerId);
}
}
后来我想到这个问题可以类比成:假设你是某个村的村长,每个月每个村民都找你领取固定的饲料喂猪。村长应该做的是饲料的稳定产出和对每个村民的领取存在上限/下限,而不应该由村民说:村长,这个月你给我的饲料我都用不完,下个月你少给我点
为什么?
村民的行为是错的。最了解村民需求的只有他自己,换言之:知道下游性能负载情况的只有下游本身。如果下游处理不过来,那么说明下游在request item的时候,一次性请求了太多的数据。
那么这种错误的请求反应到上游,自然是狂推数据给下游,但是这个速率一定有上限的。不能挤压其他下游数据流的速率。
这个场景中,即便出现问题。上游也不会挂,而下游如果没有配置背压策略,比如:.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
那么下游出错或者崩溃是正常的,因为本身下游的服务已经在胡乱请求数据了,但是上游是感知不到的,即便是webflux,上游无法得知下游的内部积压。所以上游应该做的是保护自己不被下游的胡乱请求冲垮自身的服务。
上游 真正的职责
上游的职责很简单:
- 稳定产出:保证自己的服务能稳定、健康地输出数据流。
- 自我保护:一旦下游乱要(比如一下子 request 1 万条),上游可以选择丢弃、限速或只保留最新数据,但绝不会为了迎合下游的需求而牺牲自身稳定性。
- 协议契约:Reactive Streams 已经规定了 request(n) 机制,这本身就是背压的天然存在,不需要额外搞一个 gRPC 接口来人为控制。
所以说,背压控制不是由上游显式实现的,而是下游在消费的时候自然体现出来的。
下游该做的事
下游才是对自身能力最清楚的一方,它要做的是:
- 合理 request
- 不要一口气请求过量数据,BaseSubscriber 可以逐个或按批次拉取:
Flux.range(1, 1000)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // 每次要10个
}
@Override
protected void hookOnNext(Integer value) {
process(value);
request(1); // 消费一个再拉一个
}
});
- 选择合适的策略
- 如果怕爆内存,用 .onBackpressureDrop()
- 如果业务只关心最新数据,用 .onBackpressureLatest()
- 如果能容忍缓存,用 .onBackpressureBuffer(1000)
- 限流与保护
- 下游可以在入口就加限流器(如 RateLimiter 或 Resilience4j),防止高并发情况下自爆。
为什么我们不需要“额外的背压控制”
总结一下:
- 背压天然存在:Reactive Streams 协议里就有 request(n),不需要再造轮子。
- 复杂度问题:如果消费端还要通过 gRPC 额外调用来告诉生产端“放慢点”,这反而增加了网络交互和状态同步的复杂度,容易出 bug。
- 职责分离:
- 上游 → 稳定产出、保护自己。
- 下游 → 合理 request、保护自己。
- 健壮性:这种模型下,即便下游挂掉,上游也能继续健康运行;反之,如果上游为了迎合下游而改逻辑,就可能被下游拖垮。
最后的思考
所以,当我们在讨论 “需不需要背压控制” 时,其实答案是:
👉 我们已经有了背压,不需要人为再造一个控制器。 真正需要的是下游如何自我约束,合理利用 Reactive Streams 提供的天然背压机制。
换句话说:
- 背压是 协议层面的自然产物,而不是应用层面的管理逻辑。
- 我们不需要再人为增加“生产速率接口”,否则只会带来复杂度,却不带来实质好处。