在CTP中,CTP thost会异步发送相关行情和交易回报信息给订阅方或策略管理者(这里简称为strategyManager)。那么,模拟一下CTP的机制,有利于在CTP平台上,构建策略交易支持体系。
一、主要涉及几方面:
1、thost异步发送相关信息
利用多线程,多生产者单消费者模式来模拟发送相关信息,可以选用标准库中的mpsc::channel.
2、策略管理者把相关信息分发给不同的策略
二、方案1: 单channel+单线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::cell::RefCell;
10
/// Stand-alone wrapper for a market-data payload.
///
/// Not used elsewhere in the visible code; messages travel as `TradeData` variants.
#[allow(dead_code)]
#[derive(Debug)]
struct RtnMarketData(String);

/// Stand-alone wrapper for an order-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug)]
struct RtnOrder(String);

/// Stand-alone wrapper for a trade-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug)]
struct RtnTrade(String);

/// Simulates the different kinds of messages pushed by thost.
///
/// Each variant carries the instrument code the message refers to
/// (for example `"IC"` or `"cu"`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum TradeData {
    /// Market-data tick.
    RtnMarketData(String),
    /// Order status return.
    RtnOrder(String),
    /// Trade (fill) return.
    RtnTrade(String),
}
26struct Strategy{
27 stra_name:String,
28 stra_thread_builder:thread::Builder,
29 stra_instructions:Vec<String>,//订阅的相关合约
30}
31impl Strategy{
32 fn new(name:String,instrs:Vec<String>)->Self{
33 Self{
34 stra_name: name,
35 stra_thread_builder: thread::Builder::new(),
36 stra_instructions:instrs,
37 }
38 }
39 fn OnRtnMarketData(&self,md:&TradeData){
40 println!(" -> strategy:{:?} RtnMarketData=> recv:{:?}",self.stra_name,md);
41 }
42 fn OnRtnOrder(&self,order:&TradeData){
43 println!(" -> strategy:{:?} RtnOrder => recv:{:?}",self.stra_name,order);
44 }
45 fn OnRtnTrade(&self,trade:&TradeData){
46 println!(" -> strategy:{:?} RtnTrade=> recv:{:?}",self.stra_name,trade);
47 }
48}
49struct StrategyGroup{
50 //stra_list : RefCell<Vec<Strategy>>,
51 stra_list : Vec<Strategy>,
52}
53impl StrategyGroup{
54 fn new(list:Vec<Strategy>) -> Self{
55 //stra_list:vec![Strategy::new("DSCJ",vec!["IC","IF"]),Strategy::new("WSDJ",vec!["cu,ag"])]
56 Self{
57 //stra_list:RefCell::new(list),
58 stra_list:list,
59 }
60 }
61}
62struct StrategyManager{
63 thread_builder : thread::Builder,
64 stra_group:StrategyGroup,
65}
66impl StrategyManager{
67 fn new(group:StrategyGroup)->Self{
68 Self{
69 thread_builder : thread::Builder::new(),
70 stra_group:group,
71 }
72 }
73}
74
75fn simulate_send(tx:Sender<TradeData>,n_thread:u32){
76 for i in 0..n_thread {
77 let tx = tx.clone();
78 thread::spawn(move||{
79 let mut n = 0;
80 loop{
81 let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
82 n = n + 1;
83 println!("rand_value:{:?} n:{:?} thread id :{:?}",rand_value,n,i);
84 thread::sleep(Duration::from_millis(300));
85 match rand_value {
86 0...600 => {
87 match rand_value{
88 0...100 => tx.send(TradeData::RtnMarketData("IC".to_string())).unwrap(),
89 100...300=> tx.send(TradeData::RtnMarketData("IF".to_string())).unwrap(),
90 300...400=> tx.send(TradeData::RtnMarketData("cu".to_string())).unwrap(),
91 _ =>tx.send(TradeData::RtnMarketData("ag".to_string())).unwrap(),
92 }
93 },
94
95 600...900 => {
96 match rand_value{
97 600...700 =>tx.send(TradeData::RtnOrder("IC".to_string())).unwrap(),
98 700...750 =>tx.send(TradeData::RtnOrder("IF".to_string())).unwrap(),
99 750...800 =>tx.send(TradeData::RtnOrder("cu".to_string())).unwrap(),
100 _ =>tx.send(TradeData::RtnOrder("ag".to_string())).unwrap(),
101 }
102 },
103 _ => {
104 match rand_value{
105 900...920 =>tx.send(TradeData::RtnTrade("IC".to_string())).unwrap(),
106 920...940 =>tx.send(TradeData::RtnTrade("IF".to_string())).unwrap(),
107 940...960 =>tx.send(TradeData::RtnTrade("cu".to_string())).unwrap(),
108 _ =>tx.send(TradeData::RtnTrade("ag".to_string())).unwrap(),
109 }
110
111 }
112 };
113
114 }
115 });
116 }
117}
118fn dispatch_data(rx:&Receiver<TradeData>,stra_group:&StrategyGroup){
119 //let (tx,rx) = channel();
120 let strategys = &*stra_group.stra_list;
121 loop{
122 let ref value = rx.recv().unwrap();
123 match value {
124 TradeData::RtnMarketData(d) =>{
125 for strategy in strategys {
126 if strategy.stra_instructions.contains(&d){
127 strategy.OnRtnMarketData(value);
128 }
129 }
130 },
131 TradeData::RtnOrder(e) =>{
132 for strategy in strategys {
133 if strategy.stra_instructions.contains(&e){
134 strategy.OnRtnOrder(value);
135 }
136 }
137 },
138 TradeData::RtnTrade(f) =>{
139 for strategy in strategys {
140 if strategy.stra_instructions.contains(&f){
141 strategy.OnRtnTrade(value)
142 }
143 }
144 },
145 }
146 }
147
148}
149fn generate_strategyManager()-> StrategyManager {
150 let strategy_01 =Strategy::new("DSCJ".to_string(),vec!["IC".to_string(),"IF".to_string()]);
151 let strategy_02 =Strategy::new("WSDJ".to_string(),vec!["IF".to_string()]);
152 let strategy_03 =Strategy::new("TTTT".to_string(),vec!["ag".to_string(),"cu".to_string()]);
153 let stra_group = StrategyGroup::new(vec![strategy_01,strategy_02,strategy_03]);
154 StrategyManager::new(stra_group)
155}
156
157fn main(){
158 //模拟生成相关的策略、策略group、策略管理者
159 let stra_manager = generate_strategyManager();
160 // 模拟thost
161 let (tx,rx) = channel::<TradeData>();
162 //模拟多线程异步进行接收thost相关的行情等信息
163 simulate_send(tx,10);
164 println!("main=>");
165
166 dispatch_data(&rx,&stra_manager.stra_group);
167 thread::sleep(Duration::from_millis(500000));
168}
169
170
三、效果图
四、资源占用情况
在去掉了sleep之后,可以看到内存一直在上升:10个线程不停地发送行情,压力很大;而策略分发的速度跟不上,消息会在(无界的)channel中不断堆积,因此占用的资源会越来越多。
这个请大家注意!
五、方案2:多channel + 多线程方案
1、这个模拟,并没有模拟订阅thost的机制;
2、子策略并没有专有线程去接收相关信息;现在是strategyManager中单线程进行分发,并负责策略中信号处理。这部分应交给子策略中的专有线程处理。 即处理的链条过长。
下面的代码实现了两层:channel以及多线程轮询交易所的行情等信息,大大提高了交易的效率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::sync::{Arc, Mutex};
10use std::marker::Send;
11use std::cell::RefCell;
12
13
/// Stand-alone wrapper for a market-data payload.
///
/// Not used elsewhere in the visible code; messages travel as `TradeData` variants.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnMarketData(String);

/// Stand-alone wrapper for an order-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnOrder(String);

/// Stand-alone wrapper for a trade-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnTrade(String);

/// The kinds of messages pushed by the simulated thost.
///
/// Each variant carries the instrument code the message refers to
/// (for example `"IC"` or `"cu"`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum TradeData {
    /// Market-data tick.
    RtnMarketData(String),
    /// Order status return.
    RtnOrder(String),
    /// Trade (fill) return.
    RtnTrade(String),
}
/// Source of request ids for outgoing trading requests.
#[allow(non_camel_case_types)]
trait Get_id {
    /// Returns the next request id.
    fn get_request_id(&self) -> u32;
}

32struct Strategy{
33 stra_name:String,
34 receiver:Receiver<TradeData>,
35 lock:Arc<Mutex<u32>>,
36}
37struct StrategyHandler{
38 stra_name: String,
39 stra_instructions: Vec<String>, //订阅的相关合约
40 sender:Sender<TradeData>,
41}
42impl StrategyHandler{
43 fn new(name:String,instrs:Vec<String>,send:Sender<TradeData>)->Self{
44 Self{
45 stra_name: name,
46 stra_instructions: instrs,
47 sender:send,
48 }
49 }
50}
51
52impl Strategy{
53 fn new(name:String,recv:Receiver<TradeData>,lock:Arc<Mutex<u32>>)->Self{
54 Self{
55 stra_name:name,
56 receiver:recv,
57 lock:lock,
58 }
59 }
60 fn spawn(self){
61 thread::spawn(move||{
62 loop{
63 let value = self.receiver.recv().unwrap();
64 match value {
65 TradeData::RtnMarketData(_) =>{
66 self.OnRtnMarketData(value);
67 },
68 TradeData::RtnOrder(_) =>{
69 self.OnRtnOrder(value);
70 },
71 TradeData::RtnTrade(_) =>{
72 self.OnRtnTrade(value);
73 },
74 }
75 };
76 });
77 }
78 fn OnRtnMarketData(&self,md:TradeData){
79 println!(" -> strategy:{:?} RtnMarketData=> recv:{:?}",self.stra_name,md);
80 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
81 // 获取唯一ID,策略逻辑在此
82 if rand_value < 400 {
83 println!("触发交易信号:{:?} id:{:?}",rand_value,self.get_request_id());
84 }
85 thread::sleep(Duration::from_millis(rand_value));
86 }
87 fn OnRtnOrder(&self,order:TradeData){
88 println!(" -> strategy:{:?} RtnOrder => recv:{:?}",self.stra_name,order);
89 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
90 // 获取唯一ID
91 if rand_value < 10 {
92 println!("触发OnRtnOrder信号:{:?} id:{:?}",rand_value,self.get_request_id());
93 }
94 }
95 fn OnRtnTrade(&self,trade:TradeData){
96 println!(" -> strategy:{:?} RtnTrade=> recv:{:?}",self.stra_name,trade);
97 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
98 // 获取唯一ID
99 if rand_value < 300 {
100 println!("触发OnRtnTrade信号:{:?} id:{:?}",rand_value,self.get_request_id());
101 }
102 }
103}
104
105impl Get_id for Strategy{
106 fn get_request_id(&self)-> u32{
107 let lock = self.lock.clone();
108 let temp = *lock.lock().unwrap();
109 *lock.lock().unwrap()= temp +1 ;
110 println!("request_ID:{:?}",*lock.lock().unwrap());
111 temp+1
112 }
113}
114
115struct StrategyGroup{
116 stra_list : Vec<Strategy>,
117}
118impl StrategyGroup{
119 fn new(list:Vec<Strategy>) -> Self{
120 Self{
121 stra_list:list,
122 }
123 }
124}
125struct StrategyManager{
126 stra_group: StrategyGroup,
127}
128
129impl StrategyManager{
130 fn new(group: StrategyGroup)->Self{
131 Self{
132 stra_group: group,
133 }
134 }
135}
136
137fn simulate_send(tx:Sender<TradeData>,n_thread:u32){
138 for i in 0..n_thread {
139 let tx = tx.clone();
140 thread::spawn(move||{
141 let mut n = 0;
142 loop{
143 let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
144 n = n + 1;
145 println!("thost send info: thread id :{:?} ,次数 {:?} ",i,n);
146 thread::sleep(Duration::from_millis(500));
147 match rand_value {
148 0...600 => {
149 match rand_value{
150 0...100 => tx.send(TradeData::RtnMarketData("IC".to_string())).unwrap(),
151 100...300=> tx.send(TradeData::RtnMarketData("IF".to_string())).unwrap(),
152 300...400=> tx.send(TradeData::RtnMarketData("cu".to_string())).unwrap(),
153 _ =>tx.send(TradeData::RtnMarketData("ag".to_string())).unwrap(),
154 }
155 },
156
157 600...900 => {
158 match rand_value{
159 600...700 =>tx.send(TradeData::RtnOrder("IC".to_string())).unwrap(),
160 700...750 =>tx.send(TradeData::RtnOrder("IF".to_string())).unwrap(),
161 750...800 =>tx.send(TradeData::RtnOrder("cu".to_string())).unwrap(),
162 _ =>tx.send(TradeData::RtnOrder("ag".to_string())).unwrap(),
163 }
164 },
165 _ => {
166 match rand_value{
167 900...920 =>tx.send(TradeData::RtnTrade("IC".to_string())).unwrap(),
168 920...940 =>tx.send(TradeData::RtnTrade("IF".to_string())).unwrap(),
169 940...960 =>tx.send(TradeData::RtnTrade("cu".to_string())).unwrap(),
170 _ =>tx.send(TradeData::RtnTrade("ag".to_string())).unwrap(),
171 }
172
173 }
174 };
175
176 }
177 });
178 }
179 //thost::new(tx).thost_thread_builder.spawn()
180}
181fn dispatch_data(rx:&Receiver<TradeData>,_handlers: Vec<StrategyHandler>){
182 let handlers = &_handlers;
183 loop{
184 let ref value = rx.recv().unwrap();
185 match value {
186 TradeData::RtnMarketData(d) =>{
187 for handler in handlers {
188 if handler.stra_instructions.contains(&d){
189 //strategy.OnRtnMarketData(value)
190 let tx = handler.sender.clone();
191 tx.send(TradeData::RtnMarketData(d.to_string())).unwrap();
192 println!("dispatch:{:?}",d);
193 }
194 }
195 },
196 TradeData::RtnOrder(e) =>{
197 for handler in handlers {
198 if handler.stra_instructions.contains(&e){
199 let tx = handler.sender.clone();
200 tx.send(TradeData::RtnOrder(e.to_string())).unwrap();
201 println!("dispatch:{:?}",e);
202 }
203 }
204 },
205 TradeData::RtnTrade(f) =>{
206 for handler in handlers {
207 if handler.stra_instructions.contains(&f){
208 let tx = handler.sender.clone();
209 tx.send(TradeData::RtnTrade(f.to_string())).unwrap();
210 println!("dispatch:{:?}",f);
211 }
212 }
213 },
214 }
215 }
216}
217fn strategy_init()-> Vec<StrategyHandler> {
218 let (tx_01,rx_01) = channel::<TradeData>();
219 let (tx_02,rx_02) = channel::<TradeData>();
220 let (tx_03,rx_03) = channel::<TradeData>();
221 let id = Arc::new(Mutex::new(0_u32));
222 let stra_handler_01 = StrategyHandler::new("DSCJ".to_string(),vec!["IC".to_string(),"IH".to_string()],tx_01);
223 let strategy_01 = Strategy::new("DSCJ".to_string(),rx_01,id.clone());
224 println!("a");
225 strategy_01.spawn();
226 println!("b");
227 let stra_handler_02 = StrategyHandler::new("TTTT".to_string(),vec!["IC".to_string(),"IF".to_string()],tx_02);
228 let strategy_02 = Strategy::new("TTTT".to_string(),rx_02,id.clone());
229 strategy_02.spawn();
230 let stra_handler_03 = StrategyHandler::new("WSDJ".to_string(),vec!["ag".to_string(),"cu".to_string()],tx_03);
231 let strategy_03 = Strategy::new("WSDJ".to_string(),rx_03,id.clone());
232 strategy_03.spawn();
233 vec![stra_handler_01,stra_handler_02,stra_handler_03]
234}
235// 等待=>
236
237
238fn main(){
239 //模拟生成相关的策略、策略group、策略管理者
240 let handlers :Vec<StrategyHandler> = strategy_init();
241 //模拟thost
242 let (tx,rx) = channel::<TradeData>();
243 //模拟N个多线程异步进行接收thost相关的行情等信息
244 simulate_send(tx,2);
245 println!("main=>");
246 dispatch_data(&rx,handlers);
247 thread::sleep(Duration::from_millis(500000));
248}
249
250
上面的核心是:Sender<TradeData> 和 Receiver<TradeData> 要分离(分别放在 StrategyHandler 和 Strategy 中),否则在启动新线程时,就会把策略本身整个 move 进去,分发方就无法再使用它了。
上面不仅有多线程和异步接收相关的信息,而且还有同步交易的模拟,在功能上也比较完整。
六、方案3:单channel + 多线程方案
当然,用channel很方便,但有没有简单用加锁的方式,来解决这个问题?
有的,但在架构上,仍然需要把同一个 Arc<Mutex<…>> 的两份引用,分别放在分开的两部分(Handler 和 Strategy)之中。
下面是代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 1use std::thread;
2use std::sync::mpsc::channel;
3use std::collections::VecDeque;
4use std::time::Duration;
5use rand::Rng;
6use std::sync::mpsc::Receiver;
7use std::sync::mpsc::Sender;
8use std::thread::Thread;
9use std::sync::{Arc, Mutex};
10use std::marker::Send;
11use std::cell::RefCell;
12
13
/// Stand-alone wrapper for a market-data payload.
///
/// Not used elsewhere in the visible code; messages travel as `TradeData` variants.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnMarketData(String);

/// Stand-alone wrapper for an order-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnOrder(String);

/// Stand-alone wrapper for a trade-return payload (unused in the visible code).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RtnTrade(String);

/// The kinds of messages pushed by the simulated thost.
///
/// Each variant carries the instrument code the message refers to
/// (for example `"IC"` or `"cu"`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum TradeData {
    /// Market-data tick.
    RtnMarketData(String),
    /// Order status return.
    RtnOrder(String),
    /// Trade (fill) return.
    RtnTrade(String),
}
/// Source of request ids for outgoing trading requests.
#[allow(non_camel_case_types)]
trait Get_id {
    /// Returns the next request id.
    fn get_request_id(&self) -> u32;
}
31trait Get_data{
32 fn push_data(&self,td:TradeData);
33}
34struct Strategy{
35 stra_name:String,
36 stra_instructions: Vec<String>, //订阅的相关合约
37 lock_id:Arc<Mutex<u32>>,
38 lock_data:Arc<Mutex<VecDeque<TradeData>>>,
39}
40struct Handler{
41 stra_name:String,
42 stra_instructions: Vec<String>, //订阅的相关合约
43 lock_data:Arc<Mutex<VecDeque<TradeData>>>,
44}
45impl Handler{
46 fn new(name:String,instrs:Vec<String>,lock_data:Arc<Mutex<VecDeque<TradeData>>>)->Self{
47 Self{
48 stra_name:name,
49 stra_instructions: instrs, //订阅的相关合约
50 lock_data:lock_data,
51 }
52 }
53}
54impl Get_data for Handler{
55 fn push_data(&self,td:TradeData){
56 let s_clone = self.lock_data.clone();
57 if s_clone.is_poisoned(){
58 println!("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ get_data posion:{:?}",s_clone.is_poisoned());
59 }
60 let mut guard = match s_clone.lock() {
61 Ok(guard) => guard,
62 Err(poisoned) => {
63 println!(" posion!");
64 poisoned.into_inner()
65 },
66 };
67 println!("push_back 成功!:{:?}",td);
68 guard.push_back(td);
69
70 }
71}
72impl Strategy{
73 fn new(name:String,instrs:Vec<String>,lock_id:Arc<Mutex<u32>>,lock_data:Arc<Mutex<VecDeque<TradeData>>>)->Self{
74 Self{
75 stra_name:name,
76 stra_instructions: instrs, //订阅的相关合约
77 lock_id: lock_id,
78 lock_data:lock_data,
79 }
80 }
81 fn spawn(self){
82 let lock = self.lock_data.clone();
83 if lock.is_poisoned(){
84 println!("=======================================> spawn posion:{:?}",lock.is_poisoned());
85 }
86
87 thread::spawn(move||{
88 loop{
89 let value = lock.lock().unwrap().pop_back();//.unwrap();//front
90 println!("=> get value:{:?}",value);
91 thread::sleep(Duration::from_millis(100));
92 match value {
93 Some(TradeData::RtnMarketData(d)) =>{
94 self.OnRtnMarketData(TradeData::RtnMarketData(d));
95 },
96 Some(TradeData::RtnOrder(e)) =>{
97 self.OnRtnOrder(TradeData::RtnOrder(e));
98 },
99 Some(TradeData::RtnTrade(f)) =>{
100 self.OnRtnTrade(TradeData::RtnTrade(f));
101 },
102 _ =>{
103 println!("其它数据类型:{:?}",value);
104 continue;
105 },
106 }
107 };
108 });
109 }
110 fn OnRtnMarketData(&self,md:TradeData){
111 println!(" -> strategy:{:?} RtnMarketData=> recv:{:?}",self.stra_name,md);
112 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
113 // 交易获取唯一ID
114 if rand_value < 400 {
115 println!("触发交易信号:{:?} id:{:?}",rand_value,self.get_request_id());
116 }
117 thread::sleep(Duration::from_millis(rand_value));
118 }
119 fn OnRtnOrder(&self,order:TradeData){
120 println!(" -> strategy:{:?} RtnOrder => recv:{:?}",self.stra_name,order);
121 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
122 // 交易获取唯一ID
123 if rand_value < 10 {
124 println!("触发OnRtnOrder信号:{:?} id:{:?}",rand_value,self.get_request_id());
125 }
126 }
127 fn OnRtnTrade(&self,trade:TradeData){
128 println!(" -> strategy:{:?} RtnTrade=> recv:{:?}",self.stra_name,trade);
129 let rand_value:u64 = rand::thread_rng().gen_range(200,500);
130 // 交易获取唯一ID
131 if rand_value < 300 {
132 println!("触发OnRtnTrade信号:{:?} id:{:?}",rand_value,self.get_request_id());
133 }
134 }
135}
136
137impl Get_id for Strategy{
138 fn get_request_id(&self)-> u32{
139 let lock = self.lock_id.clone();
140 let temp = *lock.lock().unwrap();
141 *lock.lock().unwrap()= temp +1 ;
142 println!("request_ID:{:?}",*lock.lock().unwrap());
143 temp + 1
144 }
145}
146
147struct StrategyGroup{
148 stra_list : Vec<Strategy>,
149}
150impl StrategyGroup{
151 fn new(list:Vec<Strategy>) -> Self{
152 Self{
153 stra_list:list,
154 }
155 }
156}
157struct StrategyManager{
158 stra_group: StrategyGroup,
159}
160
161impl StrategyManager{
162 fn new(group: StrategyGroup)->Self{
163 Self{
164 stra_group: group,
165 }
166 }
167}
168
169fn simulate_send(tx:Sender<TradeData>,n_thread:u32){
170 for i in 0..n_thread {
171 let tx = tx.clone();
172 thread::spawn(move||{
173 let mut n = 0;
174 loop{
175 let rand_value:u32 = rand::thread_rng().gen_range(0, 1000);
176 n = n + 1;
177 println!("thost send info: thread id :{:?} ,次数 {:?} ",i,n);
178 thread::sleep(Duration::from_millis(50));
179 match rand_value {
180 0...600 => {
181 match rand_value{
182 0...100 => tx.send(TradeData::RtnMarketData("IC".to_string())).unwrap(),
183 100...300=> tx.send(TradeData::RtnMarketData("IF".to_string())).unwrap(),
184 300...400=> tx.send(TradeData::RtnMarketData("cu".to_string())).unwrap(),
185 _ =>tx.send(TradeData::RtnMarketData("ag".to_string())).unwrap(),
186 }
187 },
188
189 600...900 => {
190 match rand_value{
191 600...700 =>tx.send(TradeData::RtnOrder("IC".to_string())).unwrap(),
192 700...750 =>tx.send(TradeData::RtnOrder("IF".to_string())).unwrap(),
193 750...800 =>tx.send(TradeData::RtnOrder("cu".to_string())).unwrap(),
194 _ =>tx.send(TradeData::RtnOrder("ag".to_string())).unwrap(),
195 }
196 },
197 _ => {
198 match rand_value{
199 900...920 =>tx.send(TradeData::RtnTrade("IC".to_string())).unwrap(),
200 920...940 =>tx.send(TradeData::RtnTrade("IF".to_string())).unwrap(),
201 940...960 =>tx.send(TradeData::RtnTrade("cu".to_string())).unwrap(),
202 _ =>tx.send(TradeData::RtnTrade("ag".to_string())).unwrap(),
203 }
204
205 }
206 };
207
208 }
209 });
210 }
211 //thost::new(tx).thost_thread_builder.spawn()
212}
213fn dispatch_data(rx:&Receiver<TradeData>,ss: Vec<Handler>){
214 let ss = &ss;
215 loop{
216 let ref value = rx.recv().unwrap();
217 match value {
218 TradeData::RtnMarketData(d) =>{
219 for s in ss {
220 if s.stra_instructions.contains(&d){
221 println!("dispatch 2:{:?}",d);
222 s.push_data(TradeData::RtnMarketData(d.to_string()));
223 }
224 }
225 },
226 TradeData::RtnOrder(e) =>{
227 for s in ss {
228 if s.stra_instructions.contains(&e){
229 println!("dispatch 4:{:?}",e);
230 s.push_data(TradeData::RtnOrder(e.to_string()));
231 }
232 }
233 },
234 TradeData::RtnTrade(f) =>{
235 for s in ss {
236 if s.stra_instructions.contains(&f){
237 println!("dispatch 6:{:?}",f);
238 s.push_data(TradeData::RtnTrade(f.to_string()));
239 }
240 }
241 },
242 }
243 }
244}
245fn strategy_init()-> Vec<Handler> {
246 let id = Arc::new(Mutex::new(0_u32));
247
248 let v_01 :VecDeque<TradeData> = VecDeque::new();
249 let data_01 = Arc::new(Mutex::new(v_01));
250
251 let v_02 :VecDeque<TradeData> = VecDeque::new();
252 let data_02 = Arc::new(Mutex::new(v_02));
253
254 let v_03 :VecDeque<TradeData> = VecDeque::new();
255 let data_03 = Arc::new(Mutex::new(v_03));
256
257 let handler_01 = Handler::new("DSCJ".to_string(),vec!["IC".to_string(),"IF".to_string()],data_01.clone());
258 let strategy_01 = Strategy::new("DSCJ".to_string(),vec!["IC".to_string(),"IF".to_string()],id.clone(),data_01.clone());
259 strategy_01.spawn();
260 let handler_02 = Handler::new("TTTT".to_string(),vec!["IC".to_string(),"IH".to_string()],data_02.clone());
261 let strategy_02 = Strategy::new("TTTT".to_string(),vec!["IC".to_string(),"IH".to_string()],id.clone(),data_02.clone());
262 strategy_02.spawn();
263 let handler_03 = Handler::new("WSDJ".to_string(),vec!["cu".to_string(),"ag".to_string()],data_03.clone());
264 let strategy_03 = Strategy::new("WSDJ".to_string(),vec!["cu".to_string(),"ag".to_string()],id.clone(),data_03.clone());
265 strategy_03.spawn();
266 vec![handler_01,handler_02,handler_03]
267}
268
269fn main(){
270 //模拟生成相关的策略、策略group、策略管理者
271 let handlers :Vec<Handler> = strategy_init();
272 //模拟thost
273 let (tx,rx) = channel::<TradeData>();
274 //模拟N个多线程异步进行接收thost相关的行情等信息
275 simulate_send(tx,2);
276 println!("main=>");
277 dispatch_data(&rx,handlers);
278 thread::sleep(Duration::from_millis(500000));
279}
280
281
282