পাঠ ১৭.২

Async দিয়ে concurrency apply করা

Applying Concurrency with Async

Chapter 16-এ thread দিয়ে যা যা করেছি — multiple task একসাথে চালানো, message passing — async দিয়েও সেগুলো করা যায়। API অনেকটা একই দেখাবে, কিন্তু আচরণে কিছু পার্থক্য আছে। এই পাঠে আমরা spawn_task, join, async channel, এবং multiple producer pattern দেখব।

spawn_task দিয়ে নতুন task

thread::spawn-এর async equivalent হলো trpl::spawn_task। এটা একটা future নেয়, runtime-কে দেয় background-এ চালানোর জন্য:

src/main.rsrust
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

সমস্যাটা thread-এর মতোই — main task শেষ হলে spawned task মাঝপথে বন্ধ। আমরা চাই first task পুরোপুরি ১ থেকে ৯ পর্যন্ত print করুক।

JoinHandle await করা

spawn_task একটা handle return করে — সেটা .await করলে task complete না হওয়া পর্যন্ত wait হয়:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

এখন spawned task পুরোটাই চলে — output-এ ১ থেকে ৯ সব আসে।

join — fair concurrency

আসলে এখানে আলাদা task spawn করার দরকার নেই। দু'টো async block বানিয়ে trpl::join দিয়ে দু'টোকেই একসাথে progress করানো যায়:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

trpl::join fair — অর্থাৎ প্রতিটা future-কে equally check করে, alternate করে। তাই output predictable। Thread-এ OS scheduler ঠিক করে কে আগে চলবে, এখানে runtime-ই সেটা নিয়ন্ত্রণ করে।

Async channel — message passing

Chapter 16-এর mpsc channel-এর async version:

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

Thread version-এর সাথে কয়েকটা পার্থক্য:

  • rx mutable হতে হবে।
  • recv একটা future return করে — .await করতে হয়।
  • send block করে না — তাই await দরকার নেই।
  • Channel unbounded — capacity-র সীমা নেই।

Multiple message + sleep

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

এই code চালালে দেখবে — সব message আগে পাঠানো হয়, তারপর সব receive। কারণ একটা single async block-এর code linearly execute হয়, await-এর order মেনে। Concurrent পেতে গেলে দু'টো আলাদা async block লাগবে।

আলাদা block-এ send আর recv

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

এখন প্রতিটা message পাঠানোর পরপরই receive হয়। কিন্তু একটা সমস্যা — program কখনো শেষ হয় না! কারণ tx_fut শেষ হলেও tx drop হয় না (outer scope-এ ধরা আছে), তাই rx.recv() চিরকাল wait করতে থাকে।

async move দিয়ে ownership

সমাধান — tx-এর ownership async block-এ move করো, যেন block শেষ হলে drop হয়:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

এখন tx_fut শেষ হলে tx drop, channel close, recv None দেয়, loop শেষ — program gracefully terminate।

Multiple producer + join! macro

trpl::join ঠিক দু'টো future নেয়। তিন বা তার বেশি হলে join! macro:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

tx.clone() দিয়ে আরেকটা sender, দু'টো task আলাদা interval-এ message পাঠায়। join! macro arbitrary সংখ্যক future সামলাতে পারে (যতক্ষণ সংখ্যা compile time-এ জানা)।

এই পাঠ থেকে যা শিখলে

  • trpl::spawn_task — async task spawn; handle .await দিয়ে join।
  • trpl::join(a, b) — দু'টো future-কে fair concurrency-তে চালায়; output predictable।
  • Async channelrecv async, send non-blocking, receiver mutable।
  • একটা async block-এর ভিতরে code linear; concurrency পেতে আলাদা block লাগে।
  • async move দিয়ে ownership transfer — sender drop ঠিকঠাক করতে জরুরি।
  • tx.clone() + join! macro — multiple producer pattern।