Hadoop心跳机制源码分析

释放双眼,带上耳机,听听看~!

正文:
一.体系背景

首先和大家说明一下:hadoop的心跳机制的底层是通过RPC机制实现的,

  1. hadoop的RPC机制
  2. 动态代理
  3. Java NIO

**二.心跳机制 **

  1. hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和Tasktracker。
  2. master启动的时候,会开一个ipc server在那里,等待slave心跳。
  3. slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,这个时间可 以通过”heartbeat.recheck.interval”属性来设置。将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
  4. 需要指出的是:namenode与datanode之间的通信,jobtracker与tasktracker之间的通信,都是通过“心跳”完成的。

三.Datanode、Namenode心跳源码分析

既然“心跳”是Datanode主动给Namenode发送的。那Datanode是怎么样发送的呢?下面贴出Datanode.class中的关键代码:
代码一:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1
21. /** 
31.    \* 循环调用“发送心跳”方法,直到shutdown 
41.    \* 调用远程Namenode的方法 
51.    */  
61.   public void offerService() throws Exception {  
7    ...  
81.     while (shouldRun) {  
91.       try {  
101.         long startTime = now();  
111.          // heartBeatInterval是在启动Datanode时根据配置文件设置的,是心跳间隔时间  
121.         if (startTime - lastHeartbeat > heartBeatInterval) {  
131.           lastHeartbeat = startTime;  
141. //Datanode发送心跳  
151.           DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
161.                                                        data.getCapacity(),  
171.                                                        data.getDfsUsed(),  
181.                                                        data.getRemaining(),  
191.                                                        xmitsInProgress.get(),  
201.                                                        getXceiverCount());  
211.           myMetrics.addHeartBeat(now() - startTime);  
221.            
231.           if (!processCommand(cmds))  
241.             continue;  
251.         }  
261.          
271.       ...
281.       }  
291.     } // while (shouldRun)  
301.   } // offerService  
31
32

 

需要注意的是:发送心跳的对象并不是datanode,而是一个名为namenode的对象,难道在datanode端就直接有个namenode的引用吗?其实不然,我们来看看这个namenode吧:
代码二:


1
2
3
4
1
2public DatanodeProtocol namenode = null;  
3
4

namenode其实是一个DatanodeProtocol的引用,在对hadoop RPC机制分析的文章中我提到过,这是一个Datanode和Namenode通信的协议,其中有许多未实现的接口方法,sendHeartbeat()就是其中的一个。下面看看这个namenode对象是怎么被实例化的吧:
代码三:


1
2
3
4
5
6
7
11. this.namenode = (DatanodeProtocol)   
21.     RPC.waitForProxy(DatanodeProtocol.class,  
31.                      DatanodeProtocol.versionID,  
41.                      nameNodeAddr,   
51.                      conf);  
6
7

 

其实这个namenode并不是Namenode的一个对象,而只是一个Datanode端对Namenode的代理对象,正是这个代理完成了“心跳”。代理的底层实现就是RPC机制了。
四.Tasktracker、Jobtracker心跳源码分析

同样我们从Tasktracker入手,下面贴出Tasktracker.class的关键代码:
代码四:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
11. State offerService() throws Exception {  
21.     long lastHeartbeat = System.currentTimeMillis();  
31.     while (running && !shuttingDown) {  
41.      ...  
51.           
61.         // 发送心跳,调用代码二  
71.         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);  
81.   
91.       ...  
101.     return State.NORMAL;  
111.   }  
121.   
131. 代码二:  
141. HeartbeatResponse transmitHeartBeat(long now) throws IOException {  
151.    ...  
161.     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
171.                                                               justStarted,  
181.                                                               justInited,  
191.                                                               askForNewTask,   
201.                                                          heartbeatResponseId);                                
211. ...  
221.     return heartbeatResponse;  
231.   }  
24
25

 
其实我觉得分析到这里大家就可以自己分析了,jobClient也是一个协议:
代码五:


1
2
1InterTrackerProtocol jobClient;
2

 

该协议用于定义Tasktracker和Jobtracker的通信。同样,它也是一个代理对象:

 

代码六:


1
2
3
4
5
6
7
8
9
10
11
11. this.jobClient = (InterTrackerProtocol)   
21.  UserGroupInformation.getLoginUser().doAs(  
31.      new PrivilegedExceptionAction<Object>() {  
41.    public Object run() throws IOException {  
51.      return RPC.waitForProxy(InterTrackerProtocol.class,  
61.          InterTrackerProtocol.versionID,  
71.          jobTrackAddr, fConf);  
81.    }  
91.  });  
10
11

代理的底层实现也是RPC机制。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense老手经验

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索