dubbo入门

释放双眼,带上耳机,听听看~!

dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:

  1. client端的线程模型是什么样的?

传统的io client是请求应答模式,发送请求–>等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。
2. server端的线程模型是什么样的?
这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。

 

 一. 快速启动

学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。
server端:

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1import java.io.IOException;
2
3import com.alibaba.dubbo.config.ApplicationConfig;
4import com.alibaba.dubbo.config.ProtocolConfig;
5import com.alibaba.dubbo.config.ServiceConfig;
6import com.duitang.dboss.client.test.BlogQueryService;
7import com.duitang.dboss.client.test.BlogQueryServiceImpl;
8
9public class DubboServerTester {
10
11    public static void main(String[] args) throws IOException {
12        BlogQueryService blogQueryService = new BlogQueryServiceImpl();
13        ApplicationConfig application = new ApplicationConfig();
14        application.setName("dubbo-test");
15
16        ProtocolConfig protocol = new ProtocolConfig();
17        protocol.setName("dubbo");
18        protocol.setPort(8989);
19        protocol.setThreads(200);
20
21        // RegistryConfig registry = new RegistryConfig();
22        // registry.setAddress("10.20.130.230:9090");
23        // registry.setUsername("aaa");
24        // registry.setPassword("bbb");
25
26        ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏
27        service.setApplication(application);
28
29        // service.setRegistry(registry);
30        service.setRegister(false);
31        service.setProtocol(protocol); // 多个协议可以用setProtocols()
32        service.setInterface(BlogQueryService.class);
33        service.setRef(blogQueryService);
34        service.setVersion("1.0.0");
35        // 暴露及注册服务
36        service.export();
37        
38        System.out.println("Press any key to exit.");
39        System.in.read();
40    }
41}
42

1
2
1  注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。
2

 

client:

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
1import java.io.IOException;
2import java.util.ArrayList;
3import java.util.List;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutionException;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.Future;
9import java.util.concurrent.ThreadFactory;
10import java.util.concurrent.atomic.AtomicInteger;
11
12import com.alibaba.dubbo.config.ApplicationConfig;
13import com.alibaba.dubbo.config.ReferenceConfig;
14import com.duitang.dboss.client.test.BlogQueryService;
15
16public class DubboClientTester {
17
18    public static void main(String[] args) throws InterruptedException, IOException {
19        ApplicationConfig application = new ApplicationConfig();
20        application.setName("dubbo-test");
21
22        ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>();
23        reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService");
24        reference.setTimeout(500);
25        reference.setConnections(10);
26        reference.setApplication(application);
27        reference.setInterface(BlogQueryService.class);
28        reference.setVersion("1.0.0");
29        final BlogQueryService blogQueryService = reference.get();
30
31        long begin = System.currentTimeMillis();
32        System.out.println(blogQueryService.test());
33        long end = System.currentTimeMillis();
34        System.out.println(" cost:" + (end - begin));
35
36        ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test"));
37        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
38        for (int i = 0; i < 100000; ++i) {
39            tasks.add(new Callable<String>() {
40
41                @Override
42                public String call() throws Exception {
43                    System.out.println("run");
44                    System.out.println(blogQueryService.test());
45                    System.out.println("run success");
46                    return null;
47                }
48            });
49        }
50        List<Future<String>> futurelist = es.invokeAll(tasks);
51        for (Future<String> future : futurelist) {
52            try {
53                String result = future.get();
54            } catch (ExecutionException e) {
55                e.printStackTrace();
56            }
57            System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n");
58        }
59        es.shutdown();
60        System.out.println("end");
61        System.in.read();
62    }
63
64    static class NamedThreadFactory implements ThreadFactory {
65
66        private static final AtomicInteger POOL_SEQ   = new AtomicInteger(1);
67
68        private final AtomicInteger        mThreadNum = new AtomicInteger(1);
69
70        private final String               mPrefix;
71
72        private final boolean              mDaemo;
73
74        private final ThreadGroup          mGroup;
75
76        public NamedThreadFactory(){
77            this("pool-" + POOL_SEQ.getAndIncrement(), false);
78        }
79
80        public NamedThreadFactory(String prefix){
81            this(prefix, false);
82        }
83
84        public NamedThreadFactory(String prefix, boolean daemo){
85            mPrefix = prefix + "-thread-";
86            mDaemo = daemo;
87            SecurityManager s = System.getSecurityManager();
88            mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
89        }
90
91        public Thread newThread(Runnable runnable) {
92            String name = mPrefix + mThreadNum.getAndIncrement();
93            Thread ret = new Thread(mGroup, runnable, name, 0);
94            ret.setDaemon(mDaemo);
95            return ret;
96        }
97
98        public ThreadGroup getThreadGroup() {
99            return mGroup;
100        }
101
102    }
103}
104

1
2
1  
2

 

  1. 通过setUrl("")来实现远程服务直连。
  2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”

二. 代码流程
这里重点分析一下client的调用过程,client调用分为三个部分:
1). 初始化,建立连接。
2). 发送请求。
3). 等待远程应答。
(一).初始化

  1. DubboProtocol.initClient()
  2. Exchangers.connect(URL url, ExchangeHandler handler)   
  3. Exchangers.getExchanger(url).connect(url, handler)
  4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
  5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
  7. NettyTransporter.connect(URL url, ChannelHandler listener)
  8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000;
  9. NettyClient.doOpen()        //创建netty的ClientBootstrap

bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect()  //如果远程地址无法连接,抛出timeout异常流程结束。
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
(二).发送请求
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)
(三).等待远程应答
在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。
1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message) 
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response)  //注意是static方法
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
三. dubbo client的核心

我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。

dubbo入门

下面是代码实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1package executor;
2
3import java.util.concurrent.Callable;
4import java.util.concurrent.ExecutionException;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
8import java.util.concurrent.atomic.AtomicLong;
9
10public class Commands {
11
12    private ExecutorService senders   = Executors.newCachedThreadPool();
13    private ExecutorService receviers = Executors.newCachedThreadPool();
14    private AtomicLong      counter   = new AtomicLong();
15
16    public CommandResponse execute(Callable<Object> task, int timeout) {
17        Future<Object> result = senders.submit(task);
18        long id = counter.getAndIncrement();
19        CommandFuture commandFuture = new CommandFuture(id);
20        receviers.submit(new ReceiveWorker(id, result));
21        return commandFuture.get(timeout);
22    }
23
24    static class ReceiveWorker implements Runnable {
25
26        private Future<Object> result;
27        private Long           id;
28
29        public ReceiveWorker(Long id, Future<Object> result){
30            super();
31            this.result = result;
32            this.id = id;
33        }
34
35        @Override
36        public void run() {
37            try {
38                Object obj = result.get();
39                CommandFuture.received(new CommandResponse(id, obj));
40            } catch (InterruptedException e) {
41                e.printStackTrace();
42            } catch (ExecutionException e) {
43                e.printStackTrace();
44            }
45        }
46    }
47
48    public void shutdown() {
49        senders.shutdown();
50        receviers.shutdown();
51    }
52}
53
54

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
1package executor;
2
3import java.util.Map;
4import java.util.concurrent.ConcurrentHashMap;
5import java.util.concurrent.TimeUnit;
6import java.util.concurrent.locks.Condition;
7import java.util.concurrent.locks.Lock;
8import java.util.concurrent.locks.ReentrantLock;
9
10public class CommandFuture {
11
12    private final Lock                            lock    = new ReentrantLock();
13
14    private final Condition                       done    = lock.newCondition();
15
16    private CommandResponse                                response;
17
18    private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>();
19
20    
21    public CommandFuture(Long id){
22        FUTURES.put(id, this);
23    }
24
25    public boolean isDone() {
26        return response != null;
27    }
28
29    public CommandResponse get(int timeout) {
30
31        if (!isDone()) {
32            long start = System.currentTimeMillis();
33            lock.lock();
34            try {
35                while (!isDone()) {
36                    done.await(timeout, TimeUnit.MILLISECONDS);
37                    if (isDone() || System.currentTimeMillis() - start >= timeout) {
38                        break;
39                    }
40                }
41            } catch (InterruptedException e) {
42                throw new RuntimeException(e);
43            } finally {
44                lock.unlock();
45            }
46            if (!isDone()) {
47                throw new TimeoutException("timeout");
48            }
49        }
50        return response;
51    }
52
53    public void doReceived(CommandResponse response) {
54        lock.lock();
55        try {
56            this.response = response;
57            if (done != null) {
58                done.signal();
59            }
60        } finally {
61            lock.unlock();
62        }
63
64    }
65
66    public static void received(CommandResponse response) {
67        try {
68            CommandFuture future = FUTURES.remove(response.getId());
69            if (future != null) {
70                future.doReceived(response);
71            } else {
72                System.out.println("some error!");
73            }
74        } finally {
75            // CHANNELS.remove(response.getId());
76        }
77    }
78}
79
80

 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1package executor;
2
3import java.util.concurrent.Callable;
4import java.util.concurrent.ExecutionException;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
8import java.util.concurrent.atomic.AtomicLong;
9
10public class Commands {
11
12    private ExecutorService senders   = Executors.newCachedThreadPool();
13    private ExecutorService receviers = Executors.newCachedThreadPool();
14    private AtomicLong      counter   = new AtomicLong();
15
16    public CommandResponse execute(Callable<Object> task, int timeout) {
17        Future<Object> result = senders.submit(task);
18        long id = counter.getAndIncrement();
19        CommandFuture commandFuture = new CommandFuture(id);
20        receviers.submit(new ReceiveWorker(id, result));
21        return commandFuture.get(timeout);
22    }
23
24    static class ReceiveWorker implements Runnable {
25
26        private Future<Object> result;
27        private Long           id;
28
29        public ReceiveWorker(Long id, Future<Object> result){
30            super();
31            this.result = result;
32            this.id = id;
33        }
34
35        @Override
36        public void run() {
37            try {
38                Object obj = result.get();
39                CommandFuture.received(new CommandResponse(id, obj));
40            } catch (InterruptedException e) {
41                e.printStackTrace();
42            } catch (ExecutionException e) {
43                e.printStackTrace();
44            }
45        }
46    }
47
48    public void shutdown() {
49        senders.shutdown();
50        receviers.shutdown();
51    }
52}
53
54

 

下面是jstack
dubbo入门

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

hadoop生态系统学习之路(二)如何编写MR以及运行测试

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索