পাঠ ২১.৩

Graceful Shutdown এবং Cleanup

Graceful Shutdown and Cleanup

আগের পাঠের server চলছে ঠিকই, কিন্তু Ctrl-C করলে সব thread আকস্মিকভাবে বন্ধ — চলমান request মাঝপথে কাটা পড়ে। ঠিক হবে — server বন্ধ হওয়ার সময় চলমান কাজ শেষ করে, তারপর gracefully exit। এই পাঠে Drop trait দিয়ে cleanup, channel close করে worker-কে stop signal, আর Option::take দিয়ে ownership handle।

প্রথম attempt — Drop implement

ThreadPool drop হলে প্রতিটা worker-এর thread-এ join করতে চাই — মানে worker thread শেষ না হওয়া পর্যন্ত wait:

won't compilerust
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

Compiler error — JoinHandle::join argument-এর ownership নেয়; কিন্তু &mut-এর ভিতর থেকে move করা যাবে না।

সমাধান — Vec::drain

drain Vec থেকে সব element move করে নিয়ে আসে — ownership transfer হয়:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

কিন্তু এতে আরেকটা সমস্যা — worker-এর loop infinite, কোনো signal ছাড়া কখনো শেষ হবে না। তাই join চিরকাল wait করবে।

Worker-কে stop signal পাঠানো

Idea — sender drop করলে channel close হয়, তখন recv() Err দেয়। সেটা পেলেই worker loop break করবে।

কিন্তু ThreadPool-এ sender field থাকলে, drop-এর সময় সেটা নিজেই pool-এর সাথে drop হবে — manually আগে drop করা যাবে না। সমাধান — Option-এ wrap, যাতে take() করা যায়।

struct update

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

new() update

pub fn new(size: usize) -> ThreadPool {
    assert!(size > 0);

    let (sender, receiver) = mpsc::channel();

    let receiver = Arc::new(Mutex::new(receiver));

    let mut workers = Vec::with_capacity(size);

    for id in 0..size {
        workers.push(Worker::new(id, Arc::clone(&receiver)));
    }

    ThreadPool {
        workers,
        sender: Some(sender),
    }
}

execute() update

pub fn execute<F>(&self, f: F)
where
    F: FnOnce() + Send + 'static,
{
    let job = Box::new(f);

    self.sender.as_ref().unwrap().send(job).unwrap();
}

as_ref()Option<Sender> থেকে Option<&Sender>; pool চালু থাকা অবস্থায় এটা সবসময় Some

Worker-এ Option<JoinHandle>

একই pattern Worker-এর thread-এ — কারণ drop-এর সময় ownership take করে join করতে চাই:

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Worker::new — recv-এ match

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

আগের recv().unwrap()-এর জায়গায় match। Channel close হলে Err আসে, worker break করে — clean exit।

Drop impl চূড়ান্ত

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

ক্রম গুরুত্বপূর্ণ:

  • প্রথমে self.sender.take() — option থেকে sender বের করে আনে, তারপর drop() দিয়ে drop। ফলে channel close, worker-এরা Err পেয়ে break হয়।
  • তারপর প্রতিটা worker-এর thread take().join() — wait এবং clean।

Test — দু'টা request পরে shutdown

Demo-র জন্য main-এ take(2) বসাই — শুধু দু'টা request handle করে server বন্ধ:

src/main.rsrust
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
$ cargo run
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

সব worker disconnect message print করে, তারপর pool-ের drop-এ একে একে join — graceful shutdown complete।

আরও কী করা যায়

  • ThreadPool এবং তার public method-এ আরও documentation।
  • Library-র জন্য test লেখা।
  • unwrap-এর জায়গায় robust error handling।
  • ThreadPool-কে শুধু web server না, অন্য task-এর জন্যেও ব্যবহার।
  • crates.io-তে existing thread pool crate দেখে compare — তারা কী আলাদা করেছে।

পুরো book শেষ — অভিনন্দন!

এই project-এ চ্যাপ্টার ১৬-এর thread, mpsc, Mutex, Arc; চ্যাপ্টার ১৭-এর channel ধারণা; চ্যাপ্টার ১৮-এর trait object, FnOnce closure; চ্যাপ্টার ৬-এর Option/match; চ্যাপ্টার ২০-এর Drop trait — সবকিছু একসাথে এসে পড়েছে। বইয়ের শুরু থেকে যা যা শিখেছ, তার সবকিছু নিয়ে এই server। এবার নিজে কিছু বানানো শুরু করো!

এই পাঠ থেকে যা শিখলে

  • Graceful shutdown — চলমান কাজ শেষ করে exit; abrupt termination-এর বিপরীত।
  • Drop trait দিয়ে cleanup logic; Vec::drain দিয়ে &mut থেকে ownership move।
  • Channel close করার trick — sender drop করলে recv Err দেয়।
  • Option<T> + take() — partial ownership transfer pattern।
  • JoinHandle::join ownership-নির্ভর — তাই Option<JoinHandle>
  • Worker-এ recv-এর result-এ match — Ok-এ kaj, Err-এ exit।