一直想了解rust中actor并发模式,Actix库是rust中知名的库。看看Actix库的说明,走进actor。
这个库的重要几个概念:
1、actor
任何实现Actor trait的类型,就是一个actor.actor有生命周期,几个状态:
(1)Started
(2) Running
(3)Stopping
(4)Stopped
我们来看一下Actor trait:
里面有start()、start_default()等不带参数的函数,大都返回的是Addr < Self >。我们看到了一个Addr类型。因为所有的Actors有一个邮箱Addr,而Actors之间的通信是通过messages来实现的,Actors之间只知道messages地址,而不能侵入到对方内部。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 1pub trait Actor: Sized + 'static {
2
3 type Context: ActorContext;
4
5 fn started(&mut self, ctx: &mut Self::Context) {}
6
7 fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
8 Running::Stop
9 }
10 fn start(self) -> Addr<Self>
11 where
12 Self: Actor<Context = Context<Self>>,
13 {
14 Context::new().run(self)
15 }
16
17 fn start_default() -> Addr<Self>
18 where
19 Self: Actor<Context = Context<Self>> + Default,
20 {
21 Self::default().start()
22 }
23
24 /// Start new actor in arbiter's thread.
25
26 fn start_in_arbiter<F>(arb: &Arbiter, f: F) -> Addr<Self>
27 where
28 Self: Actor<Context = Context<Self>>,
29 F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
30 {
31 let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
32 // create actor
33 arb.exec_fn(move || {
34 let mut ctx = Context::with_receiver(rx);
35 let act = f(&mut ctx);
36 let fut = ctx.into_future(act);
37 actix_rt::spawn(fut);
38 });
39 Addr::new(tx)
40 }
41
42 fn create<F>(f: F) -> Addr<Self>
43 where
44 Self: Actor<Context = Context<Self>>,
45 F: FnOnce(&mut Context<Self>) -> Self + 'static,
46 {
47 let mut ctx = Context::new();
48 let act = f(&mut ctx);
49 ctx.run(act)
50 }
51}
52
53
自定义类,实现Actor:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1use actix::prelude::*;
2
3struct MyActor {
4 count: usize,
5}
6
7impl Actor for MyActor {
8 type Context = Context<Self>;
9 // 启动的时侯,进行一些个性化设置,比如
10 fn started(&mut self, ctx: &mut Self::Context) {
11 ctx.set_mailbox_capacity(1);
12 }
13}
14let addr = MyActor.start();
15
16
2、message 、handler、 address、Recipient
(1)任何实现Message trait类型,是一个message.
1
2
3
4
5
6
7 1pub trait Message {
2 /// The type of value that this message will resolved with if it is
3 /// successful.
4 type Result: 'static;
5}
6
7
(2)所有的Actors之间的通信是通过messages来实现的。通信的message发往目标邮箱,Actors调用message handlers(句柄),执行上下文(context).
(3)handler是啥?
1
2
3
4
5
6
7
8
9
10
11
12
13 1pub trait Handler<M>
2where
3 Self: Actor,
4 M: Message,
5{
6 /// The type of value that this handler will return.
7 type Result: MessageResponse<Self, M>;
8
9 /// This method is called for every message received by this actor.
10 fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
11}
12
13
实现handler:
1
2
3
4
5
6
7
8
9
10
11
12 1struct Ping(usize);
2impl Handler<Ping> for MyActor {
3 type Result = usize;
4
5 fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {
6 self.count += msg.0;
7
8 self.count
9 }
10}
11
12
(4) 自定义类,实现Message:
1
2
3
4
5
6
7
8
9 1use actix::prelude::*;
2
3struct Ping(usize);
4
5impl Message for Ping {
6 type Result = usize;
7}
8
9
(5)如何发送message?
首先要用到Addr object。具体地说,有几种方法在actors之间发送message:
Addr::do_send(M) – 忽视返回任何错误的方式,因为邮箱可能满了,也可能关闭。
Addr::try_send(M) – 如果错误 ,会返回 SendError。
Addr::send(M) – 会返回future object ,带有处理过程的结果信息。
(6)Recipient -收件人
收件人是一个Actor发给另外一个不同类型Actor时的地址。比如,订阅者与发送者。
3、context (上下文)、Mailbox
(1)Context字面上是上下文。具体有什么东东?看看源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 1pub struct Context<A>
2where
3 A: Actor<Context = Context<A>>,
4{
5 parts: ContextParts<A>,
6 mb: Option<Mailbox<A>>,
7}
8
9impl<A> Context<A>
10where
11 A: Actor<Context = Self>,
12{
13 #[inline]
14 pub(crate) fn new() -> Self {
15 let mb = Mailbox::default();
16 Self {
17 parts: ContextParts::new(mb.sender_producer()),
18 mb: Some(mb),
19 }
20 }
21 #[inline]
22 pub fn with_receiver(rx: AddressReceiver<A>) -> Self {
23 let mb = Mailbox::new(rx);
24 Self {
25 parts: ContextParts::new(mb.sender_producer()),
26 mb: Some(mb),
27 }
28 }
29 #[inline]
30 pub fn run(self, act: A) -> Addr<A> {
31 let fut = self.into_future(act);
32 let addr = fut.address();
33 actix_rt::spawn(fut);
34 addr
35 }
36 pub fn into_future(mut self, act: A) -> ContextFut<A, Self> {
37 let mb = self.mb.take().unwrap();
38 ContextFut::new(self, act, mb)
39 }
40 pub fn handle(&self) -> SpawnHandle {
41 self.parts.curr_handle()
42 }
43 pub fn set_mailbox_capacity(&mut self, cap: usize) {
44 self.parts.set_mailbox_capacity(cap)
45 }
46}
47
48impl<A> AsyncContextParts<A> for Context<A>
49where
50 A: Actor<Context = Self>,
51{
52 fn parts(&mut self) -> &mut ContextParts<A> {
53 &mut self.parts
54 }
55}
56pub trait ContextFutureSpawner<A>
57where
58 A: Actor,
59 A::Context: AsyncContext<A>,
60{
61 fn spawn(self, ctx: &mut A::Context);
62 fn wait(self, ctx: &mut A::Context);
63}
64
65impl<A, T> ContextFutureSpawner<A> for T
66where
67 A: Actor,
68 A::Context: AsyncContext<A>,
69 T: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
70{
71 #[inline]
72 fn spawn(self, ctx: &mut A::Context) {
73 let _ = ctx.spawn(self);
74 }
75 #[inline]
76 fn wait(self, ctx: &mut A::Context) {
77 ctx.wait(self);
78 }
79}
80
81
里面有 ContextParts和Option<Mailbox>两部分内容构成。 (2)找到ContextParts的源码,我们来看看:
1
2
3
4
5
6
7
8
9
10
11
12
13 1pub struct ContextParts<A>
2where
3 A: Actor,
4 A::Context: AsyncContext<A>,
5{
6 addr: AddressSenderProducer<A>,
7 flags: ContextFlags,
8 wait: SmallVec<[ActorWaitItem<A>; 2]>,
9 items: SmallVec<[Item<A>; 3]>,
10 handles: SmallVec<[SpawnHandle; 2]>,
11}
12
13
(3)我们来看一下Mailbox中的设计:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 1pub struct Mailbox<A>
2where
3 A: Actor,
4 A::Context: AsyncContext<A>,
5{
6 msgs: AddressReceiver<A>,
7}
8
9impl<A> Mailbox<A>
10where
11 A: Actor,
12 A::Context: AsyncContext<A>,
13{
14 #[inline]
15 pub fn new(msgs: AddressReceiver<A>) -> Self {
16 Self { msgs }
17 }
18 pub fn capacity(&self) -> usize {
19 self.msgs.capacity()
20 }
21 pub fn set_capacity(&mut self, cap: usize) {
22 self.msgs.set_capacity(cap);
23 }
24 #[inline]
25 pub fn connected(&self) -> bool {
26 self.msgs.connected()
27 }
28 pub fn address(&self) -> Addr<A> {
29 Addr::new(self.msgs.sender())
30 }
31 pub fn sender_producer(&self) -> AddressSenderProducer<A> {
32 self.msgs.sender_producer()
33 }
34 pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context) {
35 #[cfg(feature = "mailbox_assert")]
36 let mut n_polls = 0u16;
37 loop {
38 let mut not_ready = true;
39 // sync messages
40 loop {
41 if ctx.waiting() {
42 return;
43 }
44 match self.msgs.poll() {
45 Ok(Async::Ready(Some(mut msg))) => {
46 not_ready = false;
47 msg.handle(act, ctx);
48 }
49 Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) => break,
50 }
51 #[cfg(feature = "mailbox_assert")]
52 {
53 n_polls += 1;
54 assert!(n_polls < MAX_SYNC_POLLS, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");
55 }
56 }
57 if not_ready {
58 return;
59 }
60 }
61 }
62}
63
64
4、Arbiter 、SyncArbiter
为Actors提供了异步执行的上下文环境,当一个actor运行时,Arbiters控制着actor包含特定执行状况的上下文环境。 Arbiters需要运行许多函数,包括起系统线程的函数、进行事件轮询、异步分发事件轮询任务、对异步任务进行支持。
当起一个actor时,是在一个系统的线程中运行的,这样效率比较高。也就是说,一个线程可能会针对N个actor.
事件轮询
在事件轮询时,对应的 Arbiter会控制事件轮询事件池的线程。 Arbiter会对任务队列进行排队,往往的情况是,你可以把Arbiter看成"single-threaded event loop".
5、future
从Future库可以看出:
1
2
3
4
5
6
7
8
9 1 pub trait Future {
2 /// The type of value produced on completion.
3 #[stable(feature = "futures_api", since = "1.36.0")]
4 type Output;
5 #[stable(feature = "futures_api", since = "1.36.0")]
6 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
7 }
8
9