java之Flow API 中的 subscription.request(n) 如何在任意 n 值执行背压

lori 阅读:6 2024-04-12 10:01:50 评论:0

我在玩 Flow API,到目前为止,我知道 request()方法用于背压。大多数文章指出,这类似于控制消费速度。

但是,我看到的几乎每个示例代码都传递值 1request()方法,例如,subscription.request(1) .但我不太明白 request() 是怎么做的方法是控制消耗速度。

我试图通过向发布者发送一堆项目并打印线程名称来运行测试,似乎每个 onNext()运行在同一个工作线程上,因为我正在使用 request(1)request(50) :

@Override 
public void onNext(T item) { 
   System.out.println(Thread.getCurrent().getName()); 
   Thread.sleep(5000); 
   subscription.request(50); 
} 

如果 onNext()在不同的线程中运行,我可以理解 n传入 request(n) 的值将影响并行处理项目的速率(在 n 个线程中运行)。但在我的测试中似乎并非如此,因为它们都在相同的线程名称下运行。

在这种情况下, request(1) 和有什么区别?和 request(50)当它们仍然要在同一个线程上一个接一个地依次运行时?那消费率不是还是一样吗?

请您参考如下方法:

nrequest指示订阅者可以接受多少个元素并限制上游 Publisher 的项目数量可以发出。因此,这个生成器的减慢不是每个单独的项目,而是由消费者的处理时间交错生成的每个批次的平均时间。
onNext以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,调用 request在那里通常表示它可以调用相同的上游 onNext , 当前通话结束后 , 下一个值(如果可用)。即,调用 Thread.sleep将推迟下一次调用 onNext .

一般来说,没有理由调用requestonNext终端订阅者,因为它与其直接上游同步运行 Publisher和单个 request(Long.MAX_VALUE) 之间没有实际区别并重复 request(1) .

调用 request 的少数理由之一如果onNext fork 异步工作本身,只有在该工作结束时才应该请求更多项目:

Executor executor = ... 
 
Subscription upstream; 
 
@Override public void onSubscribe(Subscription s) { 
    this.upstream = s; 
    executor.execute(() -> { 
       Thread.sleep(5000); 
       s.request(1); 
       return null; // Callable 
    }); 
} 
 
@Override public void onNext(T item) { 
    System.out.println("Start onNext"); 
    executor.execute(() -> { 
       System.out.println("Run work"); 
       Thread.sleep(5000); 
       System.out.println("Request more work"); 
       upstream.request(1); 
       return null; // Callable 
    }); 
    System.out.println("End onNext"); 
} 

使用此设置,上游将调用 onNext一次,只有当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher从专用线程发出,上面的例子最终会拖拽 onNext调用到 executor的线程。


标签:java
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号