পাঠ ১৬.২

Message Passing দিয়ে thread-এর মধ্যে data পাঠানো

Using Message Passing to Transfer Data Between Threads

নিরাপদ concurrency-র একটা জনপ্রিয় approach — message passing। Thread-গুলো একে অপরের memory share না করে, বরং channel-এর মাধ্যমে message পাঠায়। Go ভাষার slogan এখানে চমৎকার — "Do not communicate by sharing memory; instead, share memory by communicating।"

Channel কী

Channel-এর দু'টা মাথা — উপরে transmitter (যেখান দিয়ে পাঠাও) আর নিচে receiver (যেখানে আসে)। যেকোনো একটা মাথা drop হয়ে গেলে channel close বলা হয়।

Standard library-র std::sync::mpsc module এই সুবিধা দেয়। mpsc মানে multiple producer, single consumer — অনেক transmitter, কিন্তু receiver একটাই।

src/main.rsrust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

mpsc::channel() একটা tuple return করে — tx (transmitter) আর rx (receiver)। নাম-গুলো convention।

প্রথম message পাঠানো

src/main.rsrust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Spawned thread-এ tx-এর ownership move করছি। ভেতরে tx.send(val) message পাঠায়; send একটা Result return করে — receiver drop হয়ে গেলে error। এখানে unwrap

Main thread-এ rx.recv() — block করে অপেক্ষা করে যতক্ষণ না value আসে। চলে এলে Result return করে।

Got: hi

recv বনাম try_recv

  • recv() — blocking; value না আসা পর্যন্ত অপেক্ষা।
  • try_recv() — non-blocking; সঙ্গে সঙ্গে Ok(value) বা Err। Thread-এর অন্য কাজও থাকলে এটা।

Ownership channel-এ

send তার argument-এর ownership নিয়ে নেয়। তাই send-এর পরে আর use করা যাবে না — ownership rule এখানেও data race থেকে রক্ষা করছে:

src/main.rsrust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
compile errortext
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

ভেবে দেখো — receiver thread already val modify করতে পারে; sender thread একই সময়ে সেটা use করলে inconsistent। Rust সেই সুযোগই দেয় না।

একসাথে অনেকগুলো value

Receiver-কে iterator-এর মতো ব্যবহার করা যায় — channel close হলে iteration শেষ:

src/main.rsrust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Got: hi
Got: from
Got: the
Got: thread

Spawned thread প্রতিটা value পাঠানোর পরে এক সেকেন্ড করে ঘুমায়, তাই receiver-এ একটু একটু করে আসে।

Multiple producer — tx clone

mpsc-এ একাধিক producer থাকতে পারে। শুধু tx.clone() দিয়ে আরেকটা transmitter তৈরি করো:

src/main.rsrust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Order non-deterministic — system-এর timing-এর উপর নির্ভর করে। Sleep duration বদলে দিলে অন্য রকম interleaving দেখবে।

এই পাঠ থেকে যা শিখলে

  • Channel-এর দুই মাথা — transmitter আর receiver; mpsc::channel() দু'টোই দেয়।
  • tx.send(val) ownership transfer করে; rx.recv() block করে অপেক্ষা।
  • try_recv non-blocking — অন্য কাজের পাশাপাশি poll।
  • Receiver iterator-এর মতো — channel close না হওয়া পর্যন্ত value আসতে থাকে।
  • tx.clone() দিয়ে multiple producer; receiver একটাই।
  • Ownership rule message passing-কে compile time-এই data race থেকে মুক্ত রাখে।