পাঠ ১৭.৪

Stream: Future-এর sequence

Streams: Futures in Sequence

আগের পাঠে async channel-এর recv ব্যবহার করেছি — সময়ের সাথে সাথে একের পর এক item। এই pattern-টাই হলো Stream। Queue-তে item আসছে, file থেকে chunk-by-chunk read হচ্ছে, network থেকে data trickle করে আসছে — সব stream। Stream হলো async iterator।

Iterator vs Stream

Chapter 13-এ Iterator দেখেছি — synchronous next। Async channel-এর receiver-এ recv ছিল asynchronous। দু'টো API-এর pattern প্রায় একই — দু'টো জায়গাতেই sequence-এর next item চাইছি, পার্থক্য শুধু:

  • Time — Iterator synchronous, stream asynchronous।
  • API — Iterator-এর next blocking নয়, stream-এর next future return করে।

Rust-এ যেকোনো iterator থেকে stream বানানো যায়।

Iterator থেকে stream

src/main.rsrust
fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

ধাপগুলো:

  • একটা array থেকে iterator, .map দিয়ে value double।
  • trpl::stream_from_iter — iterator কে stream-এ convert।
  • while let Some(value) = stream.next().await — async iteration।

কিন্তু এই code এখনও compile করবে না।

Compile error — trait scope-এ নেই

compile errortext
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Compiler বলছে — next method-টা যেই trait-এ আছে সেটা scope-এ নেই। তুমি ভাবতে পারো এটা Stream trait হবে — কিন্তু আসলে এটা StreamExt

Stream vs StreamExt

  • Stream — low-level interface; IteratorFuture-কে effectively combine করে।
  • StreamExt — Stream-এর উপর high-level API set, যেমন next। "Ext" মানে extension — Rust community-র common pattern, base trait-কে utility method দিয়ে extend করে।

আপাতত Stream এবং StreamExt দু'টোর কোনোটাই standard library-তে নেই, কিন্তু ecosystem crate (যেমন futures) মোটামুটি একই definition use করে।

Fix — StreamExt import

src/main.rsrust
use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

এক লাইন use trpl::StreamExt; add করলেই compile করে। এখন StreamExt-এর সব utility method (filter, map, take, ইত্যাদি — যা iterator-এ আছে তার async equivalent) ব্যবহার করা যাবে।

কেন stream দরকার

Stream future, তাই অন্য future-এর সাথে combine করা যায় — যেমন:

  • অনেক network call avoid করার জন্য event-গুলো batch করা।
  • Long-running operation-এর sequence-এ timeout বসানো।
  • UI event-গুলো throttle করে অপ্রয়োজনীয় কাজ বাঁচানো।

এই পাঠ থেকে যা শিখলে

  • Stream — সময়ের সাথে item return করা async sequence; iterator-এর async version।
  • trpl::stream_from_iter — যেকোনো iterator কে stream-এ convert।
  • while let Some(v) = stream.next().await — async iteration pattern।
  • next method Stream-এ না, StreamExt extension trait-এ — তাই import দরকার।
  • Stream-গুলো future-ও — অন্য future-এর সাথে select/join/timeout-এ ব্যবহার করা যায়।