v7.6.0
✨ Portable Concurrent Async Iteration ✨
This release introduces the `ConcurrentStream` API: a concurrent async/.await adaptation of Rayon's `ParallelIterator` API. We've been trying to implement this API for the past six or so years, and we're happy to announce we finally have a working implementation!
The main feature that sets this implementation apart from other, similar attempts is that it works for any existing `Stream` impl. All you need to do is call the `.co()` method to obtain a `ConcurrentStream`, and from that point all `Stream` combinators should just work as expected:
```rust
use futures_concurrency::prelude::*;

let v: Vec<_> = stream::repeat("chashu")
    .co() // ← call this to convert any `Stream` into a `ConcurrentStream`
    .take(2)
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;
assert_eq!(v, &["hello chashu", "hello chashu"]);
```
See the call to `collect` at the end there? That's right: that's concurrent async iteration, collecting back into a single structure. This makes fan-out/fan-in pipelines trivial to author. But that's not all: in addition to collecting into collections, we can also directly convert collections into `ConcurrentStream` implementations:
```rust
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream() // ← call this to convert collections into concurrent async iterators
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;
```
By default the amount of concurrency is unbounded, but it can be bounded by calling the `limit` method. This applies backpressure should the stream produce items faster than the concurrent iterator can process them.
This API also resolves the buffered streams problem. `ConcurrentStream` removes the need for the dreaded combination of mapping to futures and then calling the futures-rs `buffered` method. Instead it ensures that the processing of items in a loop always happens in concert with the execution of the concurrent futures.[^1]
Notably, this API will work with any async runtime or async framework, because it makes no assumptions about the underlying runtime. The only assumption it makes is that an allocator is available. This means that at this time, unlike most other APIs in futures-concurrency, this will not work in `#[no_std]` environments. This, however, is not an inherent restriction but merely an artifact of the implementation. In the future we may explore porting this to a `#[no_std]`-compatible version; this will require some minor API changes, but should, as a system, likely work.
In order to make this system work with parallel execution, it should be possible to write a custom adapter. We encourage async runtimes to wrap the `ConcurrentStream` trait exposed in this crate to create their own `ParallelStream` systems. These can depend on the runtime, and ensure that all execution not only happens concurrently, but can also be scheduled across multiple cores.
We're excited for people to give this a try. We certainly hope this lowers the bar for correctly applying structured, asynchronous, concurrent stream processing in Rust!
What's Changed
- Fix clippy warnings by @matheus-consoli in #170
- Concurrent stream by @yoshuawuyts in #164
- Update docs for docs.rs by @yoshuawuyts in #174
- Fixes redundant `todo!` by @yoshuawuyts in #176
- Add `{Future,Stream}::wait_until` by @yoshuawuyts in #167
- Manually track capacity of groups by @matheus-consoli in #177
**Full Changelog**: v7.5.0...v7.6.0
[^1]: There is a more elaborate version of this problem we don't have a user story for yet. Rust's futures model couples "liveness" to "backpressure". In theory we might fail to send keepalive messages if we apply backpressure for too long; this is a direct artifact of queueing theory, and would be a reason to add some form of async `Future::pending`/`Future::poll_pending` method. This is a more subtle, niche issue than "barbara battles buffered streams", but it's worth explicitly calling out the limits of our solutions.