Rust Async Programming: Stream Trait


Mar 22, 2025 - 20:52

The Stream trait is similar to the Future trait. While Future represents the state change of a single item, Stream, akin to the Iterator trait in the standard library, can yield multiple values before it finishes. Or simply put, a Stream is made up of a series of Futures, from which we can read each Future’s result until the Stream completes.

Definition of Stream

The Future is the most fundamental concept in asynchronous programming. If a Future represents a one-time asynchronous value, then a Stream represents a series of asynchronous values. Future is 1, while Stream is 0, 1, or N. The signature of Stream is as follows:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream is the asynchronous counterpart of the synchronous Iterator. Note how similar their signatures are:

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

Stream is used to abstract continuous data sources, although it can also end (when poll_next returns Poll::Ready(None)).
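To make the possible results of poll_next concrete, here is a minimal sketch that polls a stream by hand. It assumes only the standard library: the Stream trait is redeclared locally with the same signature as above so that no crate is needed, and Demo, NoopWake, and poll_by_hand are hypothetical names invented for this illustration.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait (same signature as shown above), so the sketch
// compiles without external crates. With the futures crate you would
// import `futures::Stream` instead.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A waker that does nothing; sufficient when we poll manually.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stream: yields 1, is not ready once, yields 2, then ends.
struct Demo {
    step: u8,
}

impl Stream for Demo {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.step = self.step.saturating_add(1);
        match self.step {
            1 => Poll::Ready(Some(1)),
            2 => {
                // A real stream would keep the waker and call it once data
                // arrives; here we signal immediately that polling may resume.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            3 => Poll::Ready(Some(2)),
            _ => Poll::Ready(None),
        }
    }
}

/// Polls the stream four times and records what each poll returned.
pub fn poll_by_hand() -> Vec<Poll<Option<u32>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut demo = Demo { step: 0 };
    let mut results = Vec::new();
    for _ in 0..4 {
        results.push(Pin::new(&mut demo).poll_next(&mut cx));
    }
    results
}
```

Calling poll_by_hand() returns the four poll results in order: a ready value, a pending state, another ready value, and finally the end-of-stream marker.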

A common example of a Stream is the Receiver half of the message channel in the futures crate. Every time a message is sent from the Sender side, the Receiver gets a Some(val) value. Once every Sender has been closed (dropped) and there are no more messages in the channel, it receives a None.

use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    println!("tx: Send 1, 2");
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but instead of returning the value directly,
    // it returns a `Future<Output = Option<T>>`, so you need `.await` to get the actual value
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

fn main() {
    block_on(send_recv());
}

Differences Between Iterator and Stream

  • Iterator allows repeatedly calling the next() method to get new values until it returns None. Iterator is blocking: each call to next() occupies the CPU until a result is obtained. In contrast, the asynchronous Stream is non-blocking and yields the CPU while waiting.
  • Stream's poll_next() method is quite similar to Future's poll() method, and its function is akin to the next() method of Iterator. However, calling poll_next() directly is inconvenient because you need to handle the Poll state manually, which isn’t very ergonomic. That’s why the futures crate provides StreamExt, an extension trait for Stream, which offers a next() method that returns a Future implemented by the Next struct. This way, you can retrieve the next value directly with stream.next().await.

Note: StreamExt stands for Stream Extension. In Rust, it's a common practice to keep the minimal trait definition (like Stream) in one file, and put additional APIs (like StreamExt) in a separate, related file.

Note: Unlike Future, the Stream trait is not yet part of Rust’s standard library (std or core). It is defined in the futures-core crate and re-exported by futures and futures-util, and StreamExt is also not part of the standard library. This means different libraries might provide conflicting imports. For example, the Tokio ecosystem provides its own StreamExt (in the tokio-stream crate), separate from futures-util. If possible, prefer using futures-util, as it's the most commonly used crate for async/await.

Implementation of StreamExt's next() method and the Next struct:

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// `next` returns the `Next` struct
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// If Stream is Unpin, then Next is also Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}

// Next implements Future, each poll() is essentially polling from the stream via poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}
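The same adapter idea can be sketched more compactly with std::future::poll_fn. This is not the futures-util source, only an illustration under stated assumptions: the Stream trait is redeclared locally so the block needs no crates, and next, block_on, FromVec, and collect_with_next are hypothetical helpers written for this example.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait so the sketch is dependency-free.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Stand-in for `StreamExt::next`: wraps a `poll_next` call in a future.
pub async fn next<S: Stream + Unpin>(stream: &mut S) -> Option<S::Item> {
    poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for the demo. It busy-polls, which is acceptable only
/// because the stream below never waits on real I/O.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Hypothetical stream backed by a vector; every item is ready immediately.
struct FromVec {
    items: std::vec::IntoIter<u32>,
}

impl Stream for FromVec {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(self.items.next())
    }
}

/// Drains the stream with `next(..).await`, as you would with `StreamExt`.
pub fn collect_with_next(values: Vec<u32>) -> Vec<u32> {
    block_on(async move {
        let mut stream = FromVec { items: values.into_iter() };
        let mut out = Vec::new();
        while let Some(v) = next(&mut stream).await {
            out.push(v);
        }
        out
    })
}
```

The Unpin bound plays the same role as in the real next(): it allows Pin::new to be called on a plain mutable reference.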

Creating Streams

The futures library provides several convenient methods to create basic Streams, such as:

  • empty(): creates an empty Stream
  • once(): creates a Stream containing a single value
  • pending(): creates a Stream that never yields a value and always returns Poll::Pending
  • repeat(): creates a Stream that repeatedly yields the same value
  • repeat_with(): creates a Stream that lazily yields values via a closure
  • poll_fn(): creates a Stream from a closure that returns Poll
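  • unfold(): creates a Stream from an initial state and a closure that returns a Future
  • iter(): creates a Stream from an Iterator

These constructors combine with the adapter methods from StreamExt, for example:

use futures::prelude::*;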

#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);

    // Iterate over the stream
    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

In the code above, stream::iter generates a Stream, which is then passed through filter and map operations. Finally, the stream is iterated, and the resulting data is printed.

When you’re not concerned with async/await and only care about the stream behavior, stream::iter is quite handy for testing. Another interesting function is repeat_with, which lets you pass a closure to lazily generate values on demand, for example:

use futures::stream::{self, StreamExt};

// From the zeroth to the third power of two:
async fn stream_repeat_with() {
    let mut curr = 1;
    let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });

    assert_eq!(Some(1), pow2.next().await);
    assert_eq!(Some(2), pow2.next().await);
    assert_eq!(Some(4), pow2.next().await);
    assert_eq!(Some(8), pow2.next().await);
}
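To see why the values are produced lazily, it can help to look at how a closure-backed stream might be written. The following is a simplified stand-in, not the crate's actual implementation: the Stream trait is redeclared locally, and RepeatWith, NoopWake, and first_powers_of_two are names assumed for this sketch.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait so the sketch is dependency-free.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Simplified closure-backed stream: the closure runs once per poll,
/// so no value exists until somebody asks for it.
pub struct RepeatWith<F> {
    f: F,
}

impl<T, F: FnMut() -> T + Unpin> Stream for RepeatWith<F> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Never pending and never finished: each poll produces a fresh value.
        Poll::Ready(Some((self.f)()))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Takes the first `n` powers of two by polling the stream `n` times.
/// Keep `n` small; the `u32` counter overflows for `n` above 32.
pub fn first_powers_of_two(n: usize) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut curr = 1u32;
    let mut pow2 = RepeatWith {
        f: || {
            let tmp = curr;
            curr *= 2;
            tmp
        },
    };

    let mut out = Vec::new();
    for _ in 0..n {
        if let Poll::Ready(Some(v)) = Pin::new(&mut pow2).poll_next(&mut cx) {
            out.push(v);
        }
    }
    out
}
```

Because such a stream never returns None, the consumer decides when to stop, here by polling a fixed number of times.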

Implementing a Stream

Creating your own Stream involves two steps:

  1. First, define a struct to hold the stream’s state
  2. Then, implement the Stream trait for that struct

Let’s create a stream called Counter that counts from 1 to 5:

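use std::pin::Pin;
use std::task::{Context, Poll};

// `Stream` comes from the futures crate, so no nightly feature gate is needed
use futures::Stream;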

// First, the struct:
/// A stream that counts from one to five
struct Counter {
    count: usize,
}

// We want the counter to start from one, so let’s add a `new()` method as a helper.
// This isn’t strictly necessary, but it’s convenient.
// Note that we start `count` from zero — the reason will be clear in the implementation of `poll_next()`.
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

// Then, we implement `Stream` for `Counter`:
impl Stream for Counter {
    // We’ll use `usize` for counting
    type Item = usize;

    // `poll_next()` is the only required method
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Increment the counter. That’s why we started from zero.
        self.count += 1;

        // Check if we've finished counting.
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}
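A quick way to check such an implementation is to drive it directly. The sketch below repeats Counter against a locally declared copy of the Stream trait, so that it compiles with only the standard library; NoopWake and drain_counter are hypothetical helpers. Because Counter never returns Poll::Pending, a plain loop is enough and no executor is involved.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait so the sketch is dependency-free.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A stream that counts from one to five.
struct Counter {
    count: usize,
}

impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

impl Stream for Counter {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        self.count += 1;
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the counter until it stops yielding values and collects them.
pub fn drain_counter() -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut counter = Counter::new();
    let mut seen = Vec::new();
    // The loop ends at the first result that is not `Ready(Some(_))`.
    while let Poll::Ready(Some(n)) = Pin::new(&mut counter).poll_next(&mut cx) {
        seen.push(n);
    }
    seen
}
```

In application code you would normally write while let Some(n) = counter.next().await with StreamExt in scope instead of polling by hand.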

Stream Traits

There are several traits related to streams in Rust, such as Stream, TryStream, and FusedStream.

  • Stream is very similar to Iterator. However, when it returns None, it signifies that the stream is exhausted and should no longer be polled. Because poll_next is a safe method, polling again cannot cause undefined behavior in the memory-safety sense, but the trait leaves the outcome unspecified: the call may panic, block forever, or produce other unpredictable results.

  • TryStream is a specialized trait for streams that yield Result items. TryStream provides functions that make it easy to match and transform the inner Results. You can think of it as an API designed for streams that produce Result items, making it more convenient to work with error-handling cases.

  • FusedStream is similar to a regular stream but adds an is_terminated() method, which lets users know whether the stream is truly exhausted after returning None, or if it can be safely polled again. For example, if you’re creating a stream backed by a circular buffer, the stream might return None at the end of the first pass, but with FusedStream, callers can tell that it is safe to poll again later to start a new round of iteration over the buffer.
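The fusing idea can be illustrated with a small hand-written adapter. This is a simplified stand-in for what the futures crate offers through StreamExt::fuse(), not its source: the Stream and FusedStream traits are redeclared locally, and Cyclic, Fuse, and poll_n are hypothetical names for this sketch. Cyclic imitates the circular-buffer case by yielding values again after it has returned None.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copies of the traits so the sketch is dependency-free.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

pub trait FusedStream: Stream {
    fn is_terminated(&self) -> bool;
}

/// Hypothetical stream that resumes after `None`: odd polls yield the poll
/// number, even polls report the end of a round.
pub struct Cyclic {
    polls: u32,
}

impl Cyclic {
    pub fn new() -> Self {
        Cyclic { polls: 0 }
    }
}

impl Stream for Cyclic {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.polls += 1;
        if self.polls % 2 == 1 {
            Poll::Ready(Some(self.polls))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Wrapper that remembers the first `None` and keeps returning `None` after it.
pub struct Fuse<S> {
    inner: S,
    done: bool,
}

impl<S> Fuse<S> {
    pub fn new(inner: S) -> Self {
        Fuse { inner, done: false }
    }
}

impl<S: Stream + Unpin> Stream for Fuse<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        let this = &mut *self;
        let polled = Pin::new(&mut this.inner).poll_next(cx);
        if let Poll::Ready(None) = polled {
            this.done = true;
        }
        polled
    }
}

impl<S: Stream + Unpin> FusedStream for Fuse<S> {
    fn is_terminated(&self) -> bool {
        self.done
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls any stream `n` times and records each result.
pub fn poll_n<S: Stream + Unpin>(stream: &mut S, n: usize) -> Vec<Poll<Option<S::Item>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (0..n).map(|_| Pin::new(&mut *stream).poll_next(&mut cx)).collect()
}
```

Polling a bare Cyclic four times shows it resuming after None, while the fused version stays terminated and reports that through is_terminated().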

Iteration and Concurrency

Just like the Iterator trait, Stream also supports iteration. For example, you can use methods like map, filter, fold, for_each, skip, as well as their error-aware counterparts from TryStreamExt: map_ok, try_filter, try_fold, try_for_each, and so on.
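What "error-aware" means in practice is that processing stops at the first Err item. The sketch below shows that behavior for a sum, in the spirit of try_fold. It is an illustration under assumptions rather than the TryStreamExt implementation: the Stream trait is redeclared locally, and next, block_on, Results, try_sum, and try_sum_of are hypothetical helpers.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait so the sketch is dependency-free.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Stand-in for `StreamExt::next`.
pub async fn next<S: Stream + Unpin>(stream: &mut S) -> Option<S::Item> {
    poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}

/// Sums the `Ok` values and returns early with the first `Err`.
pub async fn try_sum<S, E>(stream: &mut S) -> Result<u32, E>
where
    S: Stream<Item = Result<u32, E>> + Unpin,
{
    let mut total = 0;
    while let Some(item) = next(&mut *stream).await {
        // `?` leaves the function on the first error; later items are never read.
        total += item?;
    }
    Ok(total)
}

/// Hypothetical stream of results backed by a vector.
struct Results(std::vec::IntoIter<Result<u32, String>>);

impl Stream for Results {
    type Item = Result<u32, String>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.0.next())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, adequate only for streams that never wait.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Convenience wrapper that runs `try_sum` over a vector of results.
pub fn try_sum_of(items: Vec<Result<u32, String>>) -> Result<u32, String> {
    block_on(async move {
        let mut stream = Results(items.into_iter());
        try_sum(&mut stream).await
    })
}
```

With only Ok items the function returns their sum; as soon as an Err appears, that error is returned and the remaining items are left unread.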

Unlike Iterator, however, for loops can’t be used directly to iterate over a Stream. Instead, imperative-style loops like while let or loop can be used, repeatedly calling next or try_next explicitly. For example, you can read from a stream in either of the following ways:

// Iteration pattern 1
while let Some(value) = s.next().await {}

// Iteration pattern 2
loop {
    match s.next().await {
        Some(value) => {}
        None => break,
    }
}

An example of computing the sum of values in a stream:

use futures_util::{pin_mut, Stream, StreamExt};

async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // Don’t forget to pin the stream before iteration
    pin_mut!(stream);
    let mut sum: usize = 0;
    // Iterate over the stream
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

If you process one value at a time, you might miss out on the benefits of concurrency, which defeats the purpose of asynchronous programming. To process multiple values concurrently from a Stream, you can use for_each_concurrent and try_for_each_concurrent:

use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};

async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
    // Use `try_for_each_concurrent`
    stream.try_for_each_concurrent(100, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

async fn jump_n_times(num: i32) -> Result<(), io::Error> {
    println!("jump_n_times :{}", num + 1);
    Ok(())
}

async fn report_n_jumps(num: i32) -> Result<(), io::Error> {
    println!("report_n_jumps : {}", num);
    Ok(())
}

Summary

Stream is similar to Future, but while Future represents the state change of a single item, Stream behaves more like an Iterator that can yield multiple values before completion. Or put simply: a Stream consists of a series of Futures, and we can retrieve the result of each Future from the Stream until it finishes—making it an asynchronous iterator.

The poll_next function of a Stream can return one of three possible values:

  • Poll::Pending: indicates that the next value is not ready yet and we still need to wait.
  • Poll::Ready(Some(val)): indicates that a value is ready and has been successfully returned; you can call poll_next again to retrieve the next one.
  • Poll::Ready(None): indicates that the stream has ended and poll_next should no longer be called.
