1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project::{pin_project, project};
use core::fmt;
use core::pin::Pin;
/// Stream adapter that takes the futures yielded by an inner stream and keeps
/// up to `max` of them in flight at once inside a [`FuturesUnordered`] set.
///
/// The items produced by this adapter are the `Output` values of those
/// futures, handed out as the set reports them ready rather than in the order
/// the futures were pulled from the inner stream.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<St>
where
St: Stream,
{
// Source of futures. It is wrapped in `Fuse`; `poll_next` consults the
// wrapper's `is_done()` to decide whether the source is exhausted.
#[pin]
stream: Fuse<St>,
// Futures that have been pulled from `stream` and not yet completed.
in_progress_queue: FuturesUnordered<St::Item>,
// Upper bound on `in_progress_queue.len()`; the source is only polled for
// more futures while the queue is below this size.
max: usize,
}
/// Debug formatting for [`BufferUnordered`].
///
/// Only the inner stream type is required to be `Debug`; the futures held in
/// the queue are not.
impl<St> fmt::Debug for BufferUnordered<St>
where
    St: Stream + fmt::Debug,
{
    /// Writes the adapter as a `BufferUnordered { stream, in_progress_queue, max }`
    /// debug struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("BufferUnordered");
        builder.field("stream", &self.stream);
        builder.field("in_progress_queue", &self.in_progress_queue);
        builder.field("max", &self.max);
        builder.finish()
    }
}
impl<St> BufferUnordered<St>
where
St: Stream,
St::Item: Future,
{
pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St>
where
St: Stream,
St::Item: Future,
{
BufferUnordered {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
max: n,
}
}
delegate_access_inner!(stream, St, (.));
}
impl<St> Stream for BufferUnordered<St>
where
    St: Stream,
    St::Item: Future,
{
    /// Each item is the output of one of the futures yielded by the source.
    type Item = <St::Item as Future>::Output;

    /// Tops up the queue from the source stream, then polls the queue for the
    /// next completed future.
    ///
    /// Returns `Ready(None)` only once the queue is empty *and* the fused
    /// source reports that it is done.
    ///
    /// NOTE(review): when `max` is 0 the fill loop never polls the source, so
    /// an unfinished source leads to `Pending` being returned without the
    /// source having seen `cx` — confirm a zero limit is never used.
    #[project]
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        #[project]
        let BufferUnordered { mut stream, in_progress_queue, max } = self.project();

        // Pull futures out of the source until the queue is full or the
        // source has nothing to give right now (pending or finished).
        while in_progress_queue.len() < *max {
            if let Poll::Ready(Some(fut)) = stream.as_mut().poll_next(cx) {
                in_progress_queue.push(fut);
            } else {
                break;
            }
        }

        match in_progress_queue.poll_next_unpin(cx) {
            // The queue is empty: whether we are finished depends on whether
            // the source can still produce more futures.
            Poll::Ready(None) => {
                if stream.is_done() {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                }
            }
            // Either a finished output or `Pending` from the queue: pass it on.
            outcome => outcome,
        }
    }

    /// Combines the source's size hint with the number of futures already
    /// queued, since each queued future will yield one more item.
    ///
    /// The lower bound saturates; the upper bound becomes `None` if the
    /// addition overflows or the source has no upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let queued = self.in_progress_queue.len();
        let (src_lower, src_upper) = self.stream.size_hint();
        (
            src_lower.saturating_add(queued),
            src_upper.and_then(|bound| bound.checked_add(queued)),
        )
    }
}
impl<St> FusedStream for BufferUnordered<St>
where
    St: Stream,
    St::Item: Future,
{
    /// The adapter is terminated once both the queue of in-flight futures and
    /// the fused source stream report that they are terminated.
    fn is_terminated(&self) -> bool {
        // The queue is checked first; the source is only consulted when the
        // queue itself is already terminated.
        if !self.in_progress_queue.is_terminated() {
            return false;
        }
        self.stream.is_terminated()
    }
}
// `Sink` support, compiled only when the `sink` feature is enabled.
// Available when the wrapped stream type is itself a `Sink<Item>`; the error
// type is taken directly from that inner sink.
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for BufferUnordered<S>
where
S: Stream + Sink<Item>,
S::Item: Future,
{
type Error = S::Error;
// Sink methods are generated by this crate-local macro (defined outside this
// view); presumably each one forwards to the `stream` field — verify
// against the macro definition.
delegate_sink!(stream, Item);
}