这个项目来自 Stanford CS 110L: Safety in Systems Programming.
课程地址: https://reberhardt.com/cs110l/spring-2020/
项目地址:https://reberhardt.com/cs110l/spring-2020/assignments/project-2/
我的实现:https://github.com/csBenClarkson/balancebeam
前言
在这个项目中,我们要用 Rust 实现一个网络负载均衡器。在网络请求数量增大时,我们需要对服务器扩容,但单机性能迟早会成为瓶颈,这时我们就需要引入更多的服务器,并把网络请求分发到这些服务器上了。
分发的方法有很多,最省事的就是为每个服务器分配一个公网 IP 和域名,让用户自己选择分流服务器;还可以利用 IP 提供的任播特性让用户作 DNS 解析时就近选择分流服务器;最后,还可以用一台专门的服务器接收所有请求,并分流到各个服务器,这个服务器就叫做负载均衡器(Load Balancer),这个方法不需要为每个后台服务器分配一个公网地址。
显而易见的是,这个负载均衡器必须高效地转发请求。在这个项目中,我们将分别使用多线程和异步 IO 来实现它,并比较它们的性能。另外,当后台服务器宕机时,负载均衡器需要检测到,把请求转发到其它的服务器上。
与这门课上一个项目一样,课程把实验分为了几个里程碑,并为我们提供了 start code,写好了转发 HTTP 请求的部分。我们只需要关心转发的实现。
关于测试
课程为我们提供了测试代码,我们可以直接运行
cargo test --test 01_single_upstream_tests
cargo test --test 02_multiple_upstream_tests test_load_distribution来执行测试。Rust 的测试框架十分容易上手,可以在实现的过程中编写自己的测试来帮助开发。
多线程实现
这种 IO-bounded 的任务很容易让人联想到多线程实现,每一个线程服务于一个客户端,当一个线程因处理网络 IO 而阻塞时,其它线程可以继续被调度运行。理想的情况是为每个客户端连接开启一个线程,而线程的数量刚好等于服务器的 CPU 数量,这样能加大每个线程正好被调度到每个 CPU 上的几率。但在现实中这显然不可能,客户端的数量会远远大于服务器的 CPU 数量。这时我们有两个解决方案,一是开启更多的线程,这会增大线程运行的 overhead 导致性能受限;二是使用线程池只开启 CPU 数量的线程,当线程池中有空闲线程时拿出来接受客户端的请求,如果线程池满了客户端需要等待。
这里我们使用线程池来实现。在 main 函数中,我们需要维护一个 ProxyState 来记录上游服务器等信息,这些信息应该被各个线程共享,因此我们需要创建对 ProxyState 的多个只读引用并传入线程执行的函数中。这引出了 Rust 中 lifetime 的概念。Rust 会在一个变量不再使用时自动调用 drop 函数释放资源,这时所有对该变量的引用都会失效,否则会导致类似悬空指针的问题。在我们这个情况中,我们需要把对 ProxyState 的引用传入线程中。
// This does not work!
// 创建 ProxyState
let state = ProxyState {
upstream_addresses: options.upstream,
active_health_check_interval: options.active_health_check_interval,
active_health_check_path: options.active_health_check_path,
max_requests_per_minute: options.max_requests_per_minute,
};
let pool = ThreadPool::new(num_cpus::get()); // 根据 CPU 数量创建线程池
for stream in listener.incoming() {
if let Ok(stream) = stream {
// Handle the connection!
let state_ref = &state;
// 给线程池加入任务
pool.execute(move || { handle_connection(stream, state_ref); });
}
}
// join 所有线程
pool.join();但是注意到变量 state 在开启完所有线程后就没有再被使用而应该被释放掉,那么在这些线程运行的时候,对 state 的引用就会失效。因此编译这段代码时,Rust 编译器会给出 "state does not live long enough" 的错误。
为了解决这个问题,我们需要使用 Rc<T> (Reference count)。这将创建对 T 的引用并进行引用计数,只有当所有引用被 drop 时,被引用的变量才会被释放。而对于多线程,我们需要使用 Arc<T> (Atomic reference count),Rc<T> 的 thread safe 版本来保证引用计数增减的原子性。
我们使用 Arc::new 来对一个对象创建引用,返回一个引用 ,使用时会自动解引用。对这个引用调用 clone 函数可以创建一个新的引用并使引用计数加一。这些引用便可以安全地传进多线程的任务函数中了。下面是修改后的代码。
let state = ProxyState { ... };
let state_ref = Arc::new(state); // 对 state 创建一个引用
let pool = ThreadPool::new(num_cpus::get());
for stream in listener.incoming() {
if let Ok(stream) = stream {
// Handle the connection!
let state = state_ref.clone(); // clone 引用给这个线程使用
pool.execute(move || { handle_connection(stream, &state); });
}
}
pool.join();注意传入函数 handle_connection 时需要给引用 state 带上 &。pool.execute 函数参数闭包中的 move 关键字表示把闭包捕获的变量以移动语义传入函数中,即转移所有权。
异步 IO 实现
异步 IO 涉及非阻塞 IO(Non-blocking IO) 和 Future 概念。无论 IO 是否已经准备好,非阻塞 IO 会立即返回。一般来说,非阻塞 IO 会返回一个错误值来表示 IO 还没有准备好,比如在 Linux 中为文件描述符设置 O_NONBLOCK 时,系统调用 read() 会在 IO 未就绪时直接返回 EAGAIN。那么我们就可以利用等待 IO 的时间做一些其它计算,等到 IO 返回结果后,再执行依赖于这个结果的计算。
// 前一阶段的计算
while (read(fd, buf, sizeof(buf)) == EAGAIN) {
// 做与这个 IO 的结果不相关的计算
do_something();
}
// IO 就绪,跳出 while 循环
// 进入下一阶段的计算Future 是什么
那么当我们需要等待多个耗时的 IO 操作时,我们是不是可以把它们组合起来,等任意一个 IO 返回结果的时候再去处理它的数据呢?这正好符合我们的应用场景:服务器需要响应大量客户端的连接,且涉及到耗时的网络 IO 操作。这样我们就可以把大量的客户端连接压缩到一个线程中执行,不仅不会受限于 CPU 数量,还可以省去了线程切换的 overhead。而我们只需要为每个 IO 维护一个状态,就可以实现单线程的 IO 并行。
可见管理这些 IO 操作和它们的结果的方式是通用的,为了不反复造轮子,Future 的概念出现了。在 Rust 中,Future 是一个 trait,需要实现一个 poll 函数来定义需要进行的操作。
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Output>; 其中 Poll 是一个 Enum,表示这个 Future 的状态。
enum Poll<T> {
Ready(T),
Pending,
}当这个 Future 管理的操作完成后,poll 函数就会带着结果 T 返回 Ready 了。
当我们要进行一系列 IO 和计算时,我们就可以把这些 Future 组合起来,构成一个由依赖关系决定的有向图。当一个 Future 就绪时,所有依赖于这个结果的 Future 就可以启动,等待完成后又启动其它的 Future。那么我们就需要一个角色来启动和管理这些 Future 了,这里我们使用 Rust 中的 tokio 库。
使用 tokio 库实现 Non-blocking IO
接下来我们就不需要自己实现 Future trait 了,tokio 会让我们使用下面这套 async/await 语法处理好非阻塞 IO 的实现。比如定义一个异步函数,函数内对文件读取、处理后发送到网络上,我们可以这样写
async fn read_and_send(filename: String) {
let f = read_file(filename).await?; // non-blocking IO
do_some_calculation(&mut f); // blocking calculation
let err = send_to_network(f, net).await?; // non-blocking IO
}tokio 会把这个函数变成两个 Future,第一个对应读取文件的操作,第二个对应发送至网络的操作。执行这个函数时,第一个 Future 启动,让出 CPU,等到 IO 返回结果时第一个 Future 就功成身退,开始占用 CPU 做计算,等到计算完成后第二个 Future 启动,调用网络 IO 并让出 CPU,等到网络 IO 返回结果后 CPU 再来处理。
stage 1 ---> --- 任务开始
stage 2 ---> Future #1 --- 调用文件 IO
stage 3 ---> --- 文件 IO 返回
stage 4 ---> Calculation #1 --- 启动计算
stage 5 ---> --- 计算完成
stage 6 ---> Future #2 --- 调用网络 IO
stage 7 ---> --- 网络 IO 返回上图描述了这个函数在调用后的过程,tokio 会自动管理好这几个阶段。这样的一个执行流被 tokio 称为一个 task。如果我们在一个线程上为上面的操作创建多个 task,那么在一个 task 处于等待 IO 的阶段时,其它 task(可能处于不同的阶段)也可以继续执行,实现了 IO 并行。下面的内容引自 tokio 文档。
Tasks are light weight. Because tasks are scheduled by the Tokio runtime rather than the operating system, creating new tasks or switching between tasks does not require a context switch and has fairly low overhead. Creating, running, and destroying large numbers of tasks is quite cheap, especially compared to OS threads.
创建一个 task 的语法与创建线程的语法很像。
task::spawn(async {
// perform some work here...
});参数里的闭包函数必须是以 async 声明的函数。
既然 task 十分轻量化,我们可以为每个客户端连接创建一个 task 来处理它的 HTTP 请求,即调用 handler_connection 函数。那么 handler_connection 本身以及其调用的 IO 就需要使用非阻塞的版本了。
接下来我们需要把 handle_connection 里面的所有与 IO 有关的函数改为非阻塞的版本。tokio 库为标准库中的 IO 函数实现了非阻塞的版本,函数签名绝大部分保持一致,因此基本上我们只需要在原函数后面加上 .await 并在函数声明前加上 async 关键字来告诉 tokio 把这个函数编译为异步函数即可。
let state = ProxyState { ... };
let state_ref = Arc::new(state);
loop {
if let Ok((stream, _)) = listener.accept().await {
// Handle the connection!
let state = state_ref.clone();
tokio::task::spawn(async move { handle_connection(stream, &state).await; });
}
}


































































































































