[易学易懂系列|rustlang语言|零基础|快速入门|(26)|实战3:Http服务器(多线程版本)]
项目实战
实战3:Http服务器
我们今天来进一步开发我们的Http服务器,用多线程实现。
我们在原来工程h_server更新代码如下:
src/main.rs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1use h_server::*;
2use std::fs;
3use std::io::prelude::*;
4use std::net::TcpListener;
5use std::net::TcpStream;
6
7fn main() {
8 let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
9 let pool = ThreadPool::new(4);
10 println!("multi-threads server is up!");
11 for stream in listener.incoming() {
12 let stream = stream.unwrap();
13 println!("multi-threads server get request!");
14 pool.execute(|| {
15 handle_connection(stream);
16 });
17 }
18}
/// Serves a single HTTP request on `stream`.
///
/// A request that starts with `GET / HTTP/1.1` is answered with the contents
/// of `hello.html`; every other request gets `404.html` with a 404 status.
///
/// I/O failures never panic, because this function runs on a pool worker
/// thread: they are reported on stderr and the connection is dropped. If the
/// page file cannot be read, the 404 status is still sent with an empty body,
/// and a missing index page is reported to the client as a 500.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    let bytes_read = match stream.read(&mut buffer) {
        Ok(count) => count,
        Err(err) => {
            eprintln!("failed to read request: {}", err);
            return;
        }
    };

    let get = b"GET / HTTP/1.1\r\n";

    // Only look at the bytes that were actually received.
    let is_root = buffer[..bytes_read].starts_with(get);

    let (status_line, filename) = if is_root {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let response = match fs::read_to_string(filename) {
        Ok(contents) => format!("{}{}", status_line, contents),
        Err(err) => {
            eprintln!("failed to read {}: {}", filename, err);
            if is_root {
                // The index page should exist; its absence is a server fault.
                "HTTP/1.1 500 INTERNAL SERVER ERROR\r\n\r\n".to_string()
            } else {
                // The status is still correct without the decorative body.
                status_line.to_string()
            }
        }
    };

    // `write_all` retries partial writes, which a bare `write` does not.
    let sent = stream
        .write_all(response.as_bytes())
        .and_then(|_| stream.flush());
    if let Err(err) = sent {
        eprintln!("failed to send response: {}", err);
    }
}
38
39
src/lib.rs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 1use std::sync::mpsc;
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::thread;
5
/// Message sent from the pool to its worker threads over the shared channel.
enum Message {
    /// A closure to run on whichever worker receives it.
    NewJob(Job),
    /// Tells the receiving worker to leave its loop and exit.
    Terminate,
}

/// A fixed-size pool of worker threads that run submitted closures.
///
/// Dropping the pool asks every worker to terminate and joins its thread.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

/// A boxed run-once closure that can be moved to a worker thread.
///
/// `Box<dyn FnOnce()>` is directly callable, so no helper trait is needed to
/// invoke it.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        // The single receiving end is shared by every worker behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Queues `f` to be run by one of the pool's worker threads.
    ///
    /// # Panics
    ///
    /// Panics if no worker is left to receive the job.
    pub fn execute<F>(&self, f: F)
    where
        // `FnOnce`: each job is run exactly once by one worker.
        // `Send`: the closure is moved from the caller's thread to a worker.
        // `'static`: the closure may outlive the caller's stack frame.
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
            .send(Message::NewJob(job))
            .expect("thread pool has no running workers");
    }
}

// Implements `Drop` so the worker threads are shut down with the pool.
impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        // One `Terminate` per worker: each worker consumes exactly one and
        // exits, after finishing the jobs queued ahead of it.
        for _ in &self.workers {
            // A send error means every worker is already gone, so nobody is
            // left to notify. Never panic inside `drop`.
            if self.sender.send(Message::Terminate).is_err() {
                break;
            }
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                if thread.join().is_err() {
                    eprintln!("Worker {} panicked before shutting down.", worker.id);
                }
            }
        }
    }
}

/// One pool thread together with the id used in its log messages.
struct Worker {
    id: usize,
    /// `None` once the thread has been joined by `ThreadPool::drop`.
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Spawns a thread that runs jobs from `receiver` until it is told to
    /// terminate or the sending side of the channel goes away.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The guard is a temporary that is released at the end of this
            // statement, so the lock is not held while the job runs. A
            // poisoned lock is still usable: the receiver has no invariant
            // that a panic could have broken.
            let message = receiver
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .recv();

            match message {
                Ok(Message::NewJob(job)) => {
                    println!("Worker {} got a job; executing.", id);

                    // Contain a panicking job so it cannot kill this worker
                    // and silently shrink the pool.
                    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job));
                    if outcome.is_err() {
                        eprintln!("Worker {} job panicked; worker keeps running.", id);
                    }
                }
                Ok(Message::Terminate) => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
                Err(_) => {
                    // The sender was dropped without a `Terminate`; no more
                    // work can ever arrive.
                    println!("Worker {} lost its channel; shutting down.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
115
116
直接运行命令:
cargo run
启动服务器。
然后用浏览器访问:http://127.0.0.1:7878
页面显示:
Hello!

Hi from Rust
以上,希望对你有用。
如果遇到什么问题,欢迎加入:rust新手群,在这里我可以提供一些简单的帮助,加微信:360369487,注明:博客园+rust
参考文章: