目录
- 序言
- protobuf处理引擎的注册
- 具体protobuf协议的注册
- 以ResourceTracker协议讲解服务端启动和注册过程
- 总结
序言
Hadoop RPC基于即远程过程调用,远程过程调用主要包括两个部分,网络协议和数据格式。Hadoop根据数据格式,有三种不同的RPC实现:
1
2
3
4
5
6
7
8
9
10
11
12 1 public enum RpcKind {
2 RPC_BUILTIN ((short) 1), // Used for built in calls by tests
3 RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
4 RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
5 final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
6 public final short value; //TODO make it private
7
8 RpcKind(short val) {
9 this.value = val;
10 }
11 }
12
RPC_BUILTIN是内置的用于测试的RPC调用,RPC_WRITABLE是传统的基于Writable序列化方式的RPC协议,写过MapReduce程序的都知道Writable数据格式。最后,RPC_PROTOCOL_BUFFER是基于google protobuf协议的RPC,这是目前最流行的RPC协议。由于Yarn是在Hadoop 2.0才引入的组件,因此,Yarn内部的远程过程调用全部使用了RPC_PROTOCOL_BUFFER来作为自己的RPC。从上层设计到底层设计,我们一起来看一下基于protobuf的RPC的实现方法。
protobuf处理引擎的注册
Hadoop中,每种RPC调用的处理类叫做一个Engine。基于RPC_PROTOCOL_BUFFER的RPC的engine叫做ProtobufRpcEngine,即ProtobufRpcEngine是专门用来处理基于google的protobuf消息格式的RPC协议。
根据hadoop的协议管理类ipc.Server的设计规定,所有的Engine在开始工作前,都必须向ipc.Server注册自己。看ProtobufRpcEngine的类初始化代码:
1
2
3
4
5
6 1 static { // Register the rpcRequest deserializer for WritableRpcEngine
2 org.apache.hadoop.ipc.Server.registerProtocolEngine(
3 RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
4 new Server.ProtoBufRpcInvoker()); //类初始化时,向ipc.Server注册自己
5 }
6
注册的目的,是将自己纳入中央处理器ipc.Server的调度管理之下,这样,在收到某个消息,根据消息的RpcKind就可以知道这个消息的数据格式,因此ipc.Server会将消息直接交给对应的已经注册为该RpcKind的引擎进行处理。
先看ipc.Server.registerProtocolEngine()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1 /**
2 * Register a RPC kind and the class to deserialize the rpc request.
3 *
4 * Called by static initializers of rpcKind Engines
5 * @param rpcKind
6 * @param rpcRequestWrapperClass - this class is used to deserialze the
7 * the rpc request. 所有的rpcRequestWrapperClass必须是一个Writable
8 * @param rpcInvoker - use to process the calls on SS.用来定义在服务端处理rpc请求的代码
9 *
10 * ProtobufRpcEngine .registerProtocolEngine(
11 RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
12 new Server.ProtoBufRpcInvoker())
13 */
14 public static void registerProtocolEngine(RPC.RpcKind rpcKind,
15 Class<? extends Writable> rpcRequestWrapperClass,
16 RpcInvoker rpcInvoker) {
17 RpcKindMapValue old =
18 if (old != null) {
19 rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
20 rpcKindMap.put(rpcKind, old);
21 throw new IllegalArgumentException("ReRegistration of rpcKind: " +
22 rpcKind);
23 }
24 LOG.debug("rpcKind=" + rpcKind +
25 ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
26 ", rpcInvoker=" + rpcInvoker);
27 }
28
可见注册的过程其实是将每一个Engine的Invoker和RPCKind对应起来。Invoker,其实就是真正处理这个请求的处理者,每一个Engine都必须包含这样一个Invoker,这样,ipc.Server
具体protobuf协议的注册
拿到请求的RPCKind,实际上是直接交给对应Engine的Invoker进行处理。那Engine拿到了请求以后,是如何知道这个请求具体该怎么去运行和处理呢?这就涉及到不同的protobuf协议定义。
比如,NodeManager和ResourceManager进行通信的基于protobuf的协议,叫做ResourceTracker协议,按照protobuf规范,具体协议接口定义在一个proto文件里面,比如:
ResourceTracker.proto
1
2
3
4
5 1service ResourceTrackerService {
2 rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
3 rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
4}
5
还有ApplicationMaster用来和ResourceManager进行通信的定义在applicatioinmaster_protocol.proto的协议文件,不再贴出。
每一个协议,都服务器端都有一个sevice与之对应,这个service在启动的时候,会向ipc.Server注册自己,这个注册是指某个具体的protobuf协议的注册,要与上面讲的engine的注册区分开。具体protobuf协议全部是hadoop根据应用场景自己定义的,比如NM和RM之间的协议,AM和NM之间的协议,AM和RM之间的等等,非常多,而engine目前只有三个而已。Engine的注册和具体某个协议的这注册,原理都相同,都是告知ipc.Server,我是负责处理哪些消息的,这些消息交给我,唯一的不同就是层次不同。
那么看一下具体的某个protobuf协议的注册是如何进行的。还是以NM和RM之间的通信协议ResourceTracker为例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1 @Override
2 protected void serviceStart() throws Exception {
3 super.serviceStart();
4 // ResourceTrackerServer authenticates NodeManager via Kerberos if
5 // security is enabled, so no secretManager.
6 Configuration conf = getConfig();
7 //使用yarn自带的基于proto实际上是org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
8 YarnRPC rpc = YarnRPC.create(conf);
9 //创建一个org.apache.hadoop.ipc.RPC.Server
10 this.server =
11 rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
12 conf, null,
13 //....
14 }
15
YarnRPC.create(conf);创建了一个HadoopYarnProtoRPC对象,然后其getServer()方法将协议的名字、协议的服务端实现类this传入,最终,代码往下跟踪,还是让对应的Engine来自己负责创建Server,看ProtobufRpcEngine.Server类构造函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1 public Server(Class<?> protocolClass, Object protocolImpl,
2 Configuration conf, String bindAddress, int port, int numHandlers,
3 int numReaders, int queueSizePerHandler, boolean verbose,
4 SecretManager<? extends TokenIdentifier> secretManager,
5 String portRangeConfig)
6 throws IOException {
7 super(bindAddress, port, null, numHandlers,
8 numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
9 .getClass().getName()), secretManager, portRangeConfig);
10 this.verbose = verbose;
11 //将协议和协议的具体实现注册给RPC.Server
12 registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
13 protocolImpl);
14 }
15
该方法最后,是通过registerProtocolAndImpl将创建完成的protocolImpl(实际运行时是一个com.google.protobuf.BlockingService,protoc自动编译生成,专门负责反向代理这个协议在服务器端的执行)向ipc.Server进行注册。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
2 Object protocolImpl) {
3 String protocolName = RPC.getProtocolName(protocolClass);
4 long version;
5
6
7 try {
8 version = RPC.getProtocolVersion(protocolClass);
9 } catch (Exception ex) {
10 LOG.warn("Protocol " + protocolClass +
11 " NOT registered as cannot get protocol version ");
12 return;
13 }
14 //以协议名称和版本号作为key,协议具体实现作为value,存入到protocolImplMapArray中
15 getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
16 new ProtoClassProtoImpl(protocolClass, protocolImpl));
17 LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName + " version=" + version +
18 " ProtocolImpl=" + protocolImpl.getClass().getName() +
19 " protocolClass=" + protocolClass.getName());
20 }
21
注册的核心过程,其实就是将自己保存在一个以rpcKind、协议名称(如ResourceeTracker)以及版本号为key、协议的具体代理(一个BlockingService)为value的map中。
以ResourceTracker协议讲解服务端启动和注册过程
那么,Engine注册完成,具体协议对应的服务也完成了注册和启动。这时候可以开始通信了。当一条消息过来,比如,一个Yarn节点启动,需要对应的NodeManager遵循ResourceTracker协议,通过registerNodeManager()方法向远程的ResourceTrackerService发起调用,过程是什么呢?
在我的Hadoop RPC Server架构和原理 这篇博客里面介绍了,ipc.Server收到远程客户端请求以后,实际上最后是让Handler来进行消息的派发,将消息交给对应的engine进行处理,ipc.Server.Handler.run()方法:
1
2
3
4
5
6
7
8
9
10 1 call.connection.user.doAs
2 (new PrivilegedExceptionAction<Writable>() {
3 @Override
4 public Writable run() throws Exception {
5 // make the call
6 //call方法是一个抽象方法,实际运行的时候会调用具体实现类的call方法
7 return call(call.rpcKind, call.connection.protocolName,
8 call.rpcRequest, call.timestamp);
9 }
10
以ProtobufRpcEngine为例,ipc.Server的实现类RPC.Server,查看它对ipc.Server.call()方法的实现:
1
2
3
4
5
6
7 1 @Override
2 public Writable call(RPC.RpcKind rpcKind, String protocol,
3 Writable rpcRequest, long receiveTime) throws Exception {
4 return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
5 receiveTime);
6 }
7
对于ProtobufRpcEngine,getRpcInvoker(rpcKind)获取的就是ProtobufRpcEngine.Server.ProtoBufRpcInvoker,来看这个内部类的call()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 1 /**
2 * This is a server side method, which is invoked over RPC. On success
3 * the return response has protobuf response payload. On failure, the
4 * exception name and the stack trace are return in the resposne.
5 * See {@link HadoopRpcResponseProto}
6 *
7 * In this method there three types of exceptions possible and they are
8 * returned in response as follows.
9 * <ol>
10 * <li> Exceptions encountered in this method that are returned
11 * as {@link RpcServerException} </li>
12 * <li> Exceptions thrown by the service is wrapped in ServiceException.
13 * In that this method returns in response the exception thrown by the
14 * service.</li>
15 * <li> Other exceptions thrown by the service. They are returned as
16 * it is.</li>
17 * </ol>
18 */
19 public Writable call(RPC.Server server, String protocol,
20 Writable writableRequest, long receiveTime) throws Exception {
21 RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
22 RequestHeaderProto rpcRequest = request.requestHeader;
23 //从请求中获取方法、协议名称
24 String methodName = rpcRequest.getMethodName();
25 //从请求中取出proto名称
26 String protoName = rpcRequest.getDeclaringClassProtocolName();
27 //从请求中取出版本号
28 long clientVersion = rpcRequest.getClientProtocolVersion();
29 if (server.verbose)
30 LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
31
32 //向RPC.Server()获取它所管理的具体协议对应的实现,比如ResourceTracker协议对应的BlockingService
33 ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
34 clientVersion);
35 //这个service是在ResourceTracker里面创建的一个匿名实例newReflectiveBlockingService()
36 BlockingService service = (BlockingService) protocolImpl.protocolImpl;
37 //通过远程调用的方法名称,获取方法描述信息
38 MethodDescriptor methodDescriptor = service.getDescriptorForType()
39 .findMethodByName(methodName);
40 if (methodDescriptor == null) {
41 //......throw exception
42 }
43 //获取请求的proto对应的java 类,如RegisterNodeManagerRequestProto,通过protobuf生成的proto的java类都是一个Message
44 Message prototype = service.getRequestPrototype(methodDescriptor);
45
46 //将实际请求的参数数据匹配到对应的请求类
47 Message param = prototype.newBuilderForType()
48 .mergeFrom(request.theRequestRead).build();
49
50 Message result;
51 long startTime = Time.now();
52 int qTime = (int) (startTime - receiveTime);
53 Exception exception = null;
54 try {
55 server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
56 result = service.callBlockingMethod(methodDescriptor, null, param);
57 } catch (ServiceException e) {
58 //.....throw some exception....
59 } finally {
60 int processingTime = (int) (Time.now() - startTime);
61 if (LOG.isDebugEnabled()) {
62 String msg = "Served: " + methodName + " queueTime= " + qTime +
63 " procesingTime= " + processingTime;
64 if (exception != null) {
65 msg += " exception= " + exception.getClass().getSimpleName();
66 }
67 LOG.debug(msg);
68 }
69 String detailedMetricsName = (exception == null) ?
70 methodName :
71 exception.getClass().getSimpleName();
72 server.rpcMetrics.addRpcQueueTime(qTime);
73 server.rpcMetrics.addRpcProcessingTime(processingTime);
74 server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
75 processingTime);
76 }
77 return new RpcResponseWrapper(result);
78 }
79
方法中会首先从请求头中获取方法名称、协议名称和版本号,协议名称比如ResourceTracker。
在从请求头拿到了具体的协议名称以后,就可以提取出来这个协议在服务器端的代理实现,一个BlockingService。之所以能够准确拿出对应的代理实现,是因为前面讲到,服务器端该协议的对应的Service如ResourceTrackerService已经在启动的时候向ipc.Server 注册了代理类BlockingService负责处理这个ResourceTracker协议的请求。
前面说过,这个对象是protoc对这个ResourceTracker.proto文件编译的时候自动生成的,用来负责代理对应这个协议各个方法在服务器端的执行。比如,它像一个工厂一样,把我们的方法名称组装成com.google.protobuf.Descriptors.MethodDescriptor,把我们的参数组装成com.google.protobuf.Message,然后调用自己的callBlockingMethod,来执行这个方法。
总结
综上所述,我们可以看到,Hadoop RPC服务器端整个代码实现非常复杂,但是这种设计的复杂性带来了结构和流程上的清晰以及组件责任的明确。
核心内容包括:
两个级别的注册:
- Engine注册:目的是为了告知ipc.Server自己所负责的RpcKind,这样,ipc.Server在收到消息以后会根据RpcKind将消息交付给对应的Engine进行处理;
- 具体协议注册:具体协议如ResourceTracker协议向ipc.Server 注册的目的,就是告知ipc.Server自己所负责处理的协议,这样,ipc.Server在收到某个消息以后,会根据消息中的协议名称将消息正确地交付给对应的处理类进行处理。
比如:NodeManager根据ResourceTracker协议向ResourceManager发送了一个registerNodeManager请求,请求参数包含了自己合格节点的ip、资源等信息,服务器端收到了请求,提取出来了rpcKind 、协议名称、方法名称和请求参数,只要Engine的注册和协议注册两步已经完成,就能够找到对应的服务器端对 协议处理类。处理类完成服务器端方法调用,比如,收到registerNodeManager()请求,ResourceManager将对应的node添加到自己所管理的节点列表中去,完成以后,就可以给发起请求的客户端返回结果了。