Hadoop技术内幕之RPC框架解析(上)

云计算 来源:owen1190 18℃ 0评论

网络通信模块是分布式系统中最底层的模块。它直接支撑了上层分布式环境下复杂的进程间通信逻辑,是所有分布式系统的基础。远程过程调用(RPC)是一种常用的分布式网络通信协议。它允许运行于一台计算机的程序调用另一台计算机的子程序,同时将网络通信细节隐藏起来,使得用户无须额外地为这个交互作用编程。

Hadoop RPC框架概述

对于Hadoop RPC,具有以下几个特点:

  • 透明性:所有RPC框架的最根本特征。
  • 高性能:Hadoop各个系统均采用Master/Slave结构,其中,Master实际上为一个RPC server,负责处理集群中所有Slave发送的服务请求。为了保证Master的并发处理能力,RPC server应该是一个高性能服务器,能够高效地处理来自多个Client的并发RPC请求。
  • 可控性:RPC是Hadoop最底层、最核心的模块之一,保证其轻量级、高性能和可控性显得尤为重要。

Hadoop RPC主要分为四个部分,分别是序列化层、函数调用层、网络传输层和服务器端处理框架,具体实现机制如下:

  • 序列化层:主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储。在RPC中,主要用于将用户请求中的参数或者应答转化为字节流以便跨机器传输。
  • 函数调用层:函数调用层的主要功能是定位要调用的函数并执行该函数。Hadoop RPC采用Java反射机制与动态代理实现函数调用。
  • 网络传输层:网络传输层描述了Client与Server之间信息传输的方式。Hadoop RPC采用了基于TCP/IP的Socket机制。
  • 服务器端处理框架:可以被抽象为网络I/O模型。它描述了客户端与服务器端间信息交互的方式。它的设计直接决定着服务器端的并发处理能力。Hadoop RPC采用了基于Reactor设计模式的事件驱动I/O模型。

Hadoop RPC总体框架如下图,自下而上分为两层。第一层是一个基于Java的NIO实现的客户机/服务器通信模型。其中具体实现时,主要用到jdk提供的各种功能包,主要包括java.nio(NIO)、java.lang.reflect(反射机制和动态代理)、java.net(网络编程)第二层是供更上层程序直接调用的RPC接口。

Hadoop RPC基本框架分析

RPC基本概念

RPC通常采用客户机/服务器模型。请求程序是一个客户机,而服务提供程序则是一个服务器。

一个典型的RPC框架主要包括以下部分:

通信模块:两个相互协作的通信模块实现请求-应答协议。请求-应答协议的实现方式有两种,分别为同步方式和异步方式。如下图所示,同步模式下客户端程序一直阻塞到服务器端发送的应答请求到达本地;而异步模式则不同,客户端请求发送后,不必等待应答返回,待服务器端处理完请求后,主动通知客户端。在高并发场景下,一般采用异步模式以降低访问延迟和提供宽度利用率。

Stub程序:客户端和服务器端均包含Stub程序,可将之看成代理程序。它使远程函数调用表现的跟本地调用一样,对用户程序完全透明。在客户端,Stub将请求信息通过网络模块发送给服务器端。在服务器端,Stub程序解码请求信息中的参数、调用相应的服务过程和编码应答结果的返回值。

调度程序:调度程序接受来自通信模块的请求信息,并根据其中的标识选择一个Stub程序处理。通常客户端并发请求量比较大时,会采用线程池提高处理效率。

客户程序/服务过程:请求的发出者和请求的处理者。

一个RPC请求从发送到获取处理结果,所经历的步骤如下:

步骤1: 客户程序以本地方式调用系统产生的Stub程序。

步骤2: 该Stub程序将函数调用信息按照网络通信模块的要求封装成信息包,并交给通信模块发送到远程服务器端。

步骤3: 远程服务器端接受此信息后,将此信息发送给相应的Stub程序。(该步骤使用了调度程序)

步骤4: Stub程序拆封信息,形成被调过程要求的形式,并调用对应的函数。

步骤5: 被调用函数按照所获参数执行,并将结果返回给Stub程序。

步骤6: Stub程序将此结果封装成信息,通过网络通信模块逐渐地传送给客户程序。

Hadoop RPC基本框架

Hadoop RPC使用

Hadoop RPC主要对外提供两种接口:

public static VersionedProtocol getProxy/waitForProxy():构造一个客户端代理对象,用于向服务器端发送RPC请求。

public static Server getServer():为某个协议实例构造一个服务器对象,用于处理客户端发送的请求。

Hadoop RPC使用方法可分为以下几个步骤。

步骤1: 定义RPC协议。RPC协议定义了服务器端对外提供的服务接口。Hadoop中所有自定义RPC接口都需要继承VersionedProtocol接口。

intereface ClientProtocol extends org.apache.ipc.VersionedProtocol{
    public static final long versionID=1L;
    String echo(String value) throws IOException;
    int add(int v1,int v2) throws IOException;
}

步骤2: 实现RPC协议。Hadoop RPC协议通常是一个Java接口,用户需要实现该接口。

public static class ClientProtocolImpl implements ClientProtocol{
    public long getProtocolVersion(String protocol,long clientVersion){
        return ClientProtocol.versionID;
    }
    public String echo(String value) throws IOException{
        return value;
    }
    public int add(int v1,int v2) throws IOException{
        return v1+v2;
    }
}

步骤3: 构造并启动RPC Server。直接使用静态方法getServer()构造一个RPC Server,并调用函数start()启动该Server:
Server = RPC.getServer(new ClientProtocolImpl(), serverHost,serverPort,numHandlers.fasle.conf);
server.start();

其中,serverHostserverPort分别表示服务器的host和监听端口号,而numHandlers表示服务器端处理请求的线程数目。到此为止,服务器处理监听状态,等待客户端请求到达。

步骤4: 构造RPC Client,并发送RPC请求。使用静态方法getProxy()构造客户端代理对象,直接通过代理对象调用远程端的方法。

proxy=(ClientProtocol)RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,addr,conf);
int result=Proxy.add(5,6);
String echoResult=proxy.echo("result");

经过以上四步,就利用Hadoop RPC搭建了一个客户机/服务器网络模型。

Hadoop RPC主要由三个大类组成,分别是RPC、Client和Server,分别对应对外编程接口、客户端实现和服务器端实现。

ipc.RPC类分析

RPC类实际上是对底层客户机/服务器网络模型的封装。

RPC类自定义了一个内部类RPC.Server。继承Server抽象类,根据客户端请求中的调用方法名称和对应参数完成方法调用。RPC类包含一个ClientCache类型的成员变量,根据用户提供的SoceketFactory缓存Client对象,以达到重用Client对象的目的。

Hadoop RPC使用了Java动态代理完成对远程方法调用。对于Hadoop RPC,函数调用由客户端发出,并在服务器端执行并返回,不能通过本地动态代理实例代码直接在invoke方法中本地调用相关函数,而是应该在invoke方法中,将函数调用信息打包成可序列化的Invocation对象,并通过网络发送给服务器端,服务器端收到该调用信息后,解析出函数名和函数参数列表信心,利用Java反射机制完成函数调用。

ipc.Client类分析

Client主要完成的功能是发送远程过程调用信息并接受执行结果。Client类对外提供了两种接口,一种用于执行单个远程调用。另外一种用于执行批量远程调用。

public Writable call(Writable param,ConnectionId remoteId) throws InterruptedException,IOException;
public Writable[] call(Writable[] parame,InerSocketAddress[] addresses,Class<?> protocol,UserGroupInformation ticket,Configuration conf) throws IOException,InterruptedException;

Client内有两个重要内部类,分别为Call和Connection。

Call类:该类封装了一个RPC请求,包含了5个成员变量,分别是唯一标识id、函数调用信息param、函数执行返回值value、出错或者异常信息error和执行完成标识符done。当客户端向服务器端发送请求时,只需填充id和param两个变量,而剩下的三个变量:value,error和done,则由服务器端根据函数执行情况填充。

Connection类:Client与每个Server之间维护一个通信连接。该连接相关的基本信息及操作被封装到Connection类中,基本信息主要包括:通信连接唯一标识(remoteId),与Server端通信的Socket(socket),网络输入数据流(in),网络输出数据流(out),保持RPC请求的哈希表(calls)等;操作则包括:

addCall——将一个Call对象添加到哈希表中;

sendParam——向服务器端发送RPC请求;

receiveResponse——从服务器端接收已经处理完成的RPC请求;

run——Connection是一个线程类,它的run方法调用了receiveResponse方法,会一直等待接收RPC返回结果。

当调用call函数执行某个远程方法时,Client端需要进行如下几步:

步骤1: 创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表calls中;

步骤2: 调用Connetion类中的sendParam()方法将当前Call对象发送给Server端;

步骤3: Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveResponse()函数获取结果。

步骤4: Client端检查结果处理状态,并将对应的Call对象从哈希表中删除。

ipc.Server类分析

ipc.Server采用了很多具有提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等,这些技术均采用JDK自带的库实现。本文重点分析它是如何利用Reactor设计模式提高整体性能的。

Reactor是并发编程中的一种基于事件驱动的设计模式。它具有以下两个特点:①通过派发/分离I/O操作事件提高系统的并发性能;②提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。

一个典型的Reactor模式中主要包括以下几个角色。

**Reactor:**IO事件的派发者。

Acceptor:接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler。

Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。在Reactor模式中,业务逻辑被分散的IO事件所打破,所以Handler需要有适当的机制在所需的信息还不全的时候保存上下文,并在下一次IO事件到来的时候能继续上次中断的处理。

Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,数据读出后,立即扔到线程池中等待后续处理即可。Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。

ipc.Server被划分为三个阶段:接受请求,处理请求和返回结果。

1.接受请求

接受来自各个客户端的RPC请求,并将它们封装成固定的格式放到一个共享队列中,以便进行后续处理。该阶段内部又分为两个子阶段:建立连接和接受请求,分别由两种线程完成:Listener和Reader。

整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求。一旦有新的请求到达,会采用轮询的方式从线程池中选择一个Reader线程进行处理。而Reader线程可同时存在多个,分别负责接收一部分客户端连接的RPC请求。Listener决定每个Reader线程负责哪些客户端连接。

Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新线程;对于Reader线程,主循环的实现体是监听客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。

2.处理请求

该阶段的主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。

Server端可同时存在多个Handler线程。它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。

3.返回结果

每个Handler线程执行完函数调用后,会尝试将执行结果返回给客户端,对于特殊情况,如返回结果过大或网络异常,会将发送任务交给Responder线程。

Server端仅存在一个Responder线程。内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能够将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。

Hadoop RPC参数调优

Hadoop RPC对外提供一些可配置参数,主要配置参数如下:

Reader线程数目:由参数ipc.server.read.threadpool.size配置,默认是1。

每个Handler线程对应的最大Call数目:由参数ipc.server.handler.queue.size指定,默认是100;

Handler线程数目:在Hadoop中,JobTracker和NameNode分别是MapReduce和HDFS两个子系统的RPC Server,其对应的Handler数目分别由参数mapred.job.tracker.handler.countdfs.namenode.service.handler.count指定,默认值均为10。

客户端最大重试次数:客户端最大重试次数由参数ipc.client.connect.max.retries指定,默认值为10。