前言
本文為「The Rust Programming Language」語言指南的學習筆記。
做法
使用執行緒池可以來同時回應多重請求,但警告顯示 workers、id 與 thread 欄位沒有被直接使用,這提醒目前尚未清理所有內容。當使用比較不優雅的 ctrl-c 方式來中斷主執行緒時,所有其他執行緒也會立即停止,不管它們是否正在處理請求。
現在要實作 Drop 特徵來對池中的每個執行緒呼叫 join 方法,讓它們能在關閉前把任務處理完畢。然後要告訴執行緒它們該停止接收新的請求並關閉。為了觀察此程式碼的實際運作,以下會修改伺服器讓它在正常關機(graceful shutdown)前,只接收兩個請求。
實作 Drop 特徵
先對執行緒池實作 Drop 特徵。當池被釋放時,執行緒都該加入(join)回來以確保它們有完成它們的工作。首先,遍歷執行緒池中的每個 workers。對此使用 &mut 是因為 self 是個可變引用,而且我們也需要能夠改變 worker。對每個工作者印出訊息來說明此工作者正要關閉,然後對工作者的執行緒呼叫 join。如果 join 的呼叫失敗的話,我們使用 unwrap 來讓 Rust 恐慌,使其變成較不正常的關機方式。
| 1 | impl Drop for ThreadPool { | 
目前並無法呼叫 join,因為我們只有每個 worker 的可變借用,而 join 會取走其引數的所有權。要解決此問題,需要將 thread 中的執行緒移出 Worker 實例,讓 join 可以消耗該執行緒。如果 Worker 改持有 Option<thread::JoinHandle<()>> 的話,可以對 Option 呼叫 take 方法來移動 Some 變體中的數值,並在原處留下 None 變體。換句話說,thread 中有 Some 變體的話就代表 Worker 正在執行,而當我們清理 Worker 時,我們會將 Some 換成 None 來讓 Worker 沒有任何執行緒可以執行。
更新 Worker 的定義如以下。
| 1 | struct Worker { | 
當建立新的 Worker,需要將 thread 的數值封裝到 Some。
| 1 | impl Worker { | 
對 Option 呼叫 take 來將 thread 移出 worker。
| 1 | impl Drop for ThreadPool { | 
Option 的 take 方法會取走 Some 變體的數值並在原地留下 None。我們使用 if let 來解構 Some 並取得執行緒,然後我們對執行緒呼叫 join。如果工作者的執行緒已經是 None,就知道該該工作者已經清理其執行緒了,所以沒有必要再處理。
發送停止接收任務的信號
現在雖然有呼叫 join,但這無法關閉執行緒,因為它們會一直 loop 來尋找任務執行。如果嘗試以目前的 drop 實作釋放 ThreadPool 的話,主執行緒會被阻擋,一直等待第一個執行緒處理完成。
要修正此問題,要修改執行緒,讓它們除了接收 Job 來執行以外,也要能收到告訴它們要停止接收並離開無限迴圈的信號。所以我們的通道將傳送以下兩種枚舉變體,而不再是 Job 實例。
修改 src/lib.rs 檔,Message 枚舉可以是存有該執行緒要執行的 Job 的 NewJob 變體,或是通知執行緒離開其迴圈並停止的 Terminate 變體。
| 1 | enum Message { | 
需要調整通道來使用 Message 型別,而不是 Job 型別。
| 1 | pub struct ThreadPool { | 
為了改用 Message 枚舉,有兩個地方得將 Job 改成 Message:ThreadPool 的定義與 Worker::new 的簽名。ThreadPool 的 execute 方法需要傳送封裝成 Message::NewJob 的任務。然後在 Worker::new 中,也就是取得 Message 的通道接收端中,如果收到 NewJob 變體的話,其就會處理任務;而如果收到 Terminate 變體的話,執行緒就會打破迴圈。
有了這些改變,程式碼就能編譯並繼續執行。
| 1 | impl Drop for ThreadPool { | 
現在會遍歷工作者們 2 次,一次是傳送 Terminate 訊息給每個工作者,另一次是對每個工作者執行緒呼叫 join。如果我們嘗試在同個迴圈中傳送訊息並立即呼叫 join 的話,會無法保證在當前疊代中的工作者就是從通道中取得訊息的工作者。
為了更好理解為何我們需要兩個分開的迴圈,想像一個情境中有 2 個工作者。如果用一個迴圈來遍歷每個工作者,在第一次疊代中會有個關機訊息傳至通道,並對第一個工作者執行緒呼叫 join。如果第一個工作者正在忙於處理請求的話,第二個工作者就會從通道取得關機訊息並關閉。這樣會變成持續等待第一個工作者關閉,但是它永遠不會關閉,因為是第二個執行緒取得關機訊息的。死結(deadlock)就發生了!
為了預防此情形,我們首先在一個迴圈中對通道傳送所有的 Terminate 訊息,然後我們在另一個迴圈才將所有的執行緒加回來。每個工作者一旦收到關機訊息後,就會停止從通道中接收訊息。所以可以確定如果我們發送與執行緒數量相當的關機訊息的話,每個工作者都會在其執行緒被呼叫 join 前收到關機訊息。
要實際看到此程式碼的運作情形,修改 main 來在正常關閉伺服器前,只接收兩個請求。
修改 src/bin/main.rs 檔。
| 1 | fn main() { | 
在真實世界中的網頁伺服器當然不會只處理 2 個請求就關機。此程式碼只是用來說明正常關機與清理的運作流程。
take 方法是由 Iterator 特徵所定義,且限制該疊代最多只會取得前兩項。ThreadPool 會在 main 結束時離開作用域,然後 drop 的實作就會執行。
執行程式。
| 1 | cargo run | 
輸出以下訊息。
| 1 | $ cargo run | 
可能會看到不同順序的工作者與訊息輸出。可以從訊息中看到此程式碼如何執行的,工作者 0 與 3 獲得前兩個請求。然後對於第三個請求,伺服器會停止接受連線。當 ThreadPool 在 main 結尾離開作用域時,它 Drop 的實作就會生效,然後執行緒池告訴所有工作者關閉。當工作者看到關機訊息時,它們就會印出訊息,然後執行緒池會呼叫 join 來關閉每個工作者的執行緒。
此特定執行方式中有個有趣的地方值得注意:ThreadPool 傳送關機訊息至通道,且在任何工作者收到訊息前,我們就已經著將工作者 0 加入回來。工作者 0 此時尚未收到關機訊息,所以主執行緒會被擋住並等待工作者 0 完成。同一時間,每個工作者會開始收到關機訊息。當工作者 0 完成時,主執行緒會等待剩下的工作者完成任務。屆時,它們都會收到關機訊息並能夠關閉。