当前位置: 首页 > news >正文

《Rust 程序设计语言》第二十一章:期末项目:构建多线程 Web 服务器

期末项目:构建多线程 Web 服务器

这段学习之旅虽然漫长,但我们总算走到了本书的最后一章。在这一章里,我们会一起完成最后一个项目 —— 通过它来演示最后几章讲到的部分概念,同时也回顾一下之前学过的内容。

我们这个期末项目要做的是一个 Web 服务器,它能返回 “hello” 信息,在浏览器里显示的效果就像图 21-1 那样。

图 21-1:我们的最终共享项目

下面是构建这个 Web 服务器的计划:

  1. 简单了解 TCP 和 HTTP 协议
  2. 在套接字(socket)上监听 TCP 连接
  3. 解析少量 HTTP 请求
  4. 生成标准的 HTTP 响应
  5. 用线程池提升服务器的吞吐量

开始之前,有两个细节需要说明一下。首先,我们接下来要用的方法,并不是用 Rust 构建 Web 服务器的 “最佳方案”。社区开发者已经在 crates.io 上发布了很多可用于生产环境的 crate,这些 crate 提供的 Web 服务器和线程池实现,比我们要写的完整得多。不过,本章的目的是帮大家学习知识,而不是走捷径。Rust 是一门系统级编程语言,我们可以自主选择想要的抽象层级,甚至能做到比其他语言更底层(而且更实用)的实现。

其次,我们这里不会用到 async/await。单是实现线程池就已经是个不小的挑战了,没必要再额外去构建异步运行时!不过,我们会提一下,面对本章中遇到的某些问题时,async/await 可能会有怎样的应用场景。说到底,就像我们在第 17 章提到的,很多异步运行时本身就是用线程池来管理任务的。

所以,我们会手动编写基础的 HTTP 服务器和线程池 —— 这样大家就能理解未来可能会用到的那些 crate 背后的核心思路和技术原理了。

构建单线程 Web 服务器

我们先从实现一个能工作的单线程 Web 服务器开始。在动手之前,先快速了解一下构建 Web 服务器会涉及到的协议。这些协议的细节超出了本书的范围,但简单过一遍,就能让大家掌握必要的知识。

Web 服务器主要涉及两个协议:超文本传输协议(HTTP)和传输控制协议(TCP)。这两个都是 “请求 - 响应” 式协议,也就是说,客户端发起请求,服务器监听请求并给客户端返回响应。而请求和响应里具体包含什么内容,是由协议来定义的。

TCP 是更底层的协议,它规定了数据如何从一台服务器传输到另一台服务器,但不会限定数据的具体内容。HTTP 则建立在 TCP 之上,它定义了请求和响应的数据格式。从技术上来说,HTTP 也能和其他协议配合使用,但在绝大多数情况下,HTTP 的数据都是通过 TCP 传输的。我们接下来就要直接处理 TCP 和 HTTP 请求、响应的原始字节数据。

监听 TCP 连接

我们的 Web 服务器需要监听 TCP 连接,这是第一步要实现的功能。Rust 标准库提供了std::net模块,正好能用来做这件事。按照常规流程,先创建一个新项目:

$ cargo new helloCreated binary (application) `hello` project
$ cd hello

接下来,把代码清单 21-1 中的代码复制到src/main.rs里。这段代码会在本地地址127.0.0.1:7878上监听传入的 TCP 流,一旦接收到流,就打印 “Connection established!”(已建立连接)。

文件名:src/main.rs

use std::net::TcpListener;fn main() {// 绑定本地地址127.0.0.1:7878,创建TCP监听器;若失败则直接终止程序let listener = TcpListener::bind("127.0.0.1:7878").unwrap();// 迭代处理监听器接收到的所有流for stream in listener.incoming() {// 若流获取成功则解包,失败则终止程序let stream = stream.unwrap();// 打印连接建立的提示println!("Connection established!");}
}

代码清单 21-1:监听传入的流,并在接收到流时打印一条消息

通过TcpListener,我们能在127.0.0.1:7878这个地址上监听 TCP 连接。地址中冒号前面的部分是 IP 地址,代表你的本地计算机(这个地址在所有电脑上都一样,不是作者专属的);后面的7878是端口号。我们选这个端口有两个原因:一是 HTTP 协议通常不用这个端口,所以我们的服务器不太可能和你电脑上正在运行的其他 Web 服务器冲突;二是在电话键盘上,7878对应 “rust” 这几个字母(是不是很巧~)。

这里的bind函数作用有点像new,调用后会返回一个新的TcpListener实例。之所以叫 “bind”(绑定),是因为在网络编程里,“将端口与监听操作关联” 这件事就叫 “绑定端口”。

bind函数返回的是Result<T, E>类型,这意味着端口绑定有可能失败。比如,如果你想绑定80端口(HTTP 默认端口),就需要管理员权限(非管理员只能监听 1023 以上的端口)—— 要是没权限就绑定80端口,肯定会失败。再比如,如果你同时运行两个我们这个程序,两个程序都想监听同一个端口,绑定也会失败。不过我们这个服务器只是用来学习的,不用处理这些复杂的错误情况,直接用unwrap,万一出错就让程序终止就好。

TcpListenerincoming方法会返回一个迭代器,这个迭代器会不断产生 “流”(更具体地说,是TcpStream类型的流)。一个 “流” 就代表客户端和服务器之间的一个打开的连接。所谓 “连接”,就是从客户端连到服务器、服务器生成响应、最后服务器关闭连接的整个请求 - 响应过程。所以,我们要从TcpStream里读取客户端发送的数据,然后往流里写入我们的响应,把数据返回给客户端。总的来说,这个 for 循环会依次处理每个连接,为我们生成一个又一个要处理的流。

目前,我们对 “流” 的处理很简单:用unwrap解包,如果流有错误就终止程序;如果没问题,就打印一条提示消息。下一个代码清单里,我们会给成功的情况添加更多功能。这里要说明一下,为什么incoming方法在客户端连接时可能会返回错误 —— 因为我们迭代的不是 “已建立的连接”,而是 “连接尝试”。连接尝试可能因为各种原因失败,很多还和操作系统有关。比如,很多操作系统对同时打开的连接数有上限,超过这个上限后,新的连接尝试就会失败,直到有一些已打开的连接被关闭。

咱们来试试运行这段代码!在终端里执行cargo run,然后在浏览器里打开127.0.0.1:7878。浏览器应该会显示 “连接重置” 之类的错误 —— 因为服务器现在还没返回任何数据。但你看终端,应该能看到好几条 “已建立连接” 的提示,这是浏览器连到服务器时打印的!

     Running `target/debug/hello`
Connection established!
Connection established!
Connection established!

有时候一次浏览器请求会触发多条提示,原因可能是浏览器不仅要请求页面内容,还要请求其他资源(比如浏览器标签栏里显示的favicon.ico图标)。

还有一种可能:因为服务器没返回任何数据,浏览器会尝试多次连接。当stream在循环结束后超出作用域被销毁时,drop方法会自动关闭连接。浏览器遇到关闭的连接,可能会重试(毕竟它觉得问题可能是暂时的)。

另外,浏览器有时会先和服务器建立多个连接,但不发送任何请求 —— 这样后面真要发请求时,就能更快响应。这种情况下,不管连接上有没有请求,我们的服务器都会检测到每个连接。比如很多基于 Chrome 的浏览器就会这么做,你可以用隐私模式关掉这个优化,或者换个浏览器试试。

不管怎样,关键是我们成功拿到了 TCP 连接的句柄!

记住,每次运行某个版本的代码后,要按ctrl+c停止程序。之后修改代码,再重新执行cargo run,确保运行的是最新代码。

读取请求

接下来,我们要实现从浏览器读取请求的功能!为了把 “获取连接” 和 “处理连接” 这两个逻辑分开,我们先写一个新函数来处理连接。在这个handle_connection函数里,我们会从 TCP 流中读取数据并打印出来,这样就能看到浏览器发送过来的内容了。把代码改成代码清单 21-2 的样子。

文件名:src/main.rs

use std::{io::{BufReader, prelude::*},net::{TcpListener, TcpStream},
};fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();// 调用处理连接的函数,把流传进去handle_connection(stream);}
}// 处理TCP流的函数,参数是可变的TcpStream(因为要往流里写数据)
fn handle_connection(mut stream: TcpStream) {// 创建带缓冲的读取器,包装流的引用(缓冲能提高读取效率)let buf_reader = BufReader::new(&stream);// 读取HTTP请求的所有行,收集到向量中let http_request: Vec<_> = buf_reader.lines() // 按行读取(每行是Result<String, io::Error>类型).map(|result| result.unwrap()) // 解包Result,若出错则终止程序.take_while(|line| !line.is_empty()) // 读到空行就停止(HTTP请求以空行结束).collect(); // 把结果收集成向量// 用美化的调试格式打印请求内容println!("Request: {http_request:#?}");
}

代码清单 21-2:从 TcpStream 读取数据并打印

我们引入了std::io::preludestd::io::BufReader—— 前者能让我们使用读写流相关的 trait,后者是带缓冲的读取器。在main函数的 for 循环里,我们不再打印连接提示,而是调用新写的handle_connection函数,把流传进去。

handle_connection函数里,我们创建了一个BufReader实例,用它包装流的引用。BufReader会帮我们管理对std::io::Read trait 方法的调用,从而实现缓冲功能(比直接读流效率高)。

然后我们定义了http_request变量,用来收集浏览器发送给服务器的请求行。通过Vec<_>这个类型标注,我们告诉 Rust 要把这些行收集成一个向量。

BufReader实现了std::io::BufRead trait,这个 trait 提供了lines方法。lines方法会返回一个迭代器,每次迭代产生一个Result<String, std::io::Error>—— 它会在遇到换行字节时分割流数据。为了拿到每个String,我们用mapunwrap解包每个Result。如果数据不是有效的 UTF-8,或者读取流时出了问题,Result就会是错误状态;这里为了简单,我们直接让程序终止(实际项目里肯定要更优雅地处理错误)。

浏览器会通过发送两个连续的换行符来标识 HTTP 请求的结束,所以我们用take_while,一直读取行,直到遇到空行为止。把这些行收集到向量后,我们用美化的调试格式(#?)打印出来,这样就能清楚看到浏览器发给服务器的 “指令” 了。

咱们来运行这段代码!启动程序后,再用浏览器发一次请求。注意,浏览器还是会显示错误页面,但终端里的程序输出会变成这样:

$ cargo runCompiling hello v0.1.0 (file:///projects/hello)Finished dev [unoptimized + debuginfo] target(s) in 0.42sRunning `target/debug/hello`
Request: ["GET / HTTP/1.1","Host: 127.0.0.1:7878","User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:99.0) Gecko/20100101 Firefox/99.0","Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8","Accept-Language: en-US,en;q=0.5","Accept-Encoding: gzip, deflate, br","DNT: 1","Connection: keep-alive","Upgrade-Insecure-Requests: 1","Sec-Fetch-Dest: document","Sec-Fetch-Mode: navigate","Sec-Fetch-Site: none","Sec-Fetch-User: ?1","Cache-Control: max-age=0",
]

不同浏览器的输出可能会略有差异。现在我们能打印请求数据了,也就明白为什么一次浏览器请求会触发多个连接了 —— 看请求第一行GET后面的路径就知道。如果重复的连接都是请求/,那就是因为浏览器没收到服务器的响应,一直在重试获取/路径的内容。

咱们来拆解一下这个请求数据,看看浏览器在 “要求” 我们的程序做什么。

深入理解 HTTP 请求

HTTP 是基于文本的协议,一个请求的格式是这样的:

方法 请求URI HTTP版本 CRLF
请求头 CRLF
请求体

第一行是 “请求行”,包含客户端请求的关键信息。请求行的第一部分是 “方法”,比如GETPOST,用来表示客户端发起请求的方式。我们这里的客户端用的是GET请求,意思是 “获取信息”。

请求行的第二部分是/,代表客户端请求的 “统一资源标识符(URI)”。URI 和我们常说的 “统一资源定位符(URL)” 差不多,但不完全一样 —— 不过在本章里,不用纠结两者的区别,你可以直接把 URI 当成 URL 来理解。

请求行的最后一部分是客户端使用的 HTTP 版本,然后请求行以CRLF结尾。CRLF是 “回车换行符” 的缩写(源自打字机时代的概念),对应的字符是\r\n\r是回车,\n是换行)。CRLF用来分隔请求行和请求的其他部分,不过打印的时候,我们看到的是换行,而不是\r\n

看我们程序输出的请求行数据:GET / HTTP/1.1,方法是GET,请求 URI 是/,HTTP 版本是1.1

请求行之后,从Host:开始的 lines 都是 “请求头”。GET请求没有请求体。

你可以换个浏览器试试,或者请求一个不同的地址(比如127.0.0.1:7878/test),看看请求数据会怎么变。

现在我们知道浏览器想要什么了,接下来就给它返回点数据吧!

发送响应

我们要实现 “接收客户端请求后返回数据” 的功能。HTTP 响应的格式是这样的:

HTTP版本 状态码 原因短语 CRLF
响应头 CRLF
响应体

第一行是 “状态行”,包含响应使用的 HTTP 版本、一个表示请求处理结果的 “状态码”,以及对状态码的文字描述 “原因短语”。状态行之后是响应头,再用一个CRLF分隔,最后是响应体。

举个例子,下面是一个简单的成功响应:它用 HTTP 1.1 版本,状态码 200(表示成功),原因短语OK,没有响应头,也没有响应体:

HTTP/1.1 200 OK\r\n\r\n

我们就把这个响应写进流里,作为对成功请求的回复!在handle_connection函数里,删掉打印请求数据的println!,换成代码清单 21-3 里的代码。

文件名:src/main.rs

fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&stream);let http_request: Vec<_> = buf_reader.lines().map(|result| result.unwrap()).take_while(|line| !line.is_empty()).collect();// 定义成功响应的字符串(包含状态行和空行分隔符)let response = "HTTP/1.1 200 OK\r\n\r\n";// 把响应字符串转成字节,写入流;若失败则终止程序stream.write_all(response.as_bytes()).unwrap();
}

代码清单 21-3:往流里写入一个简单的成功 HTTP 响应

新增的第一行代码定义了response变量,里面存的是成功响应的数据。然后我们调用as_bytes,把字符串转成字节(因为流操作需要字节数据)。streamwrite_all方法会把&[u8]类型的字节数据直接通过连接发送出去。write_all也可能失败,所以我们还是用unwrap处理错误(实际项目里要加 proper 的错误处理)。

改完之后运行代码,再发一次请求。终端里不会再打印任何数据(除了 Cargo 的输出),但在浏览器里打开127.0.0.1:7878,你会看到一个空白页面,而不是之前的错误 —— 这说明我们成功手动处理了 HTTP 请求并返回了响应!

返回实际的 HTML 内容

我们再来优化一下,让服务器返回的不是空白页面,而是实际的 HTML 内容。在项目根目录(不是src目录)下新建一个hello.html文件,里面可以写任何 HTML 代码,代码清单 21-4 是一个示例。

文件名:hello.html

<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><title>Hello!</title></head><body><h1>Hello!</h1><p>Hi from Rust</p></body>
</html>

代码清单 21-4:要返回给客户端的示例 HTML 文件

这是一个极简的 HTML5 文档,包含一个标题和一段文字。为了让服务器收到请求时返回这个文件,我们要修改handle_connection函数 —— 读取 HTML 文件内容,把它作为响应体添加到响应里,然后发送出去。修改后的代码如代码清单 21-5 所示。

文件名:src/main.rs

use std::{fs, // 引入文件系统模块,用于读取文件io::{BufReader, prelude::*},net::{TcpListener, TcpStream},
};
// --snip--(省略中间不变的代码)fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&stream);let http_request: Vec<_> = buf_reader.lines().map(|result| result.unwrap()).take_while(|line| !line.is_empty()).collect();// 定义响应状态行(成功)let status_line = "HTTP/1.1 200 OK";// 读取hello.html文件的内容,转成字符串;若失败则终止程序let contents = fs::read_to_string("hello.html").unwrap();// 获取HTML内容的长度(用于设置响应头)let length = contents.len();// 拼接完整的响应:状态行 + Content-Length头 + 空行 + 响应体(HTML内容)let response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 把响应写入流stream.write_all(response.as_bytes()).unwrap();
}

代码清单 21-5:把 hello.html 的内容作为响应体发送

我们在use语句里加了fs,这样就能使用标准库的文件系统模块了。“读取文件内容并转成字符串” 这段代码应该很眼熟 —— 第 12 章的 I/O 项目里,我们读取文件内容时就用过fs::read_to_string

然后我们用format!宏,把文件内容作为成功响应的 “响应体” 拼接到响应里。为了保证 HTTP 响应格式正确,我们还加了Content-Length头,它的值是响应体(也就是hello.html文件)的大小。

运行代码(cargo run),在浏览器里打开127.0.0.1:7878—— 你应该能看到 HTML 页面被渲染出来了!

验证请求并选择性返回响应

目前,我们的服务器不管客户端请求什么,都会返回hello.html的内容。比如你在浏览器里请求127.0.0.1:7878/something-else,得到的还是同一个 HTML 页面。现在这个服务器功能太局限了,不符合大多数 Web 服务器的行为。我们希望能根据请求来定制响应:只有当请求是合法的/路径时,才返回 HTML 文件;其他请求都返回错误。

要实现这个功能,我们需要修改handle_connection函数 —— 先检查浏览器请求的是不是/路径,是就返回 HTML 文件,不是就返回错误。修改后的代码如代码清单 21-6 所示。新增的代码会把收到的请求和 “请求/路径” 的格式做对比,然后用if-else分支处理不同的请求。

文件名:src/main.rs

// --snip--(省略前面不变的代码)fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&stream);// 只读取HTTP请求的第一行(请求行),解包两次:第一次解包Option(迭代器是否有值),第二次解包Result(读取是否成功)let request_line = buf_reader.lines().next().unwrap().unwrap();// 如果请求行是“GET / HTTP/1.1”(即请求根路径)if request_line == "GET / HTTP/1.1" {let status_line = "HTTP/1.1 200 OK";let contents = fs::read_to_string("hello.html").unwrap();let length = contents.len();let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");stream.write_all(response.as_bytes()).unwrap();} else {// 其他请求的处理逻辑(后面再加)}
}

代码清单 21-6:区分处理 “请求 / 路径” 和其他请求

我们只需要看 HTTP 请求的第一行(请求行),所以不用把整个请求读到向量里,直接用next从迭代器里取第一个元素就行。第一个unwrap处理Option(如果迭代器里没有元素,程序就终止),第二个unwrap处理Result(和代码清单 21-2 里的unwrap作用一样)。

然后我们检查request_line是不是等于 “GET / HTTP/1.1”—— 也就是GET方法请求根路径。如果是,if分支就返回hello.html的内容

如果request_line不是 “GET / HTTP/1.1”,就说明收到了其他请求。过一会儿我们会在else分支里加代码,处理这些请求。

现在运行代码,请求127.0.0.1:7878—— 能正常看到hello.html的内容。如果请求其他路径(比如127.0.0.1:7878/something-else),就会看到和代码清单 21-1、21-2 类似的连接错误。

接下来,我们在else分支里加代码,返回 “404 Not Found” 响应 —— 这个状态码表示 “请求的资源不存在”。同时,我们也返回一个 HTML 错误页面,让浏览器能显示给用户看。代码如代码清单 21-7 所示。

文件名:src/main.rs

    // --snip--(省略前面的if分支代码)} else {// 状态行:404 Not Found(资源未找到)let status_line = "HTTP/1.1 404 NOT FOUND";// 读取404页面的内容let contents = fs::read_to_string("404.html").unwrap();let length = contents.len();// 拼接404响应let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");stream.write_all(response.as_bytes()).unwrap();}

代码清单 21-7:对非 / 路径的请求,返回 404 状态码和错误页面

这里的响应状态行用了 404 状态码,原因短语是NOT FOUND。响应体是404.html文件里的 HTML 内容。你需要在hello.html旁边新建一个404.html文件,里面可以写任何错误页面的 HTML 代码,代码清单 21-8 是一个示例。

文件名:404.html

<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><title>Hello!</title></head><body><h1>Oops!</h1><p>Sorry, I don't know what you're asking for.</p></body>
</html>

代码清单 21-8:404 响应对应的错误页面示例

改完之后重新运行服务器。请求127.0.0.1:7878会返回hello.html的内容,请求其他路径(比如127.0.0.1:7878/foo)会返回404.html的错误页面。

简单重构代码

现在ifelse分支里有很多重复代码:两者都要读文件,都要把文件内容写入流。唯一的区别是 “状态行” 和 “文件名”。我们可以把这两个不同的部分抽出来,用变量赋值,然后在后面统一用这些变量来读文件和写响应 —— 这样代码会更简洁。代码清单 21-9 展示了重构后的代码:把if-else大分支换成只返回状态行和文件名的小分支,然后用解构赋值把这两个值赋给变量,最后统一处理文件读取和响应发送。

文件名:src/main.rs

// --snip--(省略前面不变的代码)fn handle_connection(mut stream: TcpStream) {// --snip--(省略读取request_line的代码)// 用if-else判断请求行,返回对应的状态行和文件名(用元组存储)let (status_line, filename) = if request_line == "GET / HTTP/1.1" {("HTTP/1.1 200 OK", "hello.html")} else {("HTTP/1.1 404 NOT FOUND", "404.html")};// 统一读取文件、计算长度、拼接响应let contents = fs::read_to_string(filename).unwrap();let length = contents.len();let response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");stream.write_all(response.as_bytes()).unwrap();
}

代码清单 21-9:重构 if-else 分支,只保留差异化代码

现在if-else分支只返回包含 “状态行” 和 “文件名” 的元组,然后我们用解构赋值(第 19 章讲过的模式匹配),把这两个值分别赋给status_linefilename变量。

之前重复的代码(读文件、写响应)现在都放在if-else外面,用status_linefilename变量来统一处理。这样不仅能清楚看到两个分支的区别,还能做到 “一处修改,两处生效”—— 如果以后想改文件读取或响应发送的逻辑,只需要改一次代码就行。代码清单 21-9 的功能和代码清单 21-7 完全一样。

太棒了!现在我们用大约 40 行 Rust 代码,实现了一个简单的 Web 服务器:它能对/路径的请求返回内容页面,对其他所有请求返回 404 响应。

不过目前这个服务器是单线程的,也就是说同一时间只能处理一个请求。接下来我们先模拟一个 “慢请求”,看看单线程的问题,然后再把它改成多线程服务器,让它能同时处理多个请求。

把单线程服务器改成多线程服务器

现在我们的服务器是 “串行处理” 请求的 —— 必须等前一个请求处理完,才能处理下一个。如果服务器收到越来越多的请求,这种串行方式会越来越低效。要是遇到一个处理很慢的请求,后面的请求就算能快速处理,也得等着慢请求结束。我们得解决这个问题,不过在改代码之前,先实际看看单线程的问题有多严重。

模拟单线程服务器中的慢请求

我们先来看看 “慢请求” 会怎么影响单线程服务器的其他请求。代码清单 21-10 实现了对/sleep路径的处理:当收到这个请求时,服务器会先休眠 5 秒,再返回响应 —— 这样就能模拟一个慢请求。

文件名:src/main.rs

use std::{fs,io::{BufReader, prelude::*},net::{TcpListener, TcpStream},thread, // 引入线程模块,用于休眠time::Duration, // 引入Duration,用于指定休眠时间
};
// --snip--(省略前面不变的代码)fn handle_connection(mut stream: TcpStream) {// --snip--(省略读取request_line的代码)// 用match代替if-else,处理三种情况:/、/sleep、其他路径let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),"GET /sleep HTTP/1.1" => {// 休眠5秒(模拟慢请求)thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// --snip--(省略后面统一处理响应的代码)
}

代码清单 21-10:用休眠 5 秒模拟慢请求

这里我们把if-else换成了match,因为现在有三个分支要处理。注意,match不会像相等判断那样自动引用和解引用,所以我们要显式地用&request_line[..],把String转成切片,才能和字符串字面量匹配。

第一个分支和之前的if分支一样,处理/路径的请求。第二个分支匹配/sleep路径的请求 —— 收到这个请求后,服务器会休眠 5 秒,然后再返回成功的 HTML 页面。第三个分支和之前的else一样,处理其他路径的请求。

cargo run启动服务器,然后打开两个浏览器窗口:一个访问http://127.0.0.1:7878/,另一个访问http://127.0.0.1:7878/sleep。你会发现:单独刷新/路径时,响应很快;但如果先刷新/sleep,再刷新//的请求会等/sleep休眠完 5 秒后才会响应。

要解决这个问题,有多种方案(比如第 17 章讲的 async/await),不过我们这次要实现的是 “线程池”。

用线程池提升吞吐量

“线程池” 是一组预先创建好的线程,它们处于等待状态,随时准备处理任务。当程序收到新任务时,会从池子里选一个线程分配给任务,让这个线程去处理任务;其他线程继续等待新任务。当线程处理完任务后,会回到池子里,等待下一个任务。用线程池可以让服务器并发处理请求,从而提高吞吐量。

我们会把线程池的线程数量限制在一个较小的值,这样能防止 DoS 攻击(如果每次收到请求都新建一个线程,攻击者发 1000 万个请求,就能耗尽服务器资源,让请求处理彻底停滞)。

我们不会无限制地新建线程,而是用固定数量的线程来等待任务。收到的请求会被放进一个队列里,池子里的每个线程都会从队列里取任务、处理任务,处理完再取下一个。这样设计的话,我们同一时间最多能处理N个请求(N是线程数量)。就算每个线程都在处理慢请求,后面的请求也只会在队列里排队,而不会让服务器资源被耗尽。

当然,这只是提升 Web 服务器吞吐量的方法之一。其他方案还包括 “分治模型(fork/join)”“单线程异步 I/O 模型”“多线程异步 I/O 模型” 等。如果你对这个话题感兴趣,可以去了解这些方案,甚至用 Rust 实现它们 —— 作为一门低级语言,Rust 能支持所有这些方案。

在实现线程池之前,我们先想想 “使用线程池” 应该是什么样子的。设计代码时,先确定 “客户端调用接口”,能帮我们理清设计思路。先写好调用代码的样子,再在这个结构里实现功能,而不是先实现功能再设计接口。

和第 12 章项目用 “测试驱动开发” 类似,我们这里用 “编译器驱动开发”:先写我们想调用的函数,然后根据编译器的错误提示,一步步修改代码,直到能跑起来。不过在这之前,我们先看看 “不推荐的方案”,作为对比。

为每个请求新建一个线程

首先,我们来看看 “每个请求新建一个线程” 的代码会是什么样。正如前面提到的,这不是最终方案(因为可能新建无限多线程),但可以作为 “多线程服务器” 的起点,之后再用线程池优化 —— 对比这两种方案会更清楚。代码清单 21-11 修改了main函数,在 for 循环里为每个流新建一个线程,用这个线程处理连接。

文件名:src/main.rs

fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();// 新建一个线程,在新线程里处理连接thread::spawn(|| {handle_connection(stream);});}
}

代码清单 21-11:为每个流新建一个线程

正如第 16 章学到的,thread::spawn会新建一个线程,然后在新线程里执行闭包中的代码。你可以运行这段代码试试:在浏览器里打开/sleep,然后再打开两个/的标签页 —— 会发现/的请求不用等/sleep处理完就能响应。不过,正如我们之前说的,这种方案最终会耗尽系统资源,因为线程会无限新建。

你可能还记得第 17 章提到的:这种场景正是 async/await 的强项!在实现线程池的过程中,不妨想想如果用 async,代码会有哪些不同,哪些地方又会一样。

创建固定数量的线程

我们希望线程池的用法和 “新建线程” 差不多,这样切换起来不用大幅修改代码。代码清单 21-12 是我们想要的ThreadPool结构体接口 —— 用它来代替thread::spawn

文件名:src/main.rs

// 这段代码暂时不能编译!
fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();// 创建一个包含4个线程的线程池let pool = ThreadPool::new(4);for stream in listener.incoming() {let stream = stream.unwrap();// 用线程池执行任务,闭包参数和thread::spawn类似pool.execute(|| {handle_connection(stream);});}
}

代码清单 21-12:我们理想中的 ThreadPool 接口

我们用ThreadPool::new创建一个线程池,参数是线程数量(这里是 4)。然后在 for 循环里,用pool.execute提交任务 —— 它的接口和thread::spawn类似,都是接收一个闭包。我们需要实现pool.execute,让它把闭包交给池子里的某个线程去执行。这段代码现在还不能编译,但我们先这么写,然后根据编译器的错误提示来修改。

用编译器驱动开发实现 ThreadPool

把代码清单 21-12 的修改应用到src/main.rs,然后执行cargo check,根据编译器的错误提示来开发。首先会看到这个错误:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`--> src/main.rs:11:16|
11 |     let pool = ThreadPool::new(4);|                ^^^^^^^^^^ use of undeclared type `ThreadPool`For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

很好!这个错误告诉我们需要一个ThreadPool类型或模块,那我们就来实现它。ThreadPool的实现和 Web 服务器的具体业务逻辑无关,所以我们把hello这个二进制 crate 改成库 crate,用来存放ThreadPool的实现。改成库 crate 后,这个线程池不仅能用于 Web 服务器,还能用于其他需要线程池的场景。

新建一个src/lib.rs文件,里面先写一个最简单的ThreadPool结构体定义:

// 文件名:src/lib.rs
pub struct ThreadPool;

然后修改main.rs,在顶部引入库 crate 里的ThreadPool

// 文件名:src/main.rs
use hello::ThreadPool; // 从hello库中引入ThreadPool

现在代码还是不能跑,但再执行cargo check,会看到下一个需要解决的错误:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope--> src/main.rs:12:28|
12 |     let pool = ThreadPool::new(4);|                            ^^^ function or associated item not found in `ThreadPool`For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

这个错误说明我们需要为ThreadPool实现一个关联函数newnew需要接收一个参数(比如 4),返回一个ThreadPool实例。我们先写一个最简单的new函数:

// 文件名:src/lib.rs
pub struct ThreadPool;impl ThreadPool {// 关联函数new:接收线程数量(usize类型),返回ThreadPool实例pub fn new(size: usize) -> ThreadPool {ThreadPool // 暂时只返回空结构体}
}

我们选usize作为size的类型,因为 “线程数量为负数” 是没意义的,而且usize是用来表示 “集合大小” 的类型(第 3 章 “整数类型” 里讲过)。

再执行cargo check,会看到新的错误:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope--> src/main.rs:17:14|
17 |         pool.execute(|| {|         -----^^^^^^^ method not found in `ThreadPool`For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

现在的错误是ThreadPool没有execute方法。回想 “创建固定数量的线程” 那部分,我们希望execute的接口和thread::spawn类似。另外,execute需要接收一个闭包,把它交给池子里的空闲线程去执行。

我们给ThreadPool定义execute方法,让它接收一个闭包作为参数。第 13 章 “从闭包中移出捕获的值与 Fn trait” 里讲过,闭包作为参数时,有三种 trait 可以用:FnFnMutFnOnce。我们需要选哪种呢?

我们知道最终会用类似标准库thread::spawn的逻辑,所以可以先看看thread::spawn的签名对参数的约束。查文档会看到thread::spawn的签名是这样的:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>whereF: FnOnce() -> T,F: Send + 'static,T: Send + 'static,

这里F是闭包的类型参数,T是闭包的返回值类型(我们暂时不关心T)。可以看到spawnF的约束是FnOnce—— 这也是我们想要的,因为每个请求的闭包只会执行一次,正好符合FnOnce的 “只能调用一次” 的特性。

另外,F还需要满足Send'static约束:Send能让闭包从一个线程传到另一个线程;'static是因为我们不知道线程会执行多久(闭包的生命周期要足够长)。所以我们给ThreadPoolexecute方法定义这样的泛型参数:

// 文件名:src/lib.rs
impl ThreadPool {// --snip--(省略new函数)// execute方法:接收一个闭包,交给线程池执行pub fn execute<F>(&self, f: F)where// 闭包约束:无参数、无返回值,可Send,生命周期'staticF: FnOnce() + Send + 'static,{// 暂时空实现}
}

注意,FnOnce()里的()表示闭包 “没有参数且返回值为单元类型(())”。和函数定义一样,返回值可以省略,但就算没有参数,括号也不能省。

再执行cargo check,会看到编译通过了:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

不过要注意,如果你现在执行cargo run,在浏览器里发请求,还是会看到之前的错误 —— 因为execute方法还没真正调用闭包呢!

提示:对于 Haskell、Rust 这类强类型编译型语言,有句说法是 “能编译通过,就基本能跑通”。但这句话不是绝对的 —— 我们的代码现在能编译,但啥也没干!如果是做真正的项目,这时候就该写单元测试了,既要保证代码能编译,也要保证行为符合预期。

思考一下:如果我们要执行的是 “future” 而不是闭包,这里的代码会有什么不同?

在 new 函数中验证线程数量

目前newexecute的参数都没用到。我们先完善new函数的逻辑。之前选usize作为size的类型,是因为线程数量不能为负,但size为 0 也是没意义的。所以我们要在new函数里加检查:如果size是 0,就让程序 panic。可以用assert!宏来做这件事,如代码清单 21-13 所示。

文件名:src/lib.rs

impl ThreadPool {/// 创建一个新的线程池。////// 参数`size`是线程池中的线程数量。////// # Panics////// 如果`size`为0,`new`函数会panic。pub fn new(size: usize) -> ThreadPool {// 断言size大于0,否则panicassert!(size > 0);ThreadPool}// --snip--(省略execute函数)
}

代码清单 21-13:实现 ThreadPool::new,若 size 为 0 则 panic

我们还加了文档注释(第 14 章讲过的规范),其中# Panics部分明确说明了函数会 panic 的场景 —— 这是良好的文档习惯。你可以执行cargo doc --open,点击ThreadPool结构体,看看生成的文档里new函数的说明是什么样的!

除了用assert!,我们也可以把new改成build,返回Result类型(就像第 12 章 I/O 项目里Config::build那样)。但在这个场景下,“创建一个没有线程的线程池” 是不可恢复的错误,所以用panic是合理的。如果你有兴趣,可以试试自己实现一个build函数,签名如下:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {// 自己实现逻辑
}
为存储线程留出空间

既然已经确认size是有效的,接下来就要创建size个线程,把它们存到ThreadPool结构体里,然后返回ThreadPool实例。但 “存储线程” 具体是指存储什么呢?我们再看一下thread::spawn的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>whereF: FnOnce() -> T,F: Send + 'static,T: Send + 'static,

spawn函数返回的是JoinHandle<T>类型,其中T是闭包的返回值类型。我们的场景里,闭包处理连接后不返回任何值,所以T是单元类型(()),也就是JoinHandle<()>

那我们就试试用JoinHandle<()>来存储线程。代码清单 21-14 的代码能编译,但还没真正创建线程 —— 我们修改了ThreadPool的定义,让它持有一个Vec<thread::JoinHandle<()>>,然后在new函数里初始化这个向量(容量为size),最后返回包含这个向量的ThreadPool实例。

文件名:src/lib.rs

// 这段代码还没实现预期功能
use std::thread; // 引入thread模块,用于JoinHandlepub struct ThreadPool {// 存储线程的JoinHandlethreads: Vec<thread::JoinHandle<()>>,
}impl ThreadPool {// --snip--(省略文档注释和new函数的断言)pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 创建一个容量为size的向量(预分配空间,比Vec::new高效)let mut threads = Vec::with_capacity(size);for _ in 0..size {// 后面再写创建线程并存入向量的逻辑}// 返回包含线程向量的ThreadPool实例ThreadPool { threads }}// --snip--(省略execute函数)
}

代码清单 21-14:为 ThreadPool 创建存储线程的向量

我们在库 crate 里引入了std::thread,因为要用到thread::JoinHandle作为向量元素的类型。

确认size有效后,我们创建了一个容量为size的向量。Vec::with_capacityVec::new的作用类似,但它会预先分配size大小的空间 —— 因为我们明确知道要存size个元素,这样做比Vec::new(动态扩容)效率更高。

再执行cargo check,应该能编译通过。

用 Worker 结构体将任务从线程池传给线程

代码清单 21-14 的 for 循环里,我们还没写创建线程的逻辑。标准库的thread::spawn能创建线程,但它要求线程创建时就指定要执行的代码。而我们的需求是:先创建线程,让它们等待,之后再把要执行的代码传过去。标准库的线程没有这个功能,所以我们得自己实现。

我们可以在ThreadPool和线程之间加一个中间数据结构,用它来管理 “接收任务并让线程执行” 的逻辑 —— 这个结构叫Worker(工作线程),是线程池实现中的常用概念。Worker会 “领取” 要执行的代码,然后在自己的线程里运行。

可以把它想象成餐厅后厨的厨师:厨师们先等着,一旦有订单进来,就拿订单做菜。

之前ThreadPool里存的是JoinHandle<()>向量,现在改成存Worker实例向量。每个Worker会持有一个JoinHandle<()>。然后我们给Worker实现一个方法,让它接收要执行的闭包,把闭包传给已经运行的线程去执行。另外,给每个Worker加一个id,这样调试或打日志时,能区分不同的工作线程。

下面是创建ThreadPool的新流程(等Worker实现好后):

  1. 定义Worker结构体,包含idJoinHandle<()>
  2. 修改ThreadPool,让它持有Worker实例向量,而不是直接持有JoinHandle<()>
  3. 实现Worker::new函数:接收id,返回Worker实例(包含id和一个用空闭包创建的线程)。
  4. ThreadPool::new的 for 循环里,用循环计数器作为id,创建Worker实例,存入向量。

如果你想挑战一下,可以先自己试试实现这些修改,再看代码清单 21-15 的参考实现。

准备好了吗?代码清单 21-15 是一种实现方式:

文件名:src/lib.rs

use std::thread;pub struct ThreadPool {// 现在存的是Worker实例向量,不是直接存线程workers: Vec<Worker>,
}impl ThreadPool {// --snip--(省略文档注释和new函数的断言)pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let mut workers = Vec::with_capacity(size);// 循环创建size个Worker,id从0开始for id in 0..size {workers.push(Worker::new(id));}ThreadPool { workers }}// --snip--(省略execute函数)
}// Worker结构体:私有,外部代码不用关心它的实现
struct Worker {id: usize, // 工作线程的ID,用于区分thread: thread::JoinHandle<()>, // 工作线程的JoinHandle
}impl Worker {// Worker::new:私有,只在ThreadPool内部使用fn new(id: usize) -> Worker {// 新建一个线程,暂时用空闭包(后面再改)let thread = thread::spawn(|| {});Worker { id, thread }}
}

代码清单 21-15:修改 ThreadPool,让它持有 Worker 实例而非直接持有线程

我们把ThreadPool的字段从threads改成了workers,因为现在存的是Worker实例。for 循环里用计数器作为id,创建Worker实例并存入向量。

Worker结构体和它的new函数都是私有的 —— 外部代码(比如src/main.rs里的服务器代码)不用知道ThreadPool内部用了Worker,这是 “封装” 的体现。Worker::new函数接收id,创建一个新线程(暂时用空闭包),然后返回包含id和线程JoinHandleWorker实例。

注意:如果操作系统因为资源不足无法创建线程,thread::spawn会 panic,导致整个服务器崩溃 —— 就算有些线程创建成功了也没用。为了简单,我们暂时接受这种行为,但在生产环境的线程池实现中,你可能会用std::thread::Builderspawn方法(它返回Result)来更优雅地处理这种错误。

这段代码能编译,也能按指定数量创建Worker实例,但execute方法还没处理传进来的闭包 —— 接下来就要解决这个问题。

通过通道给线程发送请求

现在的问题是:thread::spawn的闭包是空的,而我们要在execute方法里接收闭包并让线程执行。我们希望之前创建的Worker实例能从ThreadPool持有的队列里获取要执行的代码,然后传给自己的线程去运行。

第 16 章学的 “通道(channel)” 正好适合这个场景 —— 通道是线程间通信的简单方式。我们的计划是:

  1. ThreadPool创建一个通道,持有 “发送端(sender)”。
  2. 每个Worker持有 “接收端(receiver)”。
  3. 定义Job结构体,用来包装要通过通道发送的闭包。
  4. execute方法通过发送端把Job发送出去。
  5. Worker在自己的线程里循环:从接收端获取Job,执行Job里的闭包。

首先,我们在ThreadPool::new里创建通道,让ThreadPool持有发送端;同时定义Job结构体(暂时是空的,后面会用来包装闭包)。代码如代码清单 21-16 所示。

文件名:src/lib.rs

use std::{sync::mpsc, thread}; // 引入mpsc(多生产者单消费者)通道pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>, // 通道的发送端,用于发送Job
}// Job结构体:暂时是空的,后面会用来包装闭包
struct Job;impl ThreadPool {// --snip--(省略文档注释)pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 创建一个通道:sender用于发送Job,receiver用于接收Joblet (sender, receiver) = mpsc::channel();let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id));}ThreadPool { workers, sender }}// --snip--(省略execute函数)
}

代码清单 21-16:修改 ThreadPool,让它存储发送 Job 的通道发送端

ThreadPool::new里,我们创建了一个通道,让线程池持有发送端。这段代码能成功编译。

接下来,我们尝试把通道的接收端传给每个Worker—— 因为Worker的线程需要从接收端获取Job。代码清单 21-17 的代码暂时还不能编译,但我们先这么写,看看问题在哪。

文件名:src/lib.rs

// 这段代码暂时不能编译!
impl ThreadPool {// --snip--(省略前面的代码)pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let mut workers = Vec::with_capacity(size);// 循环创建Worker时,把receiver传进去for id in 0..size {workers.push(Worker::new(id, receiver));}ThreadPool { workers, sender }}// --snip--
}// --snip--impl Worker {// 修改Worker::new,接收receiver作为参数fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {let thread = thread::spawn(|| {// 暂时只引用receiver,后面再处理Jobreceiver;});Worker { id, thread }}
}

代码清单 21-17:尝试把通道接收端传给每个 Worker

我们做了些简单修改:Worker::new增加了receiver参数,在闭包里引用receiverThreadPool::new创建Worker时,把receiver传进去。

执行cargo check,会看到这个错误:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`--> src/lib.rs:26:42|
21 |         let (sender, receiver) = mpsc::channel();|                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {|         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));|                                          ^^^^^^^^ value moved here, in previous iteration of loop|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary--> src/lib.rs:47:33|
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {|        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once|
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);|

错误原因是:我们试图把同一个receiver传给多个Worker。但第 16 章讲过,Rust 的通道是 “多生产者,单消费者(MPSC)” 的 —— 也就是说,接收端不能克隆,没法传给多个Worker。而且我们也不希望 “一个消息发给多个消费者”,而是希望 “一个消息队列,多个Worker抢消息,每个消息只被处理一次”。

另外,从通道队列里取任务需要修改receiver,所以多个线程需要安全地共享和修改receiver—— 否则会出现竞态条件(第 16 章讲过)。

回想第 16 章的 “线程安全智能指针”:要在多个线程间共享所有权,并且允许线程修改值,需要用Arc<Mutex<T>>Arc能让多个Worker共享receiver的所有权,Mutex能保证同一时间只有一个Worker能从receiver获取任务。代码清单 21-18 是需要修改的地方。

文件名:src/lib.rs

use std::{sync::{Arc, Mutex, mpsc}, // 引入Arc和Mutexthread,
};
// --snip--impl ThreadPool {// --snip--pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();// 用Arc<Mutex>包装receiver,让多个Worker能共享并安全修改let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {// 克隆Arc,增加引用计数,让Worker持有共享的receiverworkers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}// --snip--
}// --snip--impl Worker {// 修改Worker::new的参数类型,接收Arc<Mutex<Receiver<Job>>>fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {// --snip--}
}

代码清单 21-18:用 Arc 和 Mutex 让多个 Worker 共享通道接收端

ThreadPool::new里,我们把receiver包在ArcMutex里。创建每个Worker时,我们克隆Arc(增加引用计数),这样Worker实例就能共享receiver的所有权。

改完之后,代码就能编译了!我们离目标越来越近了!

实现 execute 方法

最后,我们来实现ThreadPoolexecute方法。首先,把Job从结构体改成 “类型别名”—— 让它代表execute接收的闭包类型(用 trait 对象)。第 20 章 “用类型别名简化类型” 里讲过,类型别名能让长类型名更简洁。代码如代码清单 21-19 所示。

文件名:src/lib.rs

// --snip--// Job是类型别名,代表“包装了闭包的Box”
type Job = Box<dyn FnOnce() + Send + 'static>;impl ThreadPool {// --snip--pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{// 把闭包包装成Job(Box<dyn FnOnce() + Send + 'static>)let job = Box::new(f);// 通过通道发送Job;若发送失败则panicself.sender.send(job).unwrap();}
}// --snip--

代码清单 21-19:定义 Job 类型别名,包装闭包并通过通道发送

我们先把execute接收的闭包f包装成Job实例(用Box::new),然后通过通道的发送端把Job发出去。send方法可能会失败(比如所有线程都停止执行了,接收端不再接收消息),这里我们用unwrap处理错误 —— 目前我们的线程会一直运行,直到线程池销毁,所以发送失败的情况暂时不会出现,但编译器不知道,所以需要unwrap

不过还没完!Workerthread::spawn闭包里还只是引用了receiver,没有处理任务。我们需要让这个闭包 “无限循环”:不断从通道接收端获取Job,拿到后就执行。代码清单 21-20 修改了Worker::new的逻辑。

文件名:src/lib.rs

// --snip--impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || {// 无限循环,不断获取并执行Jobloop {// 1. 加锁获取MutexGuard,2. 解包Result(锁是否获取成功),3. 接收Job,4. 解包Result(是否收到Job)let job = receiver.lock().unwrap().recv().unwrap();// 打印日志:哪个Worker拿到了任务println!("Worker {id} got a job; executing.");// 执行任务(闭包)job();}});Worker { id, thread }}
}

代码清单 21-20:Worker 线程循环接收并执行 Job

这里的逻辑分几步:

  1. 调用receiver.lock()获取互斥锁(返回MutexGuard),unwrap处理 “锁获取失败” 的情况(比如其他线程持有锁时 panic,导致锁中毒)—— 这种情况下让当前线程 panic 是合理的,你也可以改成expect,加一段有意义的错误提示。
  2. 调用recv()从通道接收Job—— 这个方法会阻塞,直到有Job过来。
  3. unwrap处理 “接收失败” 的情况(比如发送端被销毁)。
  4. 打印日志,说明哪个Worker拿到了任务,然后执行job()(即执行闭包)。

Mutex<T>保证了同一时间只有一个Worker能调用recv()获取任务 —— 不会出现多个线程抢任务的情况。

现在我们的线程池终于能工作了!执行cargo run,然后发几个请求,终端会输出类似这样的内容:

$ cargo runCompiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read--> src/lib.rs:7:5|
6 | pub struct ThreadPool {|            ---------- field in this struct
7 |     workers: Vec<Worker>,|     ^^^^^^^|= note: `#[warn(dead_code)]` on by defaultwarning: fields `id` and `thread` are never read--> src/lib.rs:48:5|
47 | struct Worker {|        ------ fields in this struct
48 |     id: usize,|     ^^
49 |     thread: thread::JoinHandle<()>,|     ^^^^^^warning: `hello` (lib) generated 2 warningsFinished `dev` profile [unoptimized + debuginfo] target(s) in 4.91sRunning `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功了!现在我们的线程池能异步执行连接处理任务了。线程数量固定为 4 个,就算服务器收到很多请求,也不会耗尽系统资源。如果请求/sleep,服务器还能通过其他线程处理其他请求 —— 不会阻塞。

注意:如果你同时在多个浏览器窗口打开/sleep,可能会看到它们每隔 5 秒依次加载。这是因为有些浏览器为了缓存,会把相同的请求串行执行 —— 不是我们服务器的问题。

思考一下:在学了第 17、18 章的while let循环后,你可能会想:为什么不用while letWorker的线程逻辑?比如代码清单 21-21 这样。

文件名:src/lib.rs

// 这段代码不能实现预期功能
// --snip--impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || {// 用while let代替loop + matchwhile let Ok(job) = receiver.lock().unwrap().recv() {println!("Worker {id} got a job; executing.");job();}});Worker { id, thread }}
}

代码清单 21-21:用 while let 实现 Worker::new 的替代方案

这段代码能编译运行,但达不到预期的多线程效果 —— 慢请求还是会阻塞其他请求。原因有点微妙:Mutex结构体没有公开的unlock方法,因为锁的所有权是通过MutexGuard<T>的生命周期来管理的(lock方法返回LockResult<MutexGuard<T>>)。编译时,借用检查器会确保 “只有持有锁的线程才能访问被保护的资源”。但这种实现方式可能会导致 “锁持有时间过长”—— 如果我们没注意MutexGuard<T>的生命周期。

代码清单 21-20 里的let job = receiver.lock().unwrap().recv().unwrap();之所以能工作,是因为let语句右侧表达式中的临时值(包括MutexGuard)会在let语句结束后立即被销毁。而while let(以及if letmatch)会把临时值的生命周期延长到整个代码块结束。在代码清单 21-21 里,锁会在 `job()执行期间一直被持有,这意味着其他Worker` 实例无法获取锁来接收新任务 —— 相当于又回到了单线程的阻塞状态。

优雅关闭与清理

代码清单 21-20 的实现已经能异步处理请求了,但编译器会提示一些 “未使用字段” 的警告(workersidthread),这提醒我们还没做 “清理工作”。目前如果用ctrl+c强制终止主线程,其他线程也会被立即停止,哪怕它们还在处理请求。

接下来,我们要实现Drop trait,让线程池销毁时调用每个线程的join方法,确保线程能完成当前处理的请求后再关闭。然后还要实现 “通知线程停止接收新任务” 的逻辑,最后修改服务器,让它处理两个请求后优雅关闭线程池。

需要注意的是:这部分代码不影响 “闭包执行” 的逻辑,就算我们用线程池来运行异步任务(futures),这里的关闭逻辑也基本一样。

为 ThreadPool 实现 Drop trait

首先,我们给ThreadPool实现Drop trait,在销毁线程池时 join 每个线程。代码清单 21-22 是第一次尝试,但这段代码暂时不能编译。

文件名:src/lib.rs

// 这段代码暂时不能编译!
impl Drop for ThreadPool {fn drop(&mut self) {// 遍历每个workerfor worker in &mut self.workers {println!("Shutting down worker {}", worker.id);// 尝试join worker的线程worker.thread.join().unwrap();}}
}

代码清单 21-22:线程池销毁时 join 每个线程

我们遍历线程池里的每个worker,打印 “关闭 worker” 的提示,然后调用worker.thread.join()等待线程结束。如果join失败,就用unwrap让程序 panic(进入非优雅关闭)。

编译这段代码会遇到如下错误:

$ cargo checkChecking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference--> src/lib.rs:52:13|
52 |             worker.thread.join().unwrap();|             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call|             ||             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17

错误原因是:join方法需要获取JoinHandle的所有权,但我们只有worker的可变引用(&mut self.workers),无法转移所有权。要解决这个问题,一种方案是把Workerthread字段改成Option<JoinHandle<()>>,用take()方法把Some里的JoinHandle取出来(留下None),从而获取所有权(类似第 18 章的处理方式)。

但还有更简洁的方案:用Vec::drain(..)方法。drain会移除向量中的所有元素,并返回一个迭代器,迭代器中的元素是所有权转移后的类型(而不是引用)。所以我们可以修改Drop实现:

// 文件名:src/lib.rs
impl Drop for ThreadPool {fn drop(&mut self) {// 用drain获取所有worker的所有权(同时清空workers向量)for worker in self.workers.drain(..) {println!("Shutting down worker {}", worker.id);// 现在可以安全调用join,因为worker是所有权转移后的实例worker.thread.join().unwrap();}}
}

这样修改后,编译器不会再报错,也不需要改动其他代码。

通知线程停止接收新任务

虽然现在代码能编译且没有警告,但功能上还有问题:Worker线程里的loop是无限循环,会一直调用recv()等待任务 —— 就算我们调用join,主线程也会一直阻塞,永远等不到线程结束。

要解决这个问题,需要两步修改:一是在ThreadPoolDrop实现中 “关闭通道发送端”,二是让Worker线程在recv()返回错误时退出循环。

首先,修改ThreadPoolsender字段为Option<mpsc::Sender<Job>>,这样在Drop时可以用take()取出sender并销毁,从而关闭通道。代码清单 21-23 展示了这些修改。

文件名:src/lib.rs

// 这段代码还未实现完整功能
pub struct ThreadPool {workers: Vec<Worker>,sender: Option<mpsc::Sender<Job>>, // 改成Option
}
// --snip--
impl ThreadPool {pub fn new(size: usize) -> ThreadPool {// --snip--(省略之前的逻辑)ThreadPool {workers,sender: Some(sender), // 用Some包装sender}}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);// 用as_ref()获取Option中的引用,再unwrapself.sender.as_ref().unwrap().send(job).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {// 取出sender并销毁,关闭通道(此时没有发送端了)drop(self.sender.take());// 遍历worker,join线程for worker in self.workers.drain(..) {println!("Shutting down worker {}", worker.id);worker.thread.join().unwrap();}}
}

代码清单 21-23:显式销毁 sender,关闭通道后再 join 线程

销毁sender会关闭通道,此时所有Worker线程调用的recv()都会返回Err(因为没有发送端了)。接下来,我们修改Worker的循环逻辑,在recv()返回错误时退出循环 —— 这样线程就能正常结束,join也能成功返回。代码清单 21-24 展示了修改后的Worker::new

文件名:src/lib.rs

impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || {loop {// 先接收消息,用match处理成功/失败let message = receiver.lock().unwrap().recv();match message {Ok(job) => {println!("Worker {id} got a job; executing.");job();}Err(_) => {// 接收失败(通道关闭),打印日志并退出循环println!("Worker {id} disconnected; shutting down.");break;}}}});Worker { id, thread }}
}

代码清单 21-24:recv () 返回错误时退出循环,让线程正常关闭

现在,当通道关闭后,recv()会返回ErrWorker线程会打印 “断开连接” 的日志,然后break退出循环 —— 线程执行完后,join就能顺利完成。

为了演示优雅关闭的效果,我们修改main函数,让服务器只处理两个请求后就关闭。代码清单 21-25 展示了修改后的main

文件名:src/main.rs

fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();let pool = ThreadPool::new(4);// 用take(2)限制只处理前两个连接for stream in listener.incoming().take(2) {let stream = stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");// pool在这里超出作用域,自动调用Drop,优雅关闭
}

代码清单 21-25:处理两个请求后关闭服务器,演示优雅关闭

实际的 Web 服务器不会只处理两个请求,但这段代码能清晰展示 “优雅关闭” 的逻辑:take(2)限制迭代器只取前两个连接,处理完后pool超出作用域,调用Drop关闭通道、join 所有线程。

启动服务器(cargo run),发三个请求 —— 第三个请求会失败,终端输出类似这样:

$ cargo runCompiling hello v0.1.0 (file:///projects/hello)Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41sRunning `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

输出顺序可能不同,但能看到关键逻辑:前两个请求被Worker 0Worker 3处理;服务器打印 “Shutting down.” 后,pool触发Drop,关闭通道;所有Worker收到Err后打印 “断开连接” 并退出循环;最后主线程join所有线程,完成关闭。

恭喜!我们的项目终于完成了!现在我们有了一个基于线程池的多线程 Web 服务器,能异步处理请求,还支持优雅关闭 —— 关闭时会等待所有线程完成当前任务。

完整代码参考

文件名:src/main.rs

use hello::ThreadPool;
use std::{fs,io::{BufReader, prelude::*},net::{TcpListener, TcpStream},thread,time::Duration,
};fn main() {let listener = TcpListener::bind("127.0.0.1:7878").unwrap();let pool = ThreadPool::new(4);// 只处理前两个请求后关闭for stream in listener.incoming().take(2) {let stream = stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");
}fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&stream);// 读取请求行(第一行)let request_line = buf_reader.lines().next().unwrap().unwrap();// 根据请求行选择状态行和文件名let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),"GET /sleep HTTP/1.1" => {// 休眠5秒,模拟慢请求thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// 读取文件内容,拼接响应let contents = fs::read_to_string(filename).unwrap();let length = contents.len();let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 发送响应stream.write_all(response.as_bytes()).unwrap();
}

文件名:src/lib.rs

use std::{sync::{Arc, Mutex, mpsc},thread,
};// 线程池结构体:持有工作线程和通道发送端
pub struct ThreadPool {workers: Vec<Worker>,sender: Option<mpsc::Sender<Job>>,
}// Job类型别名:包装要执行的闭包
type Job = Box<dyn FnOnce() + Send + 'static>;impl ThreadPool {/// 创建一个新的线程池。////// 参数`size`是线程池中的线程数量。////// # Panics////// 如果`size`为0,`new`函数会panic。pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 创建通道:发送端给线程池,接收端给所有工作线程let (sender, receiver) = mpsc::channel();// 用Arc+Mutex包装接收端,让多线程安全共享let receiver = Arc::new(Mutex::new(receiver));// 创建工作线程向量let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool {workers,sender: Some(sender),}}/// 提交一个任务到线程池执行。////// 参数`f`是要执行的闭包,需满足:无参数、无返回值、可Send、生命周期'static。pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);// 发送任务到通道,若失败则panicself.sender.as_ref().unwrap().send(job).unwrap();}
}// 线程池的Drop实现:优雅关闭
impl Drop for ThreadPool {fn drop(&mut self) {// 销毁发送端,关闭通道(通知工作线程停止)drop(self.sender.take());// 等待所有工作线程完成for worker in &mut self.workers {println!("Shutting down worker {}", worker.id);// 取出线程并join,若失败则panicif let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}// 工作线程结构体:持有ID和线程句柄
struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,
}impl Worker {/// 创建一个新的工作线程。////// 参数`id`是工作线程的唯一标识,`receiver`是共享的通道接收端。fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || {loop {// 接收任务:加锁 -> 接收消息 -> 处理结果let message = receiver.lock().unwrap().recv();match message {Ok(job) => {println!("Worker {id} got a job; executing.");job(); // 执行任务}Err(_) => {// 通道关闭,退出循环println!("Worker {id} disconnected; shutting down.");break;}}}});Worker {id,thread: Some(thread),}}
}

后续优化方向

如果你想继续完善这个项目,可以试试以下方向:

  1. ThreadPool和它的公有方法补充更详细的文档注释。
  2. 为库的功能编写单元测试(比如测试线程池是否能正确执行任务、是否能优雅关闭)。
  3. unwrap换成更健壮的错误处理(比如返回Result,自定义错误类型)。
  4. 用这个线程池处理其他任务(不只是 Web 请求)。
  5. 在 crates.io 上找一个线程池 crate(比如threadpool),用它重写这个 Web 服务器,对比它和我们实现的线程池在 API 设计、健壮性上的差异。

总结

太棒了!你已经读完了整本书!感谢你和我们一起探索 Rust 的世界。现在你已经具备了实现自己的 Rust 项目、参与开源项目的能力。记住,Rust 社区非常友好,如果你在学习过程中遇到问题,很多开发者都愿意提供帮助。

祝你在 Rust 的旅程中一切顺利!

http://www.dtcms.com/a/415224.html

相关文章:

  • 作品 上海高端网站设计wordpress logo 编辑
  • day9.27
  • 做动画人设有哪些网站可以借鉴谷歌chrome浏览器下载
  • c++ 之多态虚函数表
  • 全屏网站 图片优化网站主机免费
  • 谷歌广告联盟网站同一个网站绑定多个域名
  • Java 大视界 -- Java 大数据机器学习模型在金融产品创新与客户需求匹配中的实战应用(417)
  • 美团网站是用什么做的网站开发企业开发
  • C语言风格哈希表vs C++风格哈希表的区别
  • 做数据分析网站做网站与数据库的关系
  • 六节tslib移植 、Qt移植到嵌入式linux
  • 做动漫图片的网站seo推广费用
  • 设计模式与原则精要
  • asp网站怎么做301定向系统商店
  • 大连html5网站建设价格泉州快速建站模板
  • LeetCode:64.搜索二维矩阵
  • 特殊矩阵的压缩存储
  • Qwen3-Omni多模态prompt输入解析
  • CVPR-2025 | 具身导航指令高效生成!MAPInstructor:基于场景图的导航指令生成Prompt调整策略
  • PRP (Product Requirement Prompts) - AI辅助开发提示词库
  • 昆明网站seo多少钱金舵设计园在线设计平台
  • AI识图 + MinIO图床 + 钉钉推送:打造全自动水质监测系统
  • EIGRP
  • 旅游电子商务网站开发方案网站运营数据周报表怎么做
  • 计算机视觉:人脸关键点定位与轮廓绘制
  • 手机网站建设基本流程专业的集团网站开发开发
  • Spring AI Alibaba:Java生态下的智能体开发全栈解决方案
  • 这么做网站网站三合一
  • Kurt-Blender零基础教程:第3章:材质篇——第3节:给模型上材质
  • Unity-导航寻路系统