Java并发编程 | 第五篇:Java并行模式之并行流水线

释放双眼,带上耳机,听听看~!

假如现在有两个数,B和C。如果要计算(B+C)*B/2,那么这个运算过程就是无法并行的。原因是,如果B+C没有执行完成,则永远算不出(B+C)*B,这就是数据相关性。
可以借鉴日常生产中的流水线思想,首先将计算过程拆分为三个步骤:
P1:A=B+C
P2:D=AxB
P3:D=D/2
上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。


简单实现

载体

为了实现这个功能,我们需要定义一个在线程间携带结果进行信息交换的载体


1
2
3
4
5
6
7
1public class Msg {
2  public double i;
3  public double j;
4  public String orgStr=null;
5}
6
7

P1计算的加法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public class Plus implements Runnable{
2
3    public static BlockingQueue<Msg> bg=new LinkedBlockingQueue<Msg>();
4    @Override
5    public void run() {
6        // TODO Auto-generated method stub
7        while(true){
8            try {
9                Msg msg=bg.take();
10                msg.j=msg.i+msg.j;
11                Multiply.bg.add(msg);
12            } catch (Exception e) {
13
14            }
15        }
16    }
17}
18

可以看出,P1从共享管道BlockingQueue取出Msg,然后使用Msg封装的i和j进行求和操作,然后通过把结果添加到P2的共享管道BlockingQueue,传递给P2

P2计算的乘法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public class Multiply implements Runnable {
2    public static BlockingQueue<Msg> bg=new LinkedBlockingQueue<Msg>();
3    @Override
4    public void run() {
5        while(true){
6            try {
7                Msg msg=bg.take();
8                msg.i=msg.i*msg.j;
9                Div.bg.add(msg);
10            } catch (Exception e) {
11                // TODO: handle exception
12            }
13        }
14
15    }
16
17}
18

P3计算的除法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1public class Div implements Runnable {
2    public static BlockingQueue<Msg> bg = new LinkedBlockingQueue<Msg>();
3
4    @Override
5    public void run() {
6        while (true) {
7        try {
8          Msg msg=bg.take();
9          msg.i=msg.i/2;
10          System.out.println(msg.orgStr+"="+msg.i);
11            } catch (Exception e) {
12          }
13        }
14    }
15}
16

这个可以得到最终结果,所以我们就输出最终结果,msg.orgStr是计算格式的字符串

测试


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1public class PStreamMain {
2     public static void main(String[] args) {
3        new Thread(new Plus()).start();
4        new Thread(new Multiply()).start();
5        new Thread(new Div()).start();
6
7        for(int i=0;i<=1000;i++){
8            for(int j=0;j<=1000;j++){
9                Msg msg=new Msg();
10                msg.i=i;
11                msg.j=j;
12                msg.orgStr="(("+i+"+"+j+")*"+i+")/2";
13                Plus.bg.add(msg);
14            }
15
16        }
17    }
18}
19

总结

你会发现并发流水线是通过BlockingQueue这个共享管道,通过依赖关系串联起来,把操作分配在不同线程中进行计算,尽可能利用多核优势

参考:《Java高并发程序设计》

给TA打赏
共{{data.count}}人
人已打赏
安全技术

详解Node.js API系列C/C++ Addons(1) API文档

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索