首页 > 其他分享 >用Rust实现一个多线程的web server

用Rust实现一个多线程的web server

时间:2022-11-07 10:36:37浏览次数:41  
标签:web thread Worker server ThreadPool let new 多线程 id


在本文之前,我们用Rust实现一个单线程的web server的例子,但是单线程的web server不够高效,所以本篇文章就来实现一个多线程的例子。

单线程web server存在的问题

请求只能串行处理,也就是说当第一个连结处理完之前不会处理第二个连结。考虑如下例子:

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::fs;
use std::{thread, time};

fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "main.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};

let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);

stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();

let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis); //睡眠一段时间,模拟处理时间很长
}

fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080")? ;

for stream in listener.incoming() {
handle_client(stream?);
}
Ok(())
}

在浏览器中打开两个窗口,分别输入127.0.0.1:8080,会发现在第一个处理完之前,第二个不会响应。

使用多线程来解决问题

  • 解决方式

修改main函数代码:

fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080")?;
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

for stream in listener.incoming() {
// handle_client(stream?);
let stream = stream.unwrap();
let handle = thread::spawn(|| {
handle_client(stream);
});
thread_vec.push(handle);
}

for handle in thread_vec {
handle.join().unwrap();
}

Ok(())
}

从浏览器打开两个标签,进行测试,可以发现第一个没有处理完之前,第二个请求已经开始处理。

  • 存在问题

当存在海量请求时,系统也会跟着创建海量的线程,最终造成系统崩溃。

使用线程池来解决问题

  • 线程池

 

用Rust实现一个多线程的web server_rust程序设计

 

  • 知识点

多线程、管道。

从主线程将任务发送到管道,工作线程等待在管道的接收端,当收到任务时,进行处理。

线程池方式实现

1、初步设计

  • 定义ThreadPool结构
use std::thread;
pub struct ThreadPool {
thread: Vec<thread::JoinHandle<()>>,
}
  • 定义ThreadPool的方法
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
//--snip--
}

pub fn execute()
//pub fn execute<F>(&self, f: F)
// where
// F: FnOnce() + Send + 'static
{
//--snip--
}
}
  • 下面我们考虑new函数,可能的实现是这样
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
//创建线程:
//问题来了,创建线程的时候需要传入闭包,也就是具体做的动作,
//可是这个时候我们还没有具体的任务,怎么办?
}

ThreadPool {
threads
}
}
  • execute函数
//设计execute的函数,可以参考thread::spawn
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{

}

初步设计的问题总结:

主要是在创建线程池的new函数中,需要传入具体的任务,可是此时还没有具体的任务,如何解决?

2、解决线程创建的问题

  • 重新定义ThreadPool结构体
pub struct ThreadPool {
workers: Vec<Worker>,
}
  • ThreadPool的new方法
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id));
}

ThreadPool {
workers
}
}
  • 在worker中创建线程
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}

impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});

Worker {
id,
thread,
}
}
}

3、发送任务

  • 进一步将ThreadPool结构设计为
use std::sync::mpsc;

pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}

struct Job;
  • 完善new方法
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();//add
let mut workers = Vec::with_capacity(size);

for id in 0..size {
//workers.push(Worker::new(id));
workers.push(Worker::new(id, receiver));
}

ThreadPool {
workers,
sender,//add
}
}
// --snip--
}

//--snip--

impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});

Worker {
id,
thread,
}
}
}

此段代码错误,因为receiver要在线程间传递,但是是非线程安全的。因此应该使用Arc<Mutex<T>>。重新撰写new方法如下:

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));//add

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool {
workers,
sender,
}
}

// --snip--
}

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
}
});

Worker {
id,
thread,
}
}
}
  • 实现execute方法
type Job = Box<dyn FnOnce() + Send + 'static>;//修改Job为trait对象的类别名称

impl ThreadPool {
// --snip--

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);

self.sender.send(job).unwrap();
}
}

完整代码

src/main.rs

use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::{thread, time};
use mylib::ThreadPool;

fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "main.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};

let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);

stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();

let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis);
}

fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080")?;
// let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

let pool = ThreadPool::new(4);


for stream in listener.incoming() {
// // handle_client(stream?);
let stream = stream.unwrap();
// let handle = thread::spawn(|| {
// handle_client(stream);
// });
// thread_vec.push(handle);

pool.execute(|| {
handle_client(stream);
});
}

// for handle in thread_vec {
// handle.join().unwrap();
// }

Ok(())
}

src/mylib/lib.rs

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}

impl Worker {
// fn new(id: usize) -> Worker {
// let thread = thread::spawn(|| {});

// Worker {
// id,
// thread,
// }
// }

// fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
// let thread = thread::spawn(|| {
// receiver;
// });

// Worker {
// id,
// thread,
// }
// }

fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
}
});

Worker {
id,
thread,
}
}
}

pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}

// struct Job;
type Job = Box<dyn FnOnce() + Send + 'static>;//修改Job为trait对象的类别名称

impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// let mut threads = Vec::with_capacity(size);
// for _ in 0..size {
// //创建线程:
// //问题来了,创建线程的时候需要传入闭包,也就是具体做的动作,
// //可是这个时候我们还没有具体的任务,怎么办?
// }

// ThreadPool {
// threads
// }

let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);

for id in 0..size {
//workers.push(Worker::new(id));
//workers.push(Worker::new(id, receiver));
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool {
workers,
sender,
}
}

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);

self.sender.send(job).unwrap();
}
}

在main的Cargo.toml添加如下依赖:

[dependencies]
mylib = {path = "./mylib"}

当前版本存在的问题

线程池中的线程怎么结束?

想知道如何解决这个问题,请关注令狐一冲,下回为您分解。

标签:web,thread,Worker,server,ThreadPool,let,new,多线程,id
From: https://blog.51cto.com/u_15862521/5828078

相关文章

  • 用Rust创建一个简单的webserver
    目的本节的例子教大家用Rust语言创建一个简单的webserver程序。webserver中涉及到的两个主要协议是超文本传输协议(HypertextTransferProtocol,HTTP)和传输控制协议(Tran......
  • 004 Web Assembly康威游戏之优化
    0介绍视频地址:https://www.bilibili.com/video/BV1eg411g7c8相关源码:https://github.com/anonymousGiga/Rust-and-Web-Assembly1说明在上一节的实现中,我们是在Rust中实现......
  • HTML躬行记(4)——Web音视频基础
    公司目前的业务会接触比较多的音视频,所以有必要了解一些基本概念。文章涉及的一些源码已上传至Github,可随意下载。一、基础概念本节音视频的基础概念摘自......
  • webflux 实现前一个请求是后一个请求的参数
    当前一个请求结果是后一个请求的参数时,在Handler是进行逻辑处理:Service层publicFlux<Integer>responseDepartmentListByHospitalId(Integerid){returnth......
  • The Social Web: data representation
    参考链接:clickhere本次assignment所用的jupyterbook为a2_datarep.ipynb:{"cells":[{"cell_type":"markdown","metadata":{"id":"dcMf4aubeMI9......
  • java多线程生产者消费者线程并发协作测试心得
    图11-17生产者消费者示意图产品classChicken{intid;publicChicken(intid){this.id=id;}}缓冲区(装产品)classContainer{//定......
  • 多线程
    JUC并发编程多线程三种创建方式继承Thread(Thread实现了Runnable接口)实现Runnable(这种方式需要将该实现类作为参数调用Thread对象)实现Callable线程的状态线程方法......
  • 多线程
    1、程序、进程、线程程序(program)是为了完成特定任务、用某种语言编写的一组指令的集合。即指一段静态的代码,静态对象进程(process)是程序的一次执行过程,或是正在运行的一......
  • 120-nginx 配置server访问前端目录
    server{listen8082;server_namelocalhost;#charsetkoi8-r;#access_loglogs/host.access.logmain;locatio......
  • 《网络是怎样连接的》web浏览器的探索
     1.生成HTTP请求消息首先,这张图特别重要,这把网络整个的连接过程表示出来了,整个过程讲述的是在浏览器输入网址(url)之后,web浏览器解析网址信息,生成http请求消息,然后先把we......