Java高并发(七)——Future模式

释放双眼,带上耳机,听听看~!

       大家想下,多线程处理提高性能的根本本质在哪?其实就是将串行的处理步骤进行并行的处理,其实总时间是没有缩短的。也就是以前一个人干活需要10个小时,而十个人干同样的活需要1小时,从而缩短处理时间。但是如果干活有先后限制怎么办?例如工作中:测试前必须编码,编码前必须设计,设计前必须需求分析,分析前……如何提高这种情况的性能呢?或者说是如何让这中情况下的线程更加充分利用呢?Future模式——异步调用。好,在总结Future模式前,我们来先看一篇文章https://www.cnblogs.com/cz123/p/7693064.html。

       一,流程对比:对上边厨房做饭的例子,进行用Future模式的前后执行流程对比,从图中可以看出,其实就将能够并行的处理步骤通过异步调用进行了并行,并行不了的只能串行。

Java高并发(七)——Future模式

       二**,Future模式**:

      1,Future的核心结构图:

Java高并发(七)——Future模式

     2,看下实现简易代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
1/**
2 * 数据接口
3 */
4public interface Data {
5    /**
6     * 获取数据
7     * @return
8     */
9    String getResult();
10}
11
12
13/**
14 * RealData类
15 */
16public class RealData implements Data {
17    protected final String result;
18
19    public RealData(String para) {
20        StringBuffer sb = new StringBuffer();
21        for (int i = 0; i < 10; i++) {
22            sb.append(para);
23            try {
24                Thread.sleep(1000);
25            } catch (InterruptedException e) {
26                e.printStackTrace();
27            }
28        }
29        result = sb.toString();
30    }
31
32
33    @Override
34    public String getResult() {
35        return result;
36    }
37}
38
39
40/**
41 * FutureData类
42 */
43public class FutureData implements Data {
44    //FutureData是realData的包装
45    protected RealData realData = null;
46    protected boolean isReady = false;
47
48    public synchronized void setRealData(RealData realData) {
49        if (isReady) {
50            return;
51        }
52        this.realData = realData;
53        isReady = true;
54        notifyAll();
55    }
56
57
58    @Override
59    public synchronized String getResult() {
60        while (!isReady) {
61            try {
62                wait();
63            } catch (InterruptedException e) {
64                e.printStackTrace();
65            }
66        }
67        return realData.getResult();
68    }
69}
70
71
72
73public class Client {
74
75    public Data request(final String queryStr) {
76        final FutureData futureData = new FutureData();
77        //单起个线程进行数据处理
78        new Thread() {
79            @Override
80            public void run() {
81                RealData realData = new RealData(queryStr);
82                futureData.setRealData(realData);
83            }
84        }.start();
85        //立即返回
86        return futureData;
87    }
88
89
90    public static void main(String[] args) {
91        Client client = new Client();
92
93        Data data = client.request("ljhname");
94        System.out.println("请求完毕");
95
96        try {
97            Thread.sleep(2000);
98        } catch (InterruptedException e) {
99            e.printStackTrace();
100        }
101
102        System.out.println("数据" + data.getResult());
103    }
104}
105

       三**,JDK中的Future模式**:JDK已经帮我们准备了一套完整的实现,我们可以利用其进行非常方便的实现功能。

      1,先把上边的例子改为使用jdk的:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1public class RealData implements Callable<String> {
2    private String para;
3
4    public RealData (String para){
5        this.para = para;
6    }
7
8    @Override
9    public String call() throws Exception {
10        StringBuffer sb = new StringBuffer();
11        for (int i = 0; i < 10; i++) {
12            sb.append(para);
13            try {
14                Thread.sleep(1000);
15            } catch (InterruptedException e) {
16                e.printStackTrace();
17            }
18        }
19
20        return sb.toString();
21    }
22}
23
24
25
26public class FutureMain {
27    public static void main(String[] args) throws ExecutionException, InterruptedException {
28
29        FutureTask<String> futureTask = new FutureTask<String>(new RealData("ljh"));
30        ExecutorService executorService = Executors.newFixedThreadPool(1);
31
32        executorService.submit(futureTask);
33
34        System.out.println("请求完毕");
35
36        try {
37            Thread.sleep(2000);
38        } catch (InterruptedException e) {
39            e.printStackTrace();
40        }
41
42        System.out.println("数据=" + futureTask.get());
43
44    }
45}
46

       2,看下FutureTask的类关系结构图:

Java高并发(七)——Future模式

       3,FutureTask源码分析:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
1   /**
2     * 一,属性:几个处理状态流程 callable,outcome runner,waiter...
3     * The run state of this task, initially NEW.  The run state
4     * transitions to a terminal state only in methods set,
5     * setException, and cancel.  During completion, state may take on
6     * transient values of COMPLETING (while outcome is being set) or
7     * INTERRUPTING (only while interrupting the runner to satisfy a
8     * cancel(true)). Transitions from these intermediate to final
9     * states use cheaper ordered/lazy writes because values are unique
10     * and cannot be further modified.
11     *
12     * Possible state transitions:
13     * NEW -> COMPLETING -> NORMAL
14     * NEW -> COMPLETING -> EXCEPTIONAL
15     * NEW -> CANCELLED
16     * NEW -> INTERRUPTING -> INTERRUPTED
17     */
18    private volatile int state;
19    private static final int NEW          = 0;
20    private static final int COMPLETING   = 1;
21    private static final int NORMAL       = 2;
22    private static final int EXCEPTIONAL  = 3;
23    private static final int CANCELLED    = 4;
24    private static final int INTERRUPTING = 5;
25    private static final int INTERRUPTED  = 6;
26
27    /** The underlying callable; nulled out after running */
28    private Callable<V> callable;
29    /** The result to return or exception to throw from get() */
30    private Object outcome; // non-volatile, protected by state reads/writes
31    /** The thread running the callable; CASed during run() */
32    private volatile Thread runner;
33    /** Treiber stack of waiting threads */
34    private volatile WaitNode waiters;
35
36
37    //二,两个构造方法
38    /**
39     * Creates a {@code FutureTask} that will, upon running, execute the
40     * given {@code Callable}.
41     *
42     * @param  callable the callable task
43     * @throws NullPointerException if the callable is null
44     */
45    public FutureTask(Callable<V> callable) {
46        if (callable == null)
47            throw new NullPointerException();
48        this.callable = callable;
49        this.state = NEW;       // ensure visibility of callable
50    }
51
52    /**
53     * Creates a {@code FutureTask} that will, upon running, execute the
54     * given {@code Runnable}, and arrange that {@code get} will return the
55     * given result on successful completion.
56     *
57     * @param runnable the runnable task
58     * @param result the result to return on successful completion. If
59     * you don't need a particular result, consider using
60     * constructions of the form:
61     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
62     * @throws NullPointerException if the runnable is null
63     */
64    public FutureTask(Runnable runnable, V result) {
65        this.callable = Executors.callable(runnable, result);
66        this.state = NEW;       // ensure visibility of callable
67    }
68
69
70    //三,取消任务
71    public boolean cancel(boolean mayInterruptIfRunning) {
72        if (!(state == NEW &&
73              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
74                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
75            return false;
76        try {    // in case call to interrupt throws exception
77            if (mayInterruptIfRunning) {
78                try {
79                    Thread t = runner;
80                    if (t != null)
81                        t.interrupt();
82                } finally { // final state
83                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
84                }
85            }
86        } finally {
87            finishCompletion();
88        }
89        return true;
90    }
91
92   //四,是否已经取消
93    public boolean isCancelled() {
94        return state >= CANCELLED;
95    }
96
97
98    //五,取的返回对象,get()直到去的为止,get(long timeout, TimeUnit unit)有取的等待时间,超过则报超时异常
99    /**
100     * @throws CancellationException {@inheritDoc}
101     */
102    public V get() throws InterruptedException, ExecutionException {
103        int s = state;
104        if (s <= COMPLETING)
105            s = awaitDone(false, 0L);
106        return report(s);
107    }
108
109    /**
110     * @throws CancellationException {@inheritDoc}
111     */
112    public V get(long timeout, TimeUnit unit)
113        throws InterruptedException, ExecutionException, TimeoutException {
114        if (unit == null)
115            throw new NullPointerException();
116        int s = state;
117        if (s <= COMPLETING &&
118            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
119            throw new TimeoutException();
120        return report(s);
121    }
122
123    //六,是否完成
124    public boolean isDone() {
125        return state != NEW;
126    }
127
128    //七,执行
129    public void run() {
130        if (state != NEW ||
131            !UNSAFE.compareAndSwapObject(this, runnerOffset,
132                                         null, Thread.currentThread()))
133            return;
134        try {
135            Callable<V> c = callable;
136            if (c != null && state == NEW) {
137                V result;
138                boolean ran;
139                try {
140                    result = c.call();
141                    ran = true;
142                } catch (Throwable ex) {
143                    result = null;
144                    ran = false;
145                    setException(ex);
146                }
147                if (ran)
148                    set(result);
149            }
150        } finally {
151            // runner must be non-null until state is settled to
152            // prevent concurrent calls to run()
153            runner = null;
154            // state must be re-read after nulling runner to prevent
155            // leaked interrupts
156            int s = state;
157            if (s >= INTERRUPTING)
158                handlePossibleCancellationInterrupt(s);
159        }
160    }
161

       好,Future模式先这样吧,主要理解其通过线程异步处理的过程,当然JDK8中又增加了CompletableFuture,功能更加强大,后边我会总结上。继续中……

 

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense广告CPC(单元点击价格)为什么会减少??

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索