假如现在有两个数,B和C。如果要计算(B+C)*B/2,那么这个运算过程就是无法并行的。原因是,如果B+C没有执行完成,则永远算不出(B+C)*B,这就是数据相关性。
可以借鉴日常生产中的流水线思想,首先将计算过程拆分为三个步骤:
P1:A=B+C
P2:D=AxB
P3:D=D/2
上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。
简单实现
载体
为了实现这个功能,我们需要定义一个在线程间携带结果进行信息交换的载体
1
2
3
4
5
6
7 1public class Msg {
2 public double i;
3 public double j;
4 public String orgStr=null;
5}
6
7
P1计算的加法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public class Plus implements Runnable{
2
3 public static BlockingQueue<Msg> bg=new LinkedBlockingQueue<Msg>();
4 @Override
5 public void run() {
6 // TODO Auto-generated method stub
7 while(true){
8 try {
9 Msg msg=bg.take();
10 msg.j=msg.i+msg.j;
11 Multiply.bg.add(msg);
12 } catch (Exception e) {
13
14 }
15 }
16 }
17}
18
可以看出,P1从共享管道BlockingQueue取出Msg,然后使用Msg封装的i和j进行求和操作,然后通过把结果添加到P2的共享管道BlockingQueue,传递给P2
P2计算的乘法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public class Multiply implements Runnable {
2 public static BlockingQueue<Msg> bg=new LinkedBlockingQueue<Msg>();
3 @Override
4 public void run() {
5 while(true){
6 try {
7 Msg msg=bg.take();
8 msg.i=msg.i*msg.j;
9 Div.bg.add(msg);
10 } catch (Exception e) {
11 // TODO: handle exception
12 }
13 }
14
15 }
16
17}
18
P3计算的除法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public class Div implements Runnable {
2 public static BlockingQueue<Msg> bg = new LinkedBlockingQueue<Msg>();
3
4 @Override
5 public void run() {
6 while (true) {
7 try {
8 Msg msg=bg.take();
9 msg.i=msg.i/2;
10 System.out.println(msg.orgStr+"="+msg.i);
11 } catch (Exception e) {
12 }
13 }
14 }
15}
16
这个可以得到最终结果,所以我们就输出最终结果,msg.orgStr是计算格式的字符串
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1public class PStreamMain {
2 public static void main(String[] args) {
3 new Thread(new Plus()).start();
4 new Thread(new Multiply()).start();
5 new Thread(new Div()).start();
6
7 for(int i=0;i<=1000;i++){
8 for(int j=0;j<=1000;j++){
9 Msg msg=new Msg();
10 msg.i=i;
11 msg.j=j;
12 msg.orgStr="(("+i+"+"+j+")*"+i+")/2";
13 Plus.bg.add(msg);
14 }
15
16 }
17 }
18}
19
总结
你会发现并发流水线是通过BlockingQueue这个共享管道,通过依赖关系串联起来,把操作分配在不同线程中进行计算,尽可能利用多核优势
参考:《Java高并发程序设计》