পাঠ ১৭.৩

একাধিক Future নিয়ে কাজ

Working With Any Number of Futures

Async-এ যেহেতু সব future cooperatively চলে — অর্থাৎ runtime control নিতে পারে শুধু .await point-এ — তখন একটা future যদি দীর্ঘ synchronous কাজ করে await ছাড়াই, অন্যরা না খেয়েই বসে থাকবে। এই পাঠে আমরা দেখব কখন yield_now দরকার, এবং কীভাবে select ব্যবহার করে নিজেরাই async abstraction (যেমন timeout) বানানো যায়।

Runtime-কে control return করা

Rust async block-গুলোকে pause করে runtime-কে control ফিরিয়ে দেয় শুধু await point-এ। দু'টো await-এর মাঝে যা আছে — সেটা synchronous, runtime সেখানে interrupt করতে পারে না।

চলো একটা synchronous slow operation simulate করি thread::sleep দিয়ে (এটা thread-কে block করে, runtime-কে নয়):

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Starvation দেখা

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

লক্ষ্য করো — a-এর সব slow call শেষ না হওয়া পর্যন্ত b শুরুই হয়নি। কারণ a-এর await point আসেইনি — তিনটা synchronous slow-এর মাঝে runtime-এর কিছু করার নেই।

await দিয়ে মাঝে break

প্রতিটা slow-এর পরে একটু trpl::sleep await করি, যেন runtime অন্য future-কে চলার সুযোগ দিতে পারে:

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

এখন a আর b alternate করছে। কিন্তু 1ms-এর জন্যেও sleep করানো আসলে একটু overhead — timer setup ইত্যাদি।

yield_now — clean ভাবে control হস্তান্তর

শুধু runtime-কে control return করার জন্য trpl::yield_now better — কোনো timer overhead নেই:

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

এটা cooperative multitasking-এর মূল ধারণা — প্রতিটা future নিজেই ঠিক করে কখন control ছেড়ে দেবে। তবে সবসময় প্রতিটা line-এর পরে yield করার দরকার নেই — strategic জায়গায় break point রাখাই পারফরম্যান্সের জন্য ভালো।

নিজের timeout abstraction

এমন একটা timeout চাই যেটা ৫ second-এর slow operation-কে ২ second-এর বেশি wait করতে দেবে না:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

Signature

API design:

  • Async function — যাতে await করা যায়।
  • Generic — যেকোনো future গ্রহণ করতে হবে।
  • Duration param — সর্বোচ্চ wait।
  • Result return — Ok-এ future-এর output, Err-এ overflow duration।
async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

select + sleep দিয়ে implementation

use std::time::Duration;

use trpl::Either;

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Failed after 2 seconds

কয়েকটা গুরুত্বপূর্ণ point:

  • trpl::select fair নয়join-এর মতো প্রতিটাকে equally check করে না, argument-এর order-এই poll করে। তাই future_to_try-কে আগে রাখি, যাতে সেটা ready হলেই Left ধরা পড়ে।
  • Either::Left — original future আগে শেষ → Ok।
  • Either::Right — timer আগে শেষ → Err।

লক্ষ্য করো — আমরা একটাও নতুন low-level primitive লিখিনি। শুধু select আর sleep-কে compose করে নতুন abstraction। এটাই async-এর শক্তি।

কখন কোনটা — practical advice

  • Daily code-এ সরাসরি async / .await ব্যবহার করো।
  • Coordination দরকার হলে select, join, join!
  • Compute-bound part থাকলে strategically yield_now বসাও — কিন্তু measure করে।
  • প্রতিটা line-এ yield-await অপ্রয়োজনীয় overhead — state machine বড় হয়।

এই পাঠ থেকে যা শিখলে

  • Async block-গুলো .await point ছাড়া যেতে পারে না — long synchronous কাজ অন্যদের starve করে।
  • trpl::yield_now — runtime-কে control ফিরিয়ে দেওয়ার সবচেয়ে সহজ ও সস্তা উপায়।
  • trpl::select ordered — argument-এর order-এ poll, fair না।
  • Future + select compose করে নিজে timeout-এর মতো abstraction বানানো যায়।
  • Generic F: Future, F::Output associated type — যেকোনো future সহ কাজ।