使用 Rust 建立網頁伺服器(二):多執行緒伺服器

前言

本文為「The Rust Programming Language」語言指南的學習筆記。

做法

模擬延遲

現在的伺服器會依序處理請求,代表它處理完第一個連線之前,都無法處理第二個連線。如果伺服器收到越來越多請求,這樣的連續處理方式會變得越來越沒效率。如果伺服器收到一個會花很久時間才能處理完成的請求,之後的請求都得等待這個長時間的請求完成才行,就算新的請求能很快處理完成也是如此。我們需要修正此問題,但首先讓我們先觀察此問題怎麼發生的。

首先,模擬一個緩慢的請求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread;
use std::time::Duration;
// ...

fn handle_connection(mut stream: TcpStream) {
// ...

let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";

let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};

// ...
}

實作執行緒池

執行緒池(thread pool)會產生一群執行緒來等待並隨時準備好處理任務。當程式收到新任務時,它會將此任務分配給執行緒池其中一條執行緒,然後該執行緒就會處理該任務。池中剩餘的執行緒在第一條執行緒處理任務時,仍能隨時處理任何其他來臨的任務。當第一條執行緒處理完成時,他會回到閒置執行緒池之中,等待處理新的任務。執行緒池讓你能並行處理連線,增加伺服器的吞吐量。

這裡會限制執行緒池的數量為少量的數量就好,以避免造成阻斷服務(Denial of Service, DOS)攻擊。如果程式每次遇到新的請求時就產生新的執行緒,某個人就可以產生一千萬個請求至伺服器,來破壞並用光伺服器的資源,並導致所有請求的處理都被擱置。

所以與其產生無限制的執行緒,會有個固定數量的執行緒在池中等待。當有請求來臨時,它們會被送至池中處理。此池會維護一個接收請求的佇列(queue)。每個執行緒會從此佇列彈出一個請求、處理該請求,然後再繼續向佇列索取下一個請求。有了此設計,我們就可以同時處理 N 個請求,其中 N 就是執行緒的數量。如果每個執行緒都負責到需要長時間處理的請求,隨後的請求還是會阻塞佇列,但是我們至少增加了能夠同時處理長時間請求的數量。

此技巧只是其中一種改善網頁伺服器吞吐量的方式而已。其他可能會探索到的選項還有 fork/join 模型或是單執行緒非同步模型。

實作

如果程式碼都對每次連線建立新的執行緒會怎樣?以下在 mainfor 迴圈中,對每個流都產生一條新的執行緒。

1
2
3
4
5
6
7
8
9
10
11
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

for stream in listener.incoming() {
let stream = stream.unwrap();

thread::spawn(|| {
handle_connection(stream);
});
}
}

想要執行緒池能以類似的方式運作,這樣從執行緒切換成執行緒池時,使用我們 API 的程式碼就不必作出大量修改。以下顯示一個想使用的假想 ThreadPool 結構體,而非使用 thread::spawn

1
2
3
4
5
6
7
8
9
10
11
12
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);

for stream in listener.incoming() {
let stream = stream.unwrap();

pool.execute(|| {
handle_connection(stream);
});
}
}

使用 ThreadPool::new 來建立新的執行緒池且有個可設置的執行緒數量參數,在此例中設為 4。然後在 for 迴圈中 ,pool.execute 的介面類似於 thread::spawn,其會接收一個執行緒池執行在每個流中的閉包。我們需要實作 pool.execute,使其能接收閉包並傳給池中的執行緒來執行。

ThreadPool 實作會與網頁伺服器相互獨立,所以讓我們將 rust_web_server crate 從二進制 crate 轉換成函式庫 crate 來存放我們的 ThreadPool 實作。這樣在我們切換成函式庫 crate 之後,我們就能夠將分出來的執行緒池函式庫用在其他我們想使用執行緒池的地方,而不僅僅是作為網頁請求所用。

新增 src/lib.rs 檔。

1
pub struct ThreadPool;

然後建立一個新的目錄 src/bin,將 src/main.rs 的二進制 crate 移至 src/bin/main.rs

修改 src/lib.rs 檔。選擇 usize 作為參數 size 的型別,因為我們知道負數對執行緒數量來說沒有任何意義。也知道 4 會作為執行緒集合的元素個數,這正是使用 usize 型別的原因。

1
2
3
4
5
6
7
pub struct ThreadPool;

impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}

可以透過三種不同的特徵來接受閉包:FnFnMutFnOnce。在此需要決定這裡該使用何種閉包。其行為會類似於標準函式庫中 thread::spawn 的實作,所以來看看 thread::spawn 簽名中的參數有哪些界限吧。

1
2
3
4
5
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,

ThreadPool 建立 execute 方法,並採用泛型參數型別 F 與其界限:

1
2
3
4
5
6
7
8
impl ThreadPool {
// ...
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}

零條執行緒的池沒有任何意義,但零卻可以是完全合理的 usize。因此要在回傳 ThreadPool 前,加上程式碼來檢查 size 有大於零,並透過 assert! 來判定。如果為零的話就會恐慌。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

ThreadPool
}

// ...
}

以下變更了 ThreadPool 的定義來儲存一個有 thread::JoinHandle<()> 實例的向量,用 size 來初始化向量的容量,設置一個會執行些程式碼來建立執行緒的 for 迴圈,然後回傳包含它們的 ThreadPool 實例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::thread;

pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
// ...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let mut threads = Vec::with_capacity(size);

for _ in 0..size {
// 就產生些執行緒並儲存至向量
}

ThreadPool { threads }
}

// ...
}

與其在執行緒池中儲存 JoinHandle<()> 實例的向量,我們可以儲存 Worker 結構體的實例。每個 Worker 會儲存一個 JoinHandle<()> 實例。然後對 Worker 實作一個方法來取得閉包要執行的程式碼,並傳入已經在執行的執行緒來處理。我們也會給每個 Worker 一個 id,好讓我們在紀錄日誌或除錯時,分辨池中不同的工作者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::thread;

pub struct ThreadPool {
workers: Vec<Worker>,
}

impl ThreadPool {
// ...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id));
}

ThreadPool { workers }
}
// ...
}

struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}

impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});

Worker { id, thread }
}
}

ThreadPool 中欄位的名稱從 threads 改為 workers,因為它現在儲存的是 Worker 實例而非 JoinHandle<()> 實例。使用 for 迴圈的計數作為 Worker::new 的引數,然後我們將每個新的 Worker 儲存到名稱為 workers 的向量中。

外部的程式碼(像是在 src/bin/main.rs 的伺服器)不需要知道 ThreadPool 內部實作細節已經改為使用 Worker 結構體,所以我們讓 Worker 結構體與其 new 函式維持私有。Worker::new 函式會使用我們給予的 id 並儲存一個 JoinHandle<()> 實例,這是用空閉包產生的新執行緒所建立的。

此程式碼會編譯通過並透過 ThreadPool::new 的指定引數儲存一定數量的 Worker 實例。

ThreadPool::new 建立通道並讓 ThreadPool 實例儲存發送端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ...
use std::sync::mpsc;

pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
// ...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id));
}

ThreadPool { workers, sender }
}
// ...
}

ThreadPool::new 中,建立了一個新的通道並讓執行緒池儲存發送端。這能成功編譯,但還是會有些警告。嘗試在執行緒池建立通道時,將通道接收端傳給每個 Worker。我們想在 Worker 產生的執行緒中使用接收端,所以得在閉包中引用 receiver 參數。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
impl ThreadPool {
// ...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, receiver));
}

ThreadPool { workers, sender }
}
// ...
}

// ...

impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});

Worker { id, thread }
}
}

Rust 提供的通道實作是多重生產者、單一消費者。這意味著不能只是克隆通道的接收端來修正此程式碼。就算真的可以,這也不會是想用的技巧。我們實際想做的是分享單一 receiver 給所有工作者,來分配任務給數個執行緒。

除此之外,從通道佇列取得任務會需要可變的 receiver,所以執行緒需要有個安全的方式來共享並修改 receiver。不然的話,我們可能會遇到競爭條件。

執行緒安全智慧指標:要在多重執行緒共享所有權並允許執行緒改變數值的話,需要使用 Arc<Mutex<T>>Arc 型別能讓數個工作者能擁有接收端,而 Mutex 能確保同時間只有一個工作者能獲取任務。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use std::sync::Arc;
use std::sync::Mutex;
// ...

impl ThreadPool {
// ...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool { workers, sender }
}

// ...
}

// ...

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// ...
}
}

ThreadPool::new 中,我們將通道接收端放入 ArcMutex 之中。對於每個新的工作者,我們會克隆 Arc 來增加引用計數,讓工作者可以共享接收端的所有權。

最後讓我們來對 ThreadPool 實作 execute 方法吧。我們還會將 Job 的型別從結構體改為特徵物件的型別別名,這會儲存 execute 收到的閉包型別。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ...

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
// ...

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);

self.sender.send(job).unwrap();
}
}

// ...

在使用 execute 收到的閉包來建立新的 Job 實例之後,將該任務傳送至通道的發送端。對 send 呼叫 unwrap 來處理發送失敗的情況。舉例來說,這可能會發生在當停止所有執行緒時,這意味著接收端不再接收新的訊息。不過目前還無法讓執行緒停止執行,只要執行緒池還在,執行緒就會繼續執行。使用 unwrap 的原因是因為我們知道失敗不可能發生,但編譯器並不知情。

在工作者中,傳給 thread::spawn 的閉包仍然只有引用通道接收端。需要讓閉包一直循環,向通道接收端請求任務,並在取得任務時執行它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ...

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
});

Worker { id, thread }
}
}

在此首先對 receiver 呼叫 lock 以取得互斥鎖,然後呼叫 unwrap 讓任何錯誤都會恐慌。如果互斥鎖處於污染(poisoned)狀態的話,該鎖可能就會失敗,這在其他執行緒持有鎖時,卻發生恐慌而沒有釋放鎖的話就可能發生。在這種情形,呼叫 unwrap 來讓此執行緒恐慌是正確的選擇。也可以將 unwrap 改成 expect 來加上一些更有幫助的錯誤訊息。

如果得到互斥鎖的話,呼叫 recv 來從通道中取得 Job。最後的 unwrap 也繞過了任何錯誤,這在持有通道發送端的執行緒被關閉時就可能發生;就和如果接收端關閉時 send 方法就會回傳 Err 的情況類似。

recv 的呼叫會阻擋執行緒,所以如果沒有任何任務的話,當前執行緒對等待直到下一個任務出現為止。Mutex<T> 確保同時間只會有一個 Worker 執行緒嘗試取得任務。

執行程式。

1
cargo run

輸出以下訊息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^

warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: 3 warnings emitted

Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

現在有個執行緒池能非同步地處理連線。產生的執行緒不超過四條,所以如果伺服器收到大量請求時,我們的系統就不會超載。如果下達 /sleep 的請求,伺服器會有其他執行緒來處理其他請求並執行它們。

注意:如果在數個瀏覽器視窗同時打開 /sleep,它們可能會彼此間格 5 秒鐘來讀取。這是因為有些瀏覽器會對多個相同請求的實例做快取。這項限制不是網頁伺服器造成的。

程式碼

參考資料