[易学易懂系列|rustlang语言|零基础|快速入门|(19)|多线程]
实用知识
多线程
我们今天来讲讲Rust中的多线程。
我直接来看看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1use std::thread;
2use std::time::Duration;
3
4fn main() {
5 thread::spawn(|| {
6 for i in 1..10 {
7 println!("hi number {} from the spawned thread!", i);
8 thread::sleep(Duration::from_millis(1));
9 }
10 });
11
12 for i in 1..5 {
13 println!("hi number {} from the main thread!", i);
14 thread::sleep(Duration::from_millis(1));
15 }
16}
17
运行结果,如下:
1
2
3
4
5
6
7
8
9 1hi number 1 from the main thread!
2hi number 1 from the spawned thread!
3hi number 2 from the main thread!
4hi number 2 from the spawned thread!
5hi number 3 from the spawned thread!
6hi number 3 from the main thread!
7hi number 4 from the main thread!
8hi number 4 from the spawned thread!
9
我们先来看看,线程定义的方法:
1
2
3
4
5
6
7 1thread::spawn(|| {
2 for i in 1..10 {
3 println!("hi number {} from the spawned thread!", i);
4 thread::sleep(Duration::from_millis(1));
5 }
6 });
7
在Rust中,创建新线程,用thread::spawn函数(这个函数来看std),这个函数传递一个闭包,闭包里包含线程运行代码:
1
2
3
4
5
6
7 1{
2 for i in 1..10 {
3 println!("hi number {} from the spawned thread!", i);
4 thread::sleep(Duration::from_millis(1));
5 }
6 }
7
这里特别说明一下,在Rust,线程模型跟操作系统的线程模型,是1:1的关系。
我们再来看看运行结果。
我们发现主线程,main thread打印的信息是从1到4,是完整的。
而自定义的线程spawned thread,也是只打印到4,也许你们的电脑可能打印到5(并没有打印1到10)。
结果不尽相同。
但这里说明一个问题,主线程结束后自定义的线程spawned thread,也立马关闭了。
那如何可以做到让自定义的线程spawned thread打印从1到10的结果呢?
用JoinHandle。
JoinHandle是thread::spawn函数的返回类型,它是有个方法join,可以让主线程等待子线程完成所有工作。
请看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1use std::thread;
2use std::time::Duration;
3
4fn main() {
5 let handle = thread::spawn(|| {
6 for i in 1..10 {
7 println!("hi number {} from the spawned thread!", i);
8 thread::sleep(Duration::from_millis(1));
9 }
10 });
11
12 for i in 1..5 {
13 println!("hi number {} from the main thread!", i);
14 thread::sleep(Duration::from_millis(1));
15 }
16
17 handle.join().unwrap();
18}
19
这个时候 ,我们看到我们的运行结果是:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1hi number 1 from the main thread!
2hi number 2 from the main thread!
3hi number 1 from the spawned thread!
4hi number 3 from the main thread!
5hi number 2 from the spawned thread!
6hi number 4 from the main thread!
7hi number 3 from the spawned thread!
8hi number 4 from the spawned thread!
9hi number 5 from the spawned thread!
10hi number 6 from the spawned thread!
11hi number 7 from the spawned thread!
12hi number 8 from the spawned thread!
13hi number 9 from the spawned thread!
14
可能你们的电脑打印的顺序不一样,但自定义的线程 spawned thread,所有循环都可执行完,主线程才会结束。
如果我们把相关的代码 handle.join().unwrap();
调整一下顺序,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1use std::thread;
2use std::time::Duration;
3
4fn main() {
5 let handle = thread::spawn(|| {
6 for i in 1..10 {
7 println!("hi number {} from the spawned thread!", i);
8 thread::sleep(Duration::from_millis(1));
9 }
10 });
11
12 handle.join().unwrap();//调整顺序到这里
13
14 for i in 1..5 {
15 println!("hi number {} from the main thread!", i);
16 thread::sleep(Duration::from_millis(1));
17 }
18}
19
打印结果又变成这样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1hi number 1 from the spawned thread!
2hi number 2 from the spawned thread!
3hi number 3 from the spawned thread!
4hi number 4 from the spawned thread!
5hi number 5 from the spawned thread!
6hi number 6 from the spawned thread!
7hi number 7 from the spawned thread!
8hi number 8 from the spawned thread!
9hi number 9 from the spawned thread!
10hi number 1 from the main thread!
11hi number 2 from the main thread!
12hi number 3 from the main thread!
13hi number 4 from the main thread!
14
我们看到主线程会等待子线程 spawned thread运行完后,再运行自己的代码。
好的,我们知道线程的基本用法后,我们来写一个简单的程序,如下 :
1
2
3
4
5
6
7
8
9
10
11
12 1use std::thread;
2
3fn main() {
4 let v = vec![1, 2, 3];
5
6 let handle = thread::spawn(|| {
7 println!("Here's a vector: {:?}", v);
8 });
9
10 handle.join().unwrap();//这代码表示主线程main,在这里等待子线程thread::spawn执行完成 后再结束
11}
12
编译代码,编译器女王会抛出一个错误信息给你:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1error[E0373]: closure may outlive the current function, but it borrows `v`,
2which is owned by the current function
3 --> src/main.rs:6:32
4 |
56 | let handle = thread::spawn(|| {
6 | ^^ may outlive borrowed value `v`
77 | println!("Here's a vector: {:?}", v);
8 | - `v` is borrowed here
9 |
10help: to force the closure to take ownership of `v` (and any other referenced
11variables), use the `move` keyword
12 |
136 | let handle = thread::spawn(move || {
14 |
15
我们从错误信息可以看到,在子线程中的代码:
1
2 1println!("Here's a vector: {:?}", v);
2
会从主线程main中借用v的引用,但是,这里的v在主线程对数据拥有所有权,并且有可能在主线程的中把它销毁掉。这时,子线程在后台运行时,会发现v已经不存在。这就会发生灾难性后果。
rust的编译器是不允许这样的情况出现。
那怎么解决?
用move。
代码如下 :
1
2
3
4
5
6
7
8
9
10
11
12 1use std::thread;
2
3fn main() {
4 let v = vec![1, 2, 3];
5
6 let handle = thread::spawn(move || {
7 println!("Here's a vector: {:?}", v);
8 });
9
10 handle.join().unwrap();
11}
12
这时,我们再运行一下,会得出正确的结果:
1
2 1Here's a vector: [1, 2, 3]
2
这里的move关键词,让Rust编译器女王知道,子线程中的闭包函数拥有对v中数据的所有权,所以就不会再出现灾难性的后果。
好,现在我们再来试验一下,增加一段有趣的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1use std::thread;
2
3fn main() {
4 let v = vec![1, 2, 3];
5
6 let handle = thread::spawn(move || {
7 println!("Here's a vector: {:?}", v);
8 });
9
10 println!("Now vector: {:?}", v);//增加一段代码,访问v
11 handle.join().unwrap();
12}
13
14
我们编译一下代码,编译器又会报告出一个错误:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1error[E0382]: borrow of moved value: `v`
2 --> src\main.rs:29:35
3 |
423 | let v = vec![1, 2, 3];
5 | - move occurs because `v` has type `std::vec::Vec<i32>`, which does not implement the `Copy` trait
624 |
725 | let handle = thread::spawn(move || {
8 | ------- value moved into closure here
926 | println!("Here's a vector: {:?}", v);
10 | - variable moved due to use in closure
11...
1229 | println!("Now vector: {:?}", v);
13 | ^ value borrowed here after move
14
这个错误信息已经很详细,它的主要意思 就是,v绑定的数据所有权已经move给了线程代码中的闭包,你现在再借用,已经失效。因为v已经失去了数据所有权。
好,我们现在再来看看多个线程中如何通信。
我们了解到在golang的口号中,有一句这样的话:
Do not communicate by sharing memory; instead, share memory by communicating.
什么意思呢?
就是说:线程间通信不要用共享内存的方式来通信,而是要用通信的方式来共享内存。
简单来说,就要用actor的模式来处理线程的通信与共享数据的逻辑。
所以,在Rust中的个关键词channel,用来定义线程间通信的管道。
我们可以把它想象成,自来水的管道,而数据就是自来水。
那管道有上端和下端。
上端一般用来发送数据。
下端一般用来接收数据。
我们先来看看简单的代码:
1
2
3
4
5
6 1use std::sync::mpsc;
2
3fn main() {
4 let (tx, rx) = mpsc::channel();
5}
6
这段代码中,我们用元组来同时定义管道的上端tx,和下端rx。
mpsc代表什么意思 呢?
multiple producer, single consumer
也就是多个生产者,单个消费者。
知道多线程模型的同学应该很清楚。
我们把这个多个生产者,单个消费者的模型,可以想象成三通水管,像这样:
这里,多个生产者线程负责往管道中发送数据。
而只有一个消费者线程负责接收数据。
我们来看看完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1use std::thread;
2use std::sync::mpsc;
3
4fn main() {
5 let (tx, rx) = mpsc::channel();
6
7 thread::spawn(move || {
8 let val = String::from("hi");
9 tx.send(val).unwrap();
10 });
11
12 let received = rx.recv().unwrap();
13 println!("Got: {}", received);
14}
15
thread::spawn创建的子线程负责往管道发送数据,而主线程负责接收数据。
打印结果为:
1
2 1Got: hi
2
上面有段代码:
1
2 1tx.send(val).unwrap();
2
这里有个方法:unwrap()。
这个方法,代表什么意思呢?
在Rust,它的含义是:如果一切正常,给我返回一个返回值;如果有错误或异常,停止执行程序。
我们再来看看,加一段有趣的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1use std::thread;
2use std::sync::mpsc;
3
4fn main() {
5 let (tx, rx) = mpsc::channel();
6
7 thread::spawn(move || {
8 let val = String::from("hi");
9 tx.send(val).unwrap();
10 println!("val is {}", val);//增加一行打印val值的代码
11 });
12
13 let received = rx.recv().unwrap();
14 println!("Got: {}", received);
15}
16
结果如何?
报错:
1
2
3
4
5
6
7
8
9
10
11 1error[E0382]: use of moved value: `val`
2 --> src/main.rs:10:31
3 |
49 | tx.send(val).unwrap();
5 | --- value moved here
610 | println!("val is {}", val);
7 | ^^^ value used here after move
8 |
9 = note: move occurs because `val` has type `std::string::String`, which does
10not implement the `Copy` trait
11
这个错误信息明显告诉你,val的所有权已经移动move,即val已经不再拥有数据,所以你现在打印数据,会报错。
那要怎么处理?用如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1use std::thread;
2use std::sync::mpsc;
3
4fn main() {
5 let (tx, rx) = mpsc::channel();
6
7 thread::spawn(move || {
8 let val = String::from("hi");
9 println!("val is {}", val);//增加一行打印val值的代码
10 tx.send(val).unwrap();
11
12 });
13
14 let received = rx.recv().unwrap();
15 println!("Got: {}", received);
16}
17
我们在tx.send方法之前处理就可以了。这时,数据所有权还没有移动。
好理解。
我们在看看水管的上端,不停地放水的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1use std::thread;
2use std::sync::mpsc;
3use std::time::Duration;
4
5fn main() {
6 let (tx, rx) = mpsc::channel();
7
8 thread::spawn(move || {
9 let vals = vec![
10 String::from("hi"),
11 String::from("from"),
12 String::from("the"),
13 String::from("thread"),
14 ];
15
16 for val in vals {
17 tx.send(val).unwrap();
18 thread::sleep(Duration::from_secs(1));//线程休眠1秒
19 }
20 });
21
22 for received in rx {
23 println!("Got: {}", received);
24 }
25}
26
我们看到水管的下端rx,直接可以用for来遍历。
打印结果为:
1
2
3
4
5 1Got: hi
2Got: from
3Got: the
4Got: thread
5
我们来看看多个生产者线程的情况,代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1use std::sync::mpsc;
2use std::thread;
3use std::time::Duration;
4fn main() {
5 let (tx, rx) = mpsc::channel();
6
7 let tx1 = mpsc::Sender::clone(&tx);
8 thread::spawn(move || {
9 let vals = vec![
10 String::from("hi"),
11 String::from("from"),
12 String::from("the"),
13 String::from("thread"),
14 ];
15
16 for val in vals {
17 tx1.send(val).unwrap();
18 thread::sleep(Duration::from_secs(1));
19 }
20 });
21
22 thread::spawn(move || {
23 let vals = vec![
24 String::from("more"),
25 String::from("messages"),
26 String::from("for"),
27 String::from("you"),
28 ];
29
30 for val in vals {
31 tx.send(val).unwrap();
32 thread::sleep(Duration::from_secs(1));
33 }
34 });
35
36 for received in rx {
37 println!("Got: {}", received);
38 }
39}
40
41
结果可能为:
1
2
3
4
5
6
7
8
9 1Got: hi
2Got: more
3Got: from
4Got: messages
5Got: for
6Got: the
7Got: thread
8Got: you
9
这里说可能,是因为,每个电脑因为平台原因,打印的顺序会不一样。
现在我们来看看用互斥锁(Mutex)实现线程间的数据共享。
什么是互斥锁(Mutex),举个常见的例子:当我们公司开会的时候,我们一般有一个麦克风。
拿到麦克风的同学,才能讲话。麦克风,在我们同学之前传来传去,并且一次只允许一个人拥有麦克风。
这个麦克风,就是互斥锁:Mutex。
我们先来看看简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13 1use std::sync::Mutex;
2
3fn main() {
4 let m = Mutex::new(5);
5
6 {
7 let mut num = m.lock().unwrap();
8 *num = 6;
9 }
10
11 println!("m = {:?}", m);
12}
13
我们这里直接用std库里的sync包中的Mutex。
Mutex是智能指针。
什么是智能指针?
在Rust,简单来说,相对普通指针,智能指针,除了保存内存地址外,还有额外的其他属性或元数据。
什么是普通指针?
在Rust,普通指针,就是保存内存地址的值。
好我们回到多线程的知识。
现在我们用Mutex来进行两个线程共享数据,请看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1use std::sync::Mutex;
2use std::thread;
3
4fn main() {
5 let counter = Mutex::new(0);
6 let mut handles = vec![];
7
8 let handle = thread::spawn(move || {
9 let mut num = counter.lock().unwrap();
10
11 *num += 1;
12 });
13 handles.push(handle);
14
15 let handle2 = thread::spawn(move || {
16 let mut num2 = counter.lock().unwrap();
17
18 *num2 += 1;
19 });
20 handles.push(handle2);
21
22 for handle in handles {
23 handle.join().unwrap();
24 }
25
26 println!("Result: {}", *counter.lock().unwrap());
27}
28
我们直接编译,报错了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1error[E0382]: use of moved value: `counter`
2 --> main.rs:98:33
3 |
488 | let counter = Mutex::new(0);
5 | ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
6...
791 | let handle = thread::spawn(move || {
8 | ------- value moved into closure here
992 | let mut num = counter.lock().unwrap();
10 | ------- variable moved due to use in closure
11...
1298 | let handle2 = thread::spawn(move || {
13 | ^^^^^^^ value used here after move
1499 | let mut num2 = counter.lock().unwrap();
15 | ------- use occurs due to use in closure
16
17error[E0382]: borrow of moved value: `counter`
18 --> main.rs:109:29
19 |
2088 | let counter = Mutex::new(0);
21 | ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
22...
2398 | let handle2 = thread::spawn(move || {
24 | ------- value moved into closure here
2599 | let mut num2 = counter.lock().unwrap();
26 | ------- variable moved due to use in closure
27...
28109 | println!("Result: {}", *counter.lock().unwrap());
29 | ^^^^^^^ value borrowed here after move
30
31
为什么?
因为,第一个线程代码中,已经把counter的数据所有权移动到闭包的作用域。
第二个线程再访问,肯定报错了。
那怎么办呢?
用 Arc,请看下面代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1use std::sync::{Arc, Mutex};
2use std::thread;
3
4fn main() {
5 let ctr = Arc::new(Mutex::new(0));
6 let mut handles = vec![];
7 let counter = Arc::clone(&ctr);
8 let handle = thread::spawn(move || {
9 let mut num = counter.lock().unwrap();
10
11 *num += 1;
12 });
13 handles.push(handle);
14 let counter = Arc::clone(&ctr);
15 let handle2 = thread::spawn(move || {
16 let mut num2 = counter.lock().unwrap();
17
18 *num2 += 1;
19 });
20 handles.push(handle2);
21
22 for handle in handles {
23 handle.join().unwrap();
24 }
25
26 println!("Result: {}", *ctr.lock().unwrap());
27}
28
29
我们运行上面这段代码,结果打印:
1
2 1Result: 2
2
结果符合预期。
那Arc到底是何方神圣,有这么神奇的功效?
它的含义是:atomically reference counted type。
翻译成中文,就是原子引用计数类型。有点复杂是不是?
没有关系,我们现在只要记住它是专门用于多线程共享的引用计数值就行了。
它专门用来线程中安全地共享数据。
以上,希望对你有用。
1
2 1如果遇到什么问题,欢迎加入:rust新手群,在这里我可以提供一些简单的帮助,加微信:360369487,注明:博客园+rust
2
参考文章:
https://doc.rust-lang.org/1.4.0/book/dining-philosophers.html
https://doc.rust-lang.org/1.30.0/book/second-edition/ch16-01-threads.html
https://doc.rust-lang.org/1.30.0/book/second-edition/ch16-02-message-passing.html