使用 Rust 建立網頁伺服器(三):關機與清理

前言

本文為「The Rust Programming Language」語言指南的學習筆記。

做法

使用執行緒池可以來同時回應多重請求,但警告顯示 workersidthread 欄位沒有被直接使用,這提醒目前尚未清理所有內容。當使用比較不優雅的 ctrl-c 方式來中斷主執行緒時,所有其他執行緒也會立即停止,不管它們是否正在處理請求。

現在要實作 Drop 特徵來對池中的每個執行緒呼叫 join 方法,讓它們能在關閉前把任務處理完畢。然後要告訴執行緒它們該停止接收新的請求並關閉。為了觀察此程式碼的實際運作,以下會修改伺服器讓它在正常關機(graceful shutdown)前,只接收兩個請求。

實作 Drop 特徵

先對執行緒池實作 Drop 特徵。當池被釋放時,執行緒都該加入(join)回來以確保它們有完成它們的工作。首先,遍歷執行緒池中的每個 workers。對此使用 &mut 是因為 self 是個可變引用,而且我們也需要能夠改變 worker。對每個工作者印出訊息來說明此工作者正要關閉,然後對工作者的執行緒呼叫 join。如果 join 的呼叫失敗的話,我們使用 unwrap 來讓 Rust 恐慌,使其變成較不正常的關機方式。

1
2
3
4
5
6
7
8
9
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);

worker.thread.join().unwrap();
}
}
}

目前並無法呼叫 join,因為我們只有每個 worker 的可變借用,而 join 會取走其引數的所有權。要解決此問題,需要將 thread 中的執行緒移出 Worker 實例,讓 join 可以消耗該執行緒。如果 Worker 改持有 Option<thread::JoinHandle<()>> 的話,可以對 Option 呼叫 take 方法來移動 Some 變體中的數值,並在原處留下 None 變體。換句話說,thread 中有 Some 變體的話就代表 Worker 正在執行,而當我們清理 Worker 時,我們會將 Some 換成 None 來讓 Worker 沒有任何執行緒可以執行。

更新 Worker 的定義如以下。

1
2
3
4
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}

當建立新的 Worker,需要將 thread 的數值封裝到 Some

1
2
3
4
5
6
7
8
9
10
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// ...

Worker {
id,
thread: Some(thread),
}
}
}

Option 呼叫 take 來將 thread 移出 worker

1
2
3
4
5
6
7
8
9
10
11
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);

if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

Optiontake 方法會取走 Some 變體的數值並在原地留下 None。我們使用 if let 來解構 Some 並取得執行緒,然後我們對執行緒呼叫 join。如果工作者的執行緒已經是 None,就知道該該工作者已經清理其執行緒了,所以沒有必要再處理。

發送停止接收任務的信號

現在雖然有呼叫 join,但這無法關閉執行緒,因為它們會一直 loop 來尋找任務執行。如果嘗試以目前的 drop 實作釋放 ThreadPool 的話,主執行緒會被阻擋,一直等待第一個執行緒處理完成。

要修正此問題,要修改執行緒,讓它們除了接收 Job 來執行以外,也要能收到告訴它們要停止接收並離開無限迴圈的信號。所以我們的通道將傳送以下兩種枚舉變體,而不再是 Job 實例。

修改 src/lib.rs 檔,Message 枚舉可以是存有該執行緒要執行的 JobNewJob 變體,或是通知執行緒離開其迴圈並停止的 Terminate 變體。

1
2
3
4
enum Message {
NewJob(Job),
Terminate,
}

需要調整通道來使用 Message 型別,而不是 Job 型別。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}

// ...

impl ThreadPool {
// ...

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);

self.sender.send(Message::NewJob(job)).unwrap();
}
}

// ...

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();

match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);

job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);

break;
}
}
});

Worker {
id,
thread: Some(thread),
}
}
}

為了改用 Message 枚舉,有兩個地方得將 Job 改成 MessageThreadPool 的定義與 Worker::new 的簽名。ThreadPoolexecute 方法需要傳送封裝成 Message::NewJob 的任務。然後在 Worker::new 中,也就是取得 Message 的通道接收端中,如果收到 NewJob 變體的話,其就會處理任務;而如果收到 Terminate 變體的話,執行緒就會打破迴圈。

有了這些改變,程式碼就能編譯並繼續執行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");

for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}

println!("Shutting down all workers.");

for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);

if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

現在會遍歷工作者們 2 次,一次是傳送 Terminate 訊息給每個工作者,另一次是對每個工作者執行緒呼叫 join。如果我們嘗試在同個迴圈中傳送訊息並立即呼叫 join 的話,會無法保證在當前疊代中的工作者就是從通道中取得訊息的工作者。

為了更好理解為何我們需要兩個分開的迴圈,想像一個情境中有 2 個工作者。如果用一個迴圈來遍歷每個工作者,在第一次疊代中會有個關機訊息傳至通道,並對第一個工作者執行緒呼叫 join。如果第一個工作者正在忙於處理請求的話,第二個工作者就會從通道取得關機訊息並關閉。這樣會變成持續等待第一個工作者關閉,但是它永遠不會關閉,因為是第二個執行緒取得關機訊息的。死結(deadlock)就發生了!

為了預防此情形,我們首先在一個迴圈中對通道傳送所有的 Terminate 訊息,然後我們在另一個迴圈才將所有的執行緒加回來。每個工作者一旦收到關機訊息後,就會停止從通道中接收訊息。所以可以確定如果我們發送與執行緒數量相當的關機訊息的話,每個工作者都會在其執行緒被呼叫 join 前收到關機訊息。

要實際看到此程式碼的運作情形,修改 main 來在正常關閉伺服器前,只接收兩個請求。

修改 src/bin/main.rs 檔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);

for stream in listener.incoming().take(2) {
let stream = stream.unwrap();

pool.execute(|| {
handle_connection(stream);
});
}

println!("Shutting down.");
}

在真實世界中的網頁伺服器當然不會只處理 2 個請求就關機。此程式碼只是用來說明正常關機與清理的運作流程。

take 方法是由 Iterator 特徵所定義,且限制該疊代最多只會取得前兩項。ThreadPool 會在 main 結束時離開作用域,然後 drop 的實作就會執行。

執行程式。

1
cargo run

輸出以下訊息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

可能會看到不同順序的工作者與訊息輸出。可以從訊息中看到此程式碼如何執行的,工作者 0 與 3 獲得前兩個請求。然後對於第三個請求,伺服器會停止接受連線。當 ThreadPoolmain 結尾離開作用域時,它 Drop 的實作就會生效,然後執行緒池告訴所有工作者關閉。當工作者看到關機訊息時,它們就會印出訊息,然後執行緒池會呼叫 join 來關閉每個工作者的執行緒。

此特定執行方式中有個有趣的地方值得注意:ThreadPool 傳送關機訊息至通道,且在任何工作者收到訊息前,我們就已經著將工作者 0 加入回來。工作者 0 此時尚未收到關機訊息,所以主執行緒會被擋住並等待工作者 0 完成。同一時間,每個工作者會開始收到關機訊息。當工作者 0 完成時,主執行緒會等待剩下的工作者完成任務。屆時,它們都會收到關機訊息並能夠關閉。

程式碼

參考資料