Rust : channel、多线程与 CTP相关机制的模拟

释放双眼,带上耳机,听听看~!

在CTP中,CTP thost会异步发送相关行情和交易回报信息给订阅方或策略管理者(这里简称为strategyManager)。那么,模拟一下CTP的机制,有利于在CTP平台上,构建策略交易支持体系。

一、主要涉及几方面:

1、thost异步发送相关信息

利用多线程,多生产者单消费者模式来模拟发送相关信息,可以选用标准库中的mpsc::channel.

2、策略管理者把相关信息发分给不同的策略

二、方案1: 单channel+单线程


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::cell::RefCell;
10
11# [derive(Debug)]
12struct RtnMarketData(String);
13# [derive(Debug)]
14struct RtnOrder(String);
15# [derive(Debug)]
16struct RtnTrade(String);
17
18
19//模拟thost传过来的不同的值;
20# [derive(Debug)]
21enum TradeData{
22    RtnMarketData(String),
23    RtnOrder(String),
24    RtnTrade(String),
25}
26struct Strategy{
27    stra_name:String,
28    stra_thread_builder:thread::Builder,
29    stra_instructions:Vec<String>,//订阅的相关合约
30}
31impl Strategy{
32    fn new(name:String,instrs:Vec<String>)->Self{
33        Self{
34            stra_name: name,
35            stra_thread_builder: thread::Builder::new(),
36            stra_instructions:instrs,
37        }
38    }
39    fn OnRtnMarketData(&self,md:&TradeData){
40         println!(" -> strategy:{:?} RtnMarketData=> recv:{:?}",self.stra_name,md);
41    }
42    fn OnRtnOrder(&self,order:&TradeData){
43         println!(" -> strategy:{:?} RtnOrder => recv:{:?}",self.stra_name,order);
44    }
45    fn OnRtnTrade(&self,trade:&TradeData){
46         println!(" -> strategy:{:?} RtnTrade=> recv:{:?}",self.stra_name,trade);
47    }
48}
49struct StrategyGroup{
50    //stra_list : RefCell<Vec<Strategy>>,
51    stra_list : Vec<Strategy>,
52}
53impl StrategyGroup{
54    fn new(list:Vec<Strategy>) -> Self{
55        //stra_list:vec![Strategy::new("DSCJ",vec!["IC","IF"]),Strategy::new("WSDJ",vec!["cu,ag"])]
56        Self{
57            //stra_list:RefCell::new(list),
58            stra_list:list,
59        }
60    }
61}
62struct StrategyManager{
63    thread_builder : thread::Builder,
64    stra_group:StrategyGroup,
65}
66impl StrategyManager{
67    fn new(group:StrategyGroup)->Self{
68       Self{
69           thread_builder : thread::Builder::new(),
70           stra_group:group,
71       }
72    }
73}
74
75fn simulate_send(tx:Sender<TradeData>,n_thread:u32){
76    for i in 0..n_thread {
77        let tx = tx.clone();
78        thread::spawn(move||{
79            let mut n = 0;
80            loop{
81                let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
82                n = n + 1;
83                println!("rand_value:{:?} n:{:?} thread id :{:?}",rand_value,n,i);
84                thread::sleep(Duration::from_millis(300));
85                match rand_value {
86                    0...600 => {
87                        match rand_value{
88                            0...100 => tx.send(TradeData::RtnMarketData("IC".to_string())).unwrap(),
89                            100...300=> tx.send(TradeData::RtnMarketData("IF".to_string())).unwrap(),
90                            300...400=> tx.send(TradeData::RtnMarketData("cu".to_string())).unwrap(),
91                            _ =>tx.send(TradeData::RtnMarketData("ag".to_string())).unwrap(),
92                        }
93                    },
94
95                    600...900 => {
96                        match rand_value{
97                            600...700 =>tx.send(TradeData::RtnOrder("IC".to_string())).unwrap(),
98                            700...750 =>tx.send(TradeData::RtnOrder("IF".to_string())).unwrap(),
99                            750...800 =>tx.send(TradeData::RtnOrder("cu".to_string())).unwrap(),
100                            _ =>tx.send(TradeData::RtnOrder("ag".to_string())).unwrap(),
101                        }
102                    },
103                    _ => {
104                        match rand_value{
105                            900...920 =>tx.send(TradeData::RtnTrade("IC".to_string())).unwrap(),
106                            920...940 =>tx.send(TradeData::RtnTrade("IF".to_string())).unwrap(),
107                            940...960 =>tx.send(TradeData::RtnTrade("cu".to_string())).unwrap(),
108                            _ =>tx.send(TradeData::RtnTrade("ag".to_string())).unwrap(),
109                        }
110
111                    }
112                };
113
114            }
115        });
116    }
117}
118fn dispatch_data(rx:&Receiver<TradeData>,stra_group:&StrategyGroup){
119    //let (tx,rx) = channel();
120    let strategys = &*stra_group.stra_list;
121    loop{
122        let  ref value = rx.recv().unwrap();
123        match value {
124            TradeData::RtnMarketData(d) =>{
125                for strategy in strategys {
126                    if strategy.stra_instructions.contains(&d){
127                        strategy.OnRtnMarketData(value);
128                    }
129                }
130            },
131            TradeData::RtnOrder(e) =>{
132                for strategy in strategys {
133                    if strategy.stra_instructions.contains(&e){
134                        strategy.OnRtnOrder(value);
135                    }
136                }
137            },
138            TradeData::RtnTrade(f) =>{
139                for strategy in strategys {
140                    if strategy.stra_instructions.contains(&f){
141                        strategy.OnRtnTrade(value)
142                    }
143                }
144            },
145        }
146    }
147
148}
149fn generate_strategyManager()-> StrategyManager {
150    let strategy_01 =Strategy::new("DSCJ".to_string(),vec!["IC".to_string(),"IF".to_string()]);
151    let strategy_02 =Strategy::new("WSDJ".to_string(),vec!["IF".to_string()]);
152    let strategy_03 =Strategy::new("TTTT".to_string(),vec!["ag".to_string(),"cu".to_string()]);
153    let stra_group = StrategyGroup::new(vec![strategy_01,strategy_02,strategy_03]);
154    StrategyManager::new(stra_group)
155}
156
157fn main(){
158    //模拟生成相关的策略、策略group、策略管理者
159    let stra_manager = generate_strategyManager();
160    // 模拟thost
161    let (tx,rx) = channel::<TradeData>();
162    //模拟多线程异步进行接收thost相关的行情等信息
163    simulate_send(tx,10);
164    println!("main=>");
165
166    dispatch_data(&rx,&stra_manager.stra_group);
167    thread::sleep(Duration::from_millis(500000));
168}
169
170

三、效果图

四、资源占用情况

在去掉了sleep之后,可以看到,内存一直在上升;因为10个线程不断的发行情,这个压力是很大的;策略的分发的速度又来不及,占用的资源会较大。

这个请大家注意!
五、方案2:多channel + 多线程方案

1、这个模拟,并没有模拟订阅thost的机制;
2、子策略并没有专有线程去接收相关信息;现在是strategyManager中单线程进行分发,并负责策略中信号处理。这部分应交给子策略中的专有线程处理。 即处理的链条过长。

下面的代码实现了两层:channel以及多线程轮询交易所的行情等信息,大大提高了交易的效率。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::sync::{Arc, Mutex};
10use std::marker::Send;
11use std::cell::RefCell;
12
13
14# [derive(Debug,Clone)]
15struct RtnMarketData(String);
16# [derive(Debug,Clone)]
17struct RtnOrder(String);
18# [derive(Debug,Clone)]
19struct RtnTrade(String);
20
21
22# [derive(Debug,Clone)]
23enum TradeData{
24    RtnMarketData(String),
25    RtnOrder(String),
26    RtnTrade(String),
27}
28trait Get_id{
29    fn get_request_id(&self)-> u32;
30}
31
32struct Strategy{
33    stra_name:String,
34    receiver:Receiver<TradeData>,
35    lock:Arc<Mutex<u32>>,
36}
37struct StrategyHandler{
38    stra_name: String,
39    stra_instructions: Vec<String>, //订阅的相关合约
40    sender:Sender<TradeData>,
41}
42impl StrategyHandler{
43    fn new(name:String,instrs:Vec<String>,send:Sender<TradeData>)->Self{
44        Self{
45            stra_name: name,
46            stra_instructions: instrs,
47            sender:send,
48        }
49    }
50}
51
52impl Strategy{
53    fn new(name:String,recv:Receiver<TradeData>,lock:Arc<Mutex<u32>>)->Self{
54        Self{
55            stra_name:name,
56            receiver:recv,
57            lock:lock,
58        }
59    }
60    fn spawn(self){
61       thread::spawn(move||{
62            loop{
63                   let  value = self.receiver.recv().unwrap();
64                   match value {
65                       TradeData::RtnMarketData(_) =>{
66                           self.OnRtnMarketData(value);
67                       },
68                       TradeData::RtnOrder(_) =>{
69                           self.OnRtnOrder(value);
70                       },
71                       TradeData::RtnTrade(_) =>{
72                           self.OnRtnTrade(value);
73                       },
74                  }
75            };
76       });
77    }
78    fn OnRtnMarketData(&self,md:TradeData){
79        println!(" -> strategy:{:?} RtnMarketData=> recv:{:?}",self.stra_name,md);
80        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
81        // 获取唯一ID,策略逻辑在此
82        if rand_value < 400 {
83            println!("触发交易信号:{:?} id:{:?}",rand_value,self.get_request_id());
84        }
85        thread::sleep(Duration::from_millis(rand_value));
86    }
87    fn OnRtnOrder(&self,order:TradeData){
88        println!(" -> strategy:{:?} RtnOrder => recv:{:?}",self.stra_name,order);
89        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
90        // 获取唯一ID
91        if rand_value < 10 {
92            println!("触发OnRtnOrder信号:{:?} id:{:?}",rand_value,self.get_request_id());
93        }
94    }
95    fn OnRtnTrade(&self,trade:TradeData){
96        println!(" -> strategy:{:?} RtnTrade=> recv:{:?}",self.stra_name,trade);
97        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
98        // 获取唯一ID
99        if rand_value < 300 {
100            println!("触发OnRtnTrade信号:{:?} id:{:?}",rand_value,self.get_request_id());
101        }
102    }
103}
104
105impl Get_id for Strategy{
106    fn get_request_id(&self)-> u32{
107        let lock = self.lock.clone();
108        let temp = *lock.lock().unwrap();
109        *lock.lock().unwrap()= temp +1 ;
110        println!("request_ID:{:?}",*lock.lock().unwrap());
111        temp+1
112    }
113}
114
115struct StrategyGroup{
116    stra_list : Vec<Strategy>,
117}
118impl StrategyGroup{
119    fn new(list:Vec<Strategy>) -> Self{
120        Self{
121            stra_list:list,
122        }
123    }
124}
125struct StrategyManager{
126    stra_group: StrategyGroup,
127}
128
129impl StrategyManager{
130    fn new(group: StrategyGroup)->Self{
131       Self{
132           stra_group: group,
133       }
134    }
135}
136
137fn simulate_send(tx:Sender<TradeData>,n_thread:u32){
138    for i in 0..n_thread {
139        let tx = tx.clone();
140        thread::spawn(move||{
141            let mut n = 0;
142            loop{
143                let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
144                n = n + 1;
145                println!("thost send info: thread id :{:?} ,次数 {:?} ",i,n);
146                thread::sleep(Duration::from_millis(500));
147                match rand_value {
148                    0...600 => {
149                        match rand_value{
150                            0...100 => tx.send(TradeData::RtnMarketData("IC".to_string())).unwrap(),
151                            100...300=> tx.send(TradeData::RtnMarketData("IF".to_string())).unwrap(),
152                            300...400=> tx.send(TradeData::RtnMarketData("cu".to_string())).unwrap(),
153                            _ =>tx.send(TradeData::RtnMarketData("ag".to_string())).unwrap(),
154                        }
155                    },
156
157                    600...900 => {
158                        match rand_value{
159                            600...700 =>tx.send(TradeData::RtnOrder("IC".to_string())).unwrap(),
160                            700...750 =>tx.send(TradeData::RtnOrder("IF".to_string())).unwrap(),
161                            750...800 =>tx.send(TradeData::RtnOrder("cu".to_string())).unwrap(),
162                            _ =>tx.send(TradeData::RtnOrder("ag".to_string())).unwrap(),
163                        }
164                    },
165                    _ => {
166                        match rand_value{
167                            900...920 =>tx.send(TradeData::RtnTrade("IC".to_string())).unwrap(),
168                            920...940 =>tx.send(TradeData::RtnTrade("IF".to_string())).unwrap(),
169                            940...960 =>tx.send(TradeData::RtnTrade("cu".to_string())).unwrap(),
170                            _ =>tx.send(TradeData::RtnTrade("ag".to_string())).unwrap(),
171                        }
172
173                    }
174                };
175
176            }
177        });
178    }
179    //thost::new(tx).thost_thread_builder.spawn()
180}
181fn dispatch_data(rx:&Receiver<TradeData>,_handlers: Vec<StrategyHandler>){
182    let handlers = &_handlers;
183    loop{
184        let  ref value = rx.recv().unwrap();
185        match value {
186            TradeData::RtnMarketData(d) =>{
187                for  handler in handlers {
188                    if handler.stra_instructions.contains(&d){
189                        //strategy.OnRtnMarketData(value)
190                        let tx = handler.sender.clone();
191                        tx.send(TradeData::RtnMarketData(d.to_string())).unwrap();
192                        println!("dispatch:{:?}",d);
193                    }
194                }
195            },
196            TradeData::RtnOrder(e) =>{
197                for  handler in handlers {
198                    if handler.stra_instructions.contains(&e){
199                        let tx = handler.sender.clone();
200                        tx.send(TradeData::RtnOrder(e.to_string())).unwrap();
201                        println!("dispatch:{:?}",e);
202                    }
203                }
204            },
205            TradeData::RtnTrade(f) =>{
206                for  handler in handlers {
207                    if handler.stra_instructions.contains(&f){
208                        let tx = handler.sender.clone();
209                        tx.send(TradeData::RtnTrade(f.to_string())).unwrap();
210                        println!("dispatch:{:?}",f);
211                    }
212                }
213            },
214        }
215    }
216}
217fn strategy_init()-> Vec<StrategyHandler> {
218    let (tx_01,rx_01) = channel::<TradeData>();
219    let (tx_02,rx_02) = channel::<TradeData>();
220    let (tx_03,rx_03) = channel::<TradeData>();
221    let id = Arc::new(Mutex::new(0_u32));
222    let stra_handler_01 = StrategyHandler::new("DSCJ".to_string(),vec!["IC".to_string(),"IH".to_string()],tx_01);
223    let strategy_01 = Strategy::new("DSCJ".to_string(),rx_01,id.clone());
224    println!("a");
225    strategy_01.spawn();
226    println!("b");
227    let stra_handler_02 = StrategyHandler::new("TTTT".to_string(),vec!["IC".to_string(),"IF".to_string()],tx_02);
228    let strategy_02 = Strategy::new("TTTT".to_string(),rx_02,id.clone());
229    strategy_02.spawn();
230    let stra_handler_03 = StrategyHandler::new("WSDJ".to_string(),vec!["ag".to_string(),"cu".to_string()],tx_03);
231    let strategy_03 = Strategy::new("WSDJ".to_string(),rx_03,id.clone());
232    strategy_03.spawn();
233    vec![stra_handler_01,stra_handler_02,stra_handler_03]
234}
235// 等待=>
236
237
238fn main(){
239    //模拟生成相关的策略、策略group、策略管理者
240    let handlers :Vec<StrategyHandler> = strategy_init();
241    //模拟thost
242    let (tx,rx) = channel::<TradeData>();
243    //模拟N个多线程异步进行接收thost相关的行情等信息
244    simulate_send(tx,2);
245    println!("main=>");
246    dispatch_data(&rx,handlers);
247    thread::sleep(Duration::from_millis(500000));
248}
249
250

上面的核心是:sender:Sender < TradeData >和 Receiver < TradeData >要分离,否则在启新线程中,就会把策略本身move进行。
上面不仅有多线程和异步接收相关的信息,而且还有同步交易的模拟,在功能上也比较完整。

六、方法3:单channel +多线程方案

当然,用channel很方便,但有没有简单用加锁的方式,来解决这个问题?

有的,但是架构上,仍二个Arc(Mutex< > ) 放在分开的两部分之中。

下面是代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::sync::{Arc, Mutex};
10use std::marker::Send;
11use std::cell::RefCell;
12
13
14# [derive(Debug,Clone)]
15struct RtnMarketData(String);
16# [derive(Debug,Clone)]
17struct RtnOrder(String);
18# [derive(Debug,Clone)]
19struct RtnTrade(String);
20
21
22# [derive(Debug,Clone)]
23enum TradeData{
24    RtnMarketData(String),
25    RtnOrder(String),
26    RtnTrade(String),
27}
28trait Get_id{
29    fn get_request_id(&amp;self)-&gt; u32;
30}
31trait Get_data{
32    fn push_data(&amp;self,td:TradeData);
33}
34struct Strategy{
35    stra_name:String,
36    stra_instructions: Vec&lt;String&gt;, //订阅的相关合约
37    lock_id:Arc&lt;Mutex&lt;u32&gt;&gt;,
38    lock_data:Arc&lt;Mutex&lt;VecDeque&lt;TradeData&gt;&gt;&gt;,
39}
40struct Handler{
41    stra_name:String,
42    stra_instructions: Vec&lt;String&gt;, //订阅的相关合约
43    lock_data:Arc&lt;Mutex&lt;VecDeque&lt;TradeData&gt;&gt;&gt;,
44}
45impl Handler{
46    fn new(name:String,instrs:Vec&lt;String&gt;,lock_data:Arc&lt;Mutex&lt;VecDeque&lt;TradeData&gt;&gt;&gt;)-&gt;Self{
47        Self{
48            stra_name:name,
49            stra_instructions: instrs, //订阅的相关合约
50            lock_data:lock_data,
51        }
52    }
53}
54impl Get_data for Handler{
55    fn push_data(&amp;self,td:TradeData){
56        let s_clone = self.lock_data.clone();
57        if s_clone.is_poisoned(){
58            println!(&quot;@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ get_data  posion:{:?}&quot;,s_clone.is_poisoned());
59        }
60        let mut guard = match s_clone.lock() {
61            Ok(guard) =&gt; guard,
62            Err(poisoned) =&gt; {
63                println!(&quot; posion!&quot;);
64                poisoned.into_inner()
65            },
66        };
67        println!(&quot;push_back 成功!:{:?}&quot;,td);
68        guard.push_back(td);
69
70    }
71}
72impl Strategy{
73    fn new(name:String,instrs:Vec&lt;String&gt;,lock_id:Arc&lt;Mutex&lt;u32&gt;&gt;,lock_data:Arc&lt;Mutex&lt;VecDeque&lt;TradeData&gt;&gt;&gt;)-&gt;Self{
74        Self{
75            stra_name:name,
76            stra_instructions: instrs, //订阅的相关合约
77            lock_id: lock_id,
78            lock_data:lock_data,
79        }
80    }
81    fn spawn(self){
82       let lock = self.lock_data.clone();
83       if lock.is_poisoned(){
84           println!(&quot;=======================================&gt;  spawn posion:{:?}&quot;,lock.is_poisoned());
85       }
86
87       thread::spawn(move||{
88            loop{
89                   let  value = lock.lock().unwrap().pop_back();//.unwrap();//front
90                   println!(&quot;=&gt; get value:{:?}&quot;,value);
91                   thread::sleep(Duration::from_millis(100));
92                   match value {
93                       Some(TradeData::RtnMarketData(d)) =&gt;{
94                           self.OnRtnMarketData(TradeData::RtnMarketData(d));
95                       },
96                       Some(TradeData::RtnOrder(e)) =&gt;{
97                           self.OnRtnOrder(TradeData::RtnOrder(e));
98                       },
99                       Some(TradeData::RtnTrade(f)) =&gt;{
100                           self.OnRtnTrade(TradeData::RtnTrade(f));
101                       },
102                       _ =&gt;{
103                           println!(&quot;其它数据类型:{:?}&quot;,value);
104                           continue;
105                       },
106                  }
107            };
108       });
109    }
110    fn OnRtnMarketData(&amp;self,md:TradeData){
111        println!(&quot; -&gt; strategy:{:?} RtnMarketData=&gt; recv:{:?}&quot;,self.stra_name,md);
112        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
113        // 交易获取唯一ID
114        if rand_value &lt; 400 {
115            println!(&quot;触发交易信号:{:?} id:{:?}&quot;,rand_value,self.get_request_id());
116        }
117        thread::sleep(Duration::from_millis(rand_value));
118    }
119    fn OnRtnOrder(&amp;self,order:TradeData){
120        println!(&quot; -&gt; strategy:{:?} RtnOrder =&gt; recv:{:?}&quot;,self.stra_name,order);
121        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
122        // 交易获取唯一ID
123        if rand_value &lt; 10 {
124            println!(&quot;触发OnRtnOrder信号:{:?} id:{:?}&quot;,rand_value,self.get_request_id());
125        }
126    }
127    fn OnRtnTrade(&amp;self,trade:TradeData){
128        println!(&quot; -&gt; strategy:{:?} RtnTrade=&gt; recv:{:?}&quot;,self.stra_name,trade);
129        let rand_value:u64 = rand::thread_rng().gen_range(200,500);
130        // 交易获取唯一ID
131        if rand_value &lt; 300 {
132            println!(&quot;触发OnRtnTrade信号:{:?} id:{:?}&quot;,rand_value,self.get_request_id());
133        }
134    }
135}
136
137impl Get_id for Strategy{
138    fn get_request_id(&amp;self)-&gt; u32{
139        let lock = self.lock_id.clone();
140        let temp = *lock.lock().unwrap();
141        *lock.lock().unwrap()= temp +1 ;
142        println!(&quot;request_ID:{:?}&quot;,*lock.lock().unwrap());
143        temp + 1
144    }
145}
146
147struct StrategyGroup{
148    stra_list : Vec&lt;Strategy&gt;,
149}
150impl StrategyGroup{
151    fn new(list:Vec&lt;Strategy&gt;) -&gt; Self{
152        Self{
153            stra_list:list,
154        }
155    }
156}
157struct StrategyManager{
158    stra_group: StrategyGroup,
159}
160
161impl StrategyManager{
162    fn new(group: StrategyGroup)-&gt;Self{
163       Self{
164           stra_group: group,
165       }
166    }
167}
168
169fn simulate_send(tx:Sender&lt;TradeData&gt;,n_thread:u32){
170    for i in 0..n_thread {
171        let tx = tx.clone();
172        thread::spawn(move||{
173            let mut n = 0;
174            loop{
175                let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
176                n = n + 1;
177                println!(&quot;thost send info: thread id :{:?} ,次数 {:?} &quot;,i,n);
178                thread::sleep(Duration::from_millis(50));
179                match rand_value {
180                    0...600 =&gt; {
181                        match rand_value{
182                            0...100 =&gt; tx.send(TradeData::RtnMarketData(&quot;IC&quot;.to_string())).unwrap(),
183                            100...300=&gt; tx.send(TradeData::RtnMarketData(&quot;IF&quot;.to_string())).unwrap(),
184                            300...400=&gt; tx.send(TradeData::RtnMarketData(&quot;cu&quot;.to_string())).unwrap(),
185                            _ =&gt;tx.send(TradeData::RtnMarketData(&quot;ag&quot;.to_string())).unwrap(),
186                        }
187                    },
188
189                    600...900 =&gt; {
190                        match rand_value{
191                            600...700 =&gt;tx.send(TradeData::RtnOrder(&quot;IC&quot;.to_string())).unwrap(),
192                            700...750 =&gt;tx.send(TradeData::RtnOrder(&quot;IF&quot;.to_string())).unwrap(),
193                            750...800 =&gt;tx.send(TradeData::RtnOrder(&quot;cu&quot;.to_string())).unwrap(),
194                            _ =&gt;tx.send(TradeData::RtnOrder(&quot;ag&quot;.to_string())).unwrap(),
195                        }
196                    },
197                    _ =&gt; {
198                        match rand_value{
199                            900...920 =&gt;tx.send(TradeData::RtnTrade(&quot;IC&quot;.to_string())).unwrap(),
200                            920...940 =&gt;tx.send(TradeData::RtnTrade(&quot;IF&quot;.to_string())).unwrap(),
201                            940...960 =&gt;tx.send(TradeData::RtnTrade(&quot;cu&quot;.to_string())).unwrap(),
202                            _ =&gt;tx.send(TradeData::RtnTrade(&quot;ag&quot;.to_string())).unwrap(),
203                        }
204
205                    }
206                };
207
208            }
209        });
210    }
211    //thost::new(tx).thost_thread_builder.spawn()
212}
213fn dispatch_data(rx:&amp;Receiver&lt;TradeData&gt;,ss: Vec&lt;Handler&gt;){
214    let ss = &amp;ss;
215    loop{
216        let  ref value = rx.recv().unwrap();
217        match value {
218            TradeData::RtnMarketData(d) =&gt;{
219                for  s in ss {
220                    if s.stra_instructions.contains(&amp;d){
221                        println!(&quot;dispatch 2:{:?}&quot;,d);
222                        s.push_data(TradeData::RtnMarketData(d.to_string()));
223                    }
224                }
225            },
226            TradeData::RtnOrder(e) =&gt;{
227                for  s in ss {
228                    if s.stra_instructions.contains(&amp;e){
229                        println!(&quot;dispatch 4:{:?}&quot;,e);
230                        s.push_data(TradeData::RtnOrder(e.to_string()));
231                    }
232                }
233            },
234            TradeData::RtnTrade(f) =&gt;{
235                for  s in ss {
236                    if s.stra_instructions.contains(&amp;f){
237                        println!(&quot;dispatch 6:{:?}&quot;,f);
238                        s.push_data(TradeData::RtnTrade(f.to_string()));
239                    }
240                }
241            },
242        }
243    }
244}
245fn strategy_init()-&gt; Vec&lt;Handler&gt; {
246    let id = Arc::new(Mutex::new(0_u32));
247
248    let v_01 :VecDeque&lt;TradeData&gt; = VecDeque::new();
249    let data_01 = Arc::new(Mutex::new(v_01));
250
251    let v_02 :VecDeque&lt;TradeData&gt; = VecDeque::new();
252    let data_02 = Arc::new(Mutex::new(v_02));
253
254    let v_03 :VecDeque&lt;TradeData&gt; = VecDeque::new();
255    let data_03 = Arc::new(Mutex::new(v_03));
256
257    let handler_01 = Handler::new(&quot;DSCJ&quot;.to_string(),vec![&quot;IC&quot;.to_string(),&quot;IF&quot;.to_string()],data_01.clone());
258    let strategy_01 = Strategy::new(&quot;DSCJ&quot;.to_string(),vec![&quot;IC&quot;.to_string(),&quot;IF&quot;.to_string()],id.clone(),data_01.clone());
259    strategy_01.spawn();
260    let handler_02 = Handler::new(&quot;TTTT&quot;.to_string(),vec![&quot;IC&quot;.to_string(),&quot;IH&quot;.to_string()],data_02.clone());
261    let strategy_02 = Strategy::new(&quot;TTTT&quot;.to_string(),vec![&quot;IC&quot;.to_string(),&quot;IH&quot;.to_string()],id.clone(),data_02.clone());
262    strategy_02.spawn();
263    let handler_03 = Handler::new(&quot;WSDJ&quot;.to_string(),vec![&quot;cu&quot;.to_string(),&quot;ag&quot;.to_string()],data_03.clone());
264    let strategy_03 = Strategy::new(&quot;WSDJ&quot;.to_string(),vec![&quot;cu&quot;.to_string(),&quot;ag&quot;.to_string()],id.clone(),data_03.clone());
265    strategy_03.spawn();
266    vec![handler_01,handler_02,handler_03]
267}
268
269fn main(){
270    //模拟生成相关的策略、策略group、策略管理者
271    let handlers :Vec&lt;Handler&gt; = strategy_init();
272    //模拟thost
273    let (tx,rx) = channel::&lt;TradeData&gt;();
274    //模拟N个多线程异步进行接收thost相关的行情等信息
275    simulate_send(tx,2);
276    println!(&quot;main=&gt;&quot;);
277    dispatch_data(&amp;rx,handlers);
278    thread::sleep(Duration::from_millis(500000));
279}
280
281
282

给TA打赏
共{{data.count}}人
人已打赏
安全技术

C++遍历文件夹

2022-1-11 12:36:11

安全运维

开源中文分词FudanNLP

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索