[易学易懂系列|rustlang语言|零基础|快速入门|(26)|实战3:Http服务器(多线程版本)]

释放双眼,带上耳机,听听看~!

[易学易懂系列|rustlang语言|零基础|快速入门|(26)|实战3:Http服务器(多线程版本)]

项目实战

实战3:Http服务器

我们今天来进一步开发我们的Http服务器,用多线程实现。

我们在原来工程h_server更新代码如下:

src/main.rs:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1use h_server::*;
2use std::fs;
3use std::io::prelude::*;
4use std::net::TcpListener;
5use std::net::TcpStream;
6
7fn main() {
8    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
9    let pool = ThreadPool::new(4);
10    println!("multi-threads server is up!");
11    for stream in listener.incoming() {
12        let stream = stream.unwrap();
13        println!("multi-threads server get request!");
14        pool.execute(|| {
15            handle_connection(stream);
16        });
17    }
18}
19fn handle_connection(mut stream: TcpStream) {
20    let mut buffer = [0; 512];
21    stream.read(&mut buffer).unwrap();
22
23    let get = b"GET / HTTP/1.1\r\n";
24
25    let (status_line, filename) = if buffer.starts_with(get) {
26        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
27    } else {
28        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
29    };
30
31    let contents = fs::read_to_string(filename).unwrap();
32
33    let response = format!("{}{}", status_line, contents);
34
35    stream.write(response.as_bytes()).unwrap();
36    stream.flush().unwrap();
37}
38
39

src/lib.rs:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
1use std::sync::mpsc;
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::thread;
5
6enum Message {
7    NewJob(Job),
8    Terminate,
9}
10
11pub struct ThreadPool {
12    workers: Vec<Worker>,
13    sender: mpsc::Sender<Message>,
14}
15
16trait FnBox {
17    fn call_box(self: Box<Self>);
18}
19
20impl<F: FnOnce()> FnBox for F {
21    fn call_box(self: Box<F>) {
22        (*self)()
23    }
24}
25
26type Job = Box<dyn FnBox + Send + 'static>;
27
28impl ThreadPool {
29    /// Create a new ThreadPool.
30    ///
31    /// The size is the number of threads in the pool.
32    ///
33    /// # Panics
34    ///
35    /// The `new` function will panic if the size is zero.
36    pub fn new(size: usize) -> ThreadPool {
37        assert!(size > 0);
38
39        let (sender, receiver) = mpsc::channel();
40
41        let receiver = Arc::new(Mutex::new(receiver));
42
43        let mut workers = Vec::with_capacity(size);
44
45        for id in 0..size {
46            workers.push(Worker::new(id, Arc::clone(&receiver)));
47        }
48
49        ThreadPool { workers, sender }
50    }
51
52    pub fn execute<F>(&self, f: F)
53    where
54        //这里定义闭包,是FnOnce类型,代表一个线程只运行一次
55        //Send类型,代表闭包可以在不同线程中传递
56        //'static,代表闭包生命周期跟整个程序一样
57        F: FnOnce() + Send + 'static,
58    {
59        let job = Box::new(f);
60
61        self.sender.send(Message::NewJob(job)).unwrap();
62    }
63}
64//实现Drop特征,用于处理资源释放相关逻辑
65impl Drop for ThreadPool {
66    fn drop(&mut self) {
67        println!("Sending terminate message to all workers.");
68
69        for _ in &mut self.workers {
70            self.sender.send(Message::Terminate).unwrap();
71        }
72
73        println!("Shutting down all workers.");
74
75        for worker in &mut self.workers {
76            println!("Shutting down worker {}", worker.id);
77
78            if let Some(thread) = worker.thread.take() {
79                thread.join().unwrap();
80            }
81        }
82    }
83}
84
85struct Worker {
86    id: usize,
87    thread: Option<thread::JoinHandle<()>>,
88}
89
90impl Worker {
91    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
92        let thread = thread::spawn(move || loop {
93            let message = receiver.lock().unwrap().recv().unwrap();
94
95            match message {
96                Message::NewJob(job) => {
97                    println!("Worker {} got a job; executing.", id);
98
99                    job.call_box();
100                }
101                Message::Terminate => {
102                    println!("Worker {} was told to terminate.", id);
103
104                    break;
105                }
106            }
107        });
108
109        Worker {
110            id,
111            thread: Some(thread),
112        }
113    }
114}
115
116

直接运行命令:


1
2
1cargo run
2

启动服务器。

然后用浏览器访问:

http://127.0.0.1:7878/

页面显示:


1
2
3
4
5
1Hello!
2
3Hi from Rust
4
5

以上,希望对你有用。


1
2
1如果遇到什么问题,欢迎加入:rust新手群,在这里我可以提供一些简单的帮助,加微信:360369487,注明:博客园+rust
2

参考文章:

https://doc.rust-lang.org/stable/book/ch20-02-multithreaded.html\#creating-a-similar-interface-for-a-finite-number-of-threads

给TA打赏
共{{data.count}}人
人已打赏
安全技术

C/C++内存泄漏及检测

2022-1-11 12:36:11

病毒疫情

“待我凯旋,再一起过节”

2020-2-9 9:47:00

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索