Lesson 21.2

Turning Our Single-Threaded Server into a Multithreaded Server

The problem with a single-threaded server: one slow request blocks the entire server. The solution is a thread pool — a fixed number of threads that take jobs from a queue and process them. In this lesson we implement our own ThreadPool step by step, using a Worker struct, a Job type, mpsc channels, and Arc<Mutex<T>>.

Simulating the Problem

Let's add a /sleep route to see how a slow request makes other requests wait:

src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Now open /sleep in one browser tab — it waits 5 seconds. Then open / in another tab — it waits too! With a single thread, requests are handled sequentially.

A New Thread per Request? No.

We could call thread::spawn for every request, but an attacker sending thousands of requests would make the server spawn just as many threads and exhaust memory. Hence a fixed-size thread pool.

Designing the Ideal API First

Compiler-driven development: write the API you want first, then let the compiler's errors drive the implementation:

src/main.rs
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

ThreadPool::new(4) creates four threads; execute takes a closure.

Step 1 — An Empty ThreadPool

src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

The trait bounds are taken from thread::spawn's signature: FnOnce() (the closure runs exactly once), Send (it can be transferred to another thread), and 'static (the thread may outlive the scope that created it, so the closure must not borrow short-lived data).

Step 2 — Validating the Input

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Step 3 — The Worker Struct

Instead of storing JoinHandles directly, we introduce a Worker struct that carries an id:

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Vec::with_capacity preallocates for the known size, so no reallocation happens as we push the workers.

Step 4 — Sending Jobs over a Channel

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

One sender, one receiver. But there is a problem — the Receiver must be shared among multiple workers. Passing it directly moves ownership on the first iteration:

(won't compile)
for id in 0..size {
    workers.push(Worker::new(id, receiver));  // moved on first iteration
}

Step 5 — Arc<Mutex<Receiver>>

To share one Receiver across multiple threads, we need two things:

  • Arc — multiple ownership with an atomic reference count.
  • Mutex — only one worker reads at a time, so no job is delivered twice.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            receiver;
        });

        Worker { id, thread }
    }
}

Step 6 — The Job Type and execute

We can't name a closure's exact type, so we use a trait object — Box<dyn FnOnce() + Send + 'static> — aliased as Job:

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

Step 7 — The Loop in Worker

Each worker thread loops forever — receive a job from the channel, then execute it:

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

receiver.lock() acquires the Mutex (returning a MutexGuard), recv() takes a job from the channel, and job() calls the closure.

Why Not while let?

This version is tempting but wrong:

(anti-pattern)
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

The problem: the temporary MutexGuard in the while let condition is not dropped until the entire block — including job() — finishes. No other worker can acquire the lock in the meantime, so concurrency is lost!

In the earlier let job = ... version, the guard is dropped at the semicolon — the lock is released before the job even starts executing. That is the correct pattern.

The Complete lib.rs

src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Test

$ cargo run
   Compiling hello v0.1.0
    Finished `dev` profile
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.

Open /sleep in one tab and / in another — the second responds immediately, because a different worker handles it.

What You Learned in This Lesson

  • Thread pool — fixed workers, a queue of jobs, protection against DoS.
  • Compiler-driven development — write the desired API first, then the implementation.
  • FnOnce + Send + 'static — the typical bounds for a closure run once on any thread.
  • Box<dyn FnOnce()> trait object — storing and transferring heterogeneous closures.
  • Arc<Mutex<Receiver>> — the pattern for sharing a single receiver among multiple threads.
  • let vs while let — the lifetime of a temporary MutexGuard matters for concurrency.