Rust : actor模式 与 Actix库

释放双眼,带上耳机,听听看~!

一直想了解rust中actor并发模式,Actix库是rust中知名的库。看看Actix库的说明,走进actor。
这个库的重要几个概念:
1、actor
任何实现Actor trait的类型,就是一个actor.actor有生命周期,几个状态:

(1)Started

(2) Running

(3)Stopping

(4)Stopped

我们来看一下Actor trait:
里面有start()、start_default()等不带参数的函数,大都返回的是Addr < Self >。我们看到了一个Addr类型。因为所有的Actors有一个邮箱Addr,而Actors之间的通信是通过messages来实现的,Actors之间只知道messages地址,而不能侵入到对方内部。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1pub trait Actor: Sized + &#x27;static {
2
3    type Context: ActorContext;
4
5    fn started(&amp;mut self, ctx: &amp;mut Self::Context) {}
6
7    fn stopping(&amp;mut self, ctx: &amp;mut Self::Context) -&gt; Running {
8        Running::Stop
9    }
10    fn start(self) -&gt; Addr&lt;Self&gt;
11    where
12        Self: Actor&lt;Context = Context&lt;Self&gt;&gt;,
13    {
14        Context::new().run(self)
15    }
16
17    fn start_default() -&gt; Addr&lt;Self&gt;
18    where
19        Self: Actor&lt;Context = Context&lt;Self&gt;&gt; + Default,
20    {
21        Self::default().start()
22    }
23
24    /// Start new actor in arbiter&#x27;s thread.
25
26    fn start_in_arbiter&lt;F&gt;(arb: &amp;Arbiter, f: F) -&gt; Addr&lt;Self&gt;
27    where
28        Self: Actor&lt;Context = Context&lt;Self&gt;&gt;,
29        F: FnOnce(&amp;mut Context&lt;Self&gt;) -&gt; Self + Send + &#x27;static,
30    {
31        let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
32        // create actor
33        arb.exec_fn(move || {
34            let mut ctx = Context::with_receiver(rx);
35            let act = f(&amp;mut ctx);
36            let fut = ctx.into_future(act);
37            actix_rt::spawn(fut);
38        });
39        Addr::new(tx)
40   }
41
42    fn create&lt;F&gt;(f: F) -&gt; Addr&lt;Self&gt;
43    where
44        Self: Actor&lt;Context = Context&lt;Self&gt;&gt;,
45        F: FnOnce(&amp;mut Context&lt;Self&gt;) -&gt; Self + &#x27;static,
46    {
47        let mut ctx = Context::new();
48        let act = f(&amp;mut ctx);
49        ctx.run(act)
50    }
51}
52
53

自定义类,实现Actor:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1use actix::prelude::*;
2
3struct MyActor {
4    count: usize,
5}
6
7impl Actor for MyActor {
8    type Context = Context&lt;Self&gt;;
9    // 启动的时侯,进行一些个性化设置,比如
10     fn started(&amp;mut self, ctx: &amp;mut Self::Context) {
11        ctx.set_mailbox_capacity(1);
12    }
13}
14let addr = MyActor.start();
15
16

2、message 、handler、 address、Recipient

(1)任何实现Message trait类型,是一个message.


1
2
3
4
5
6
7
1pub trait Message {
2    /// The type of value that this message will resolved with if it is
3    /// successful.
4    type Result: &#x27;static;
5}
6
7

(2)所有的Actors之间的通信是通过messages来实现的。通信的message发往目标邮箱,Actors调用message handlers(句柄),执行上下文(context).

(3)handler是啥?


1
2
3
4
5
6
7
8
9
10
11
12
13
1pub trait Handler&lt;M&gt;
2where
3    Self: Actor,
4    M: Message,
5{
6    /// The type of value that this handler will return.
7    type Result: MessageResponse&lt;Self, M&gt;;
8
9    /// This method is called for every message received by this actor.
10    fn handle(&amp;mut self, msg: M, ctx: &amp;mut Self::Context) -&gt; Self::Result;
11}
12
13

实现handler:


1
2
3
4
5
6
7
8
9
10
11
12
1struct Ping(usize);
2impl Handler&lt;Ping&gt; for MyActor {
3    type Result = usize;
4
5    fn handle(&amp;mut self, msg: Ping, ctx: &amp;mut Context&lt;Self&gt;) -&gt; Self::Result {
6        self.count += msg.0;
7
8        self.count
9    }
10}
11
12

(4) 自定义类,实现Message:


1
2
3
4
5
6
7
8
9
1use actix::prelude::*;
2
3struct Ping(usize);
4
5impl Message for Ping {
6    type Result = usize;
7}
8
9

(5)如何发送message?
首先要用到Addr object。具体地说,有几种方法在actors之间发送message:

Addr::do_send(M) – 忽视返回任何错误的方式,因为邮箱可能满了,也可能关闭。
Addr::try_send(M) – 如果错误 ,会返回 SendError。
Addr::send(M) – 会返回future object ,带有处理过程的结果信息。

(6)Recipient -收件人
收件人是一个Actor发给另外一个不同类型Actor时的地址。比如,订阅者与发送者。

3、context (上下文)、Mailbox

(1)Context字面上是上下文。具体有什么东东?看看源码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
1pub struct Context&lt;A&gt;
2where
3    A: Actor&lt;Context = Context&lt;A&gt;&gt;,
4{
5    parts: ContextParts&lt;A&gt;,
6    mb: Option&lt;Mailbox&lt;A&gt;&gt;,
7}
8
9impl&lt;A&gt; Context&lt;A&gt;
10where
11    A: Actor&lt;Context = Self&gt;,
12{
13    #[inline]
14    pub(crate) fn new() -&gt; Self {
15        let mb = Mailbox::default();
16        Self {
17            parts: ContextParts::new(mb.sender_producer()),
18            mb: Some(mb),
19        }
20    }
21    #[inline]
22    pub fn with_receiver(rx: AddressReceiver&lt;A&gt;) -&gt; Self {
23        let mb = Mailbox::new(rx);
24        Self {
25            parts: ContextParts::new(mb.sender_producer()),
26            mb: Some(mb),
27        }
28    }
29    #[inline]
30    pub fn run(self, act: A) -&gt; Addr&lt;A&gt; {
31        let fut = self.into_future(act);
32        let addr = fut.address();
33        actix_rt::spawn(fut);
34        addr
35    }
36    pub fn into_future(mut self, act: A) -&gt; ContextFut&lt;A, Self&gt; {
37        let mb = self.mb.take().unwrap();
38        ContextFut::new(self, act, mb)
39    }
40    pub fn handle(&amp;self) -&gt; SpawnHandle {
41        self.parts.curr_handle()
42    }
43    pub fn set_mailbox_capacity(&amp;mut self, cap: usize) {
44        self.parts.set_mailbox_capacity(cap)
45    }
46}
47
48impl&lt;A&gt; AsyncContextParts&lt;A&gt; for Context&lt;A&gt;
49where
50    A: Actor&lt;Context = Self&gt;,
51{
52    fn parts(&amp;mut self) -&gt; &amp;mut ContextParts&lt;A&gt; {
53        &amp;mut self.parts
54    }
55}
56pub trait ContextFutureSpawner&lt;A&gt;
57where
58    A: Actor,
59    A::Context: AsyncContext&lt;A&gt;,
60{
61    fn spawn(self, ctx: &amp;mut A::Context);
62    fn wait(self, ctx: &amp;mut A::Context);
63}
64
65impl&lt;A, T&gt; ContextFutureSpawner&lt;A&gt; for T
66where
67    A: Actor,
68    A::Context: AsyncContext&lt;A&gt;,
69    T: ActorFuture&lt;Item = (), Error = (), Actor = A&gt; + &#x27;static,
70{
71    #[inline]
72    fn spawn(self, ctx: &amp;mut A::Context) {
73        let _ = ctx.spawn(self);
74    }
75    #[inline]
76    fn wait(self, ctx: &amp;mut A::Context) {
77        ctx.wait(self);
78    }
79}
80
81

里面有 ContextParts和Option<Mailbox>两部分内容构成。 (2)找到ContextParts的源码,我们来看看:


1
2
3
4
5
6
7
8
9
10
11
12
13
1pub struct ContextParts&lt;A&gt;
2where
3    A: Actor,
4    A::Context: AsyncContext&lt;A&gt;,
5{
6    addr: AddressSenderProducer&lt;A&gt;,
7    flags: ContextFlags,
8    wait: SmallVec&lt;[ActorWaitItem&lt;A&gt;; 2]&gt;,
9    items: SmallVec&lt;[Item&lt;A&gt;; 3]&gt;,
10    handles: SmallVec&lt;[SpawnHandle; 2]&gt;,
11}
12
13

(3)我们来看一下Mailbox中的设计:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1pub struct Mailbox&lt;A&gt;
2where
3    A: Actor,
4    A::Context: AsyncContext&lt;A&gt;,
5{
6    msgs: AddressReceiver&lt;A&gt;,
7}
8
9impl&lt;A&gt; Mailbox&lt;A&gt;
10where
11    A: Actor,
12    A::Context: AsyncContext&lt;A&gt;,
13{
14    #[inline]
15    pub fn new(msgs: AddressReceiver&lt;A&gt;) -&gt; Self {
16        Self { msgs }
17    }
18    pub fn capacity(&amp;self) -&gt; usize {
19        self.msgs.capacity()
20    }
21    pub fn set_capacity(&amp;mut self, cap: usize) {
22        self.msgs.set_capacity(cap);
23    }
24    #[inline]
25    pub fn connected(&amp;self) -&gt; bool {
26        self.msgs.connected()
27    }
28    pub fn address(&amp;self) -&gt; Addr&lt;A&gt; {
29        Addr::new(self.msgs.sender())
30    }
31    pub fn sender_producer(&amp;self) -&gt; AddressSenderProducer&lt;A&gt; {
32        self.msgs.sender_producer()
33    }
34    pub fn poll(&amp;mut self, act: &amp;mut A, ctx: &amp;mut A::Context) {
35        #[cfg(feature = &quot;mailbox_assert&quot;)]
36        let mut n_polls = 0u16;
37        loop {
38            let mut not_ready = true;
39            // sync messages
40            loop {
41                if ctx.waiting() {
42                    return;
43                }
44                match self.msgs.poll() {
45                    Ok(Async::Ready(Some(mut msg))) =&gt; {
46                        not_ready = false;
47                        msg.handle(act, ctx);
48                    }
49                    Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) =&gt; break,
50                }
51                #[cfg(feature = &quot;mailbox_assert&quot;)]
52                {
53                    n_polls += 1;
54                    assert!(n_polls &lt; MAX_SYNC_POLLS, &quot;Too many messages are being processed. Use Self::Context::notify() instead of direct use of address&quot;);
55                }
56            }
57            if not_ready {
58                return;
59            }
60        }
61    }
62}
63
64

4、Arbiter 、SyncArbiter

为Actors提供了异步执行的上下文环境,当一个actor运行时,Arbiters控制着actor包含特定执行状况的上下文环境。 Arbiters需要运行许多函数,包括起系统线程的函数、进行事件轮询、异步分发事件轮询任务、对异步任务进行支持。
当起一个actor时,是在一个系统的线程中运行的,这样效率比较高。也就是说,一个线程可能会针对N个actor.
事件轮询
在事件轮询时,对应的 Arbiter会控制事件轮询事件池的线程。 Arbiter会对任务队列进行排队,往往的情况是,你可以把Arbiter看成"single-threaded event loop".

5、future
从Future库可以看出:


1
2
3
4
5
6
7
8
9
1    pub trait Future {
2        /// The type of value produced on completion.
3        #[stable(feature = &quot;futures_api&quot;, since = &quot;1.36.0&quot;)]
4        type Output;
5        #[stable(feature = &quot;futures_api&quot;, since = &quot;1.36.0&quot;)]
6        fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;&#x27;_&gt;) -&gt; Poll&lt;Self::Output&gt;;
7    }
8
9

给TA打赏
共{{data.count}}人
人已打赏
安全技术

C++ lambda表达式

2022-1-11 12:36:11

病毒疫情

中央应对新冠肺炎疫情工作领导小组印发通知 全面落实疫情防控一线城乡社区工作者关心关爱措施

2020-3-5 10:27:00

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索