Hadoop技术内幕之RPC框架解析(下)

云计算 来源:owen1190 28℃ 0评论

MapReduce 通信协议分析

在Hadoop MapReduce中,不同组件之间的通信协议均是基于RPC的,因为它们,支撑起整个MapReduce系统。

MapReduce通信协议概述

在Hadoop1.0.0,MapReduce框架中共有6个主要的通信协议。其中,直接面向Client的通信协议共有4个。

JobSubmissionProtocol:Client与JobTracker之间的通信协议。用户通过该协议提交作业,查看作业运行情况。

RefreshUserMappingsProtocol:Client通过该协议更新用户-用户组映射关系。

RefreshAuthorizationPolicyProtocol:Client通过该协议更新MapReduce服务级别访问控制列表。

AdminOperationsProtocol:Client通过该协议更新队列(存在于JobTracker或者Scheduler中)访问控制列表和节点列表。

因为安全问题,通常将JobSubmissionProtocol使用权限授予普通用户,而其他三个通信协议的权限授予管理员。

另外两个通信协议位于MapReduce框架内部,

InterTrackerProtocol:TaskTracker与JobTracker之间的通信协议。TaskTracker通过相关接口汇报本节点的资源使用情况和任务运行状态等信息,并执行JobTracker发送的命令。

TaskUmbilicalProtocol:Task与TaskTracker之间的通信协议。每个Task实际上是其同节点TaskTracker的子进程,它们通过该协议汇报Task运行状态、运行进度等信息。

在Hadoop中,所有使用Hadoop RPC的协议基类均为VersionedProtocol。该类主要用于描述协议版本号,以防止不同版本号的客户端与服务器端之间通信。

在MapReduce中,这六个通信协议与JobTracker,TaskTracker的类关系如下图

JobSubmissionProtocol通信协议

该协议接口分为三类:

1.作业提交

Client可通过以下RPC函数提交作业:
public JobStatus submitJob(JobID jobName,String jobSubmitDir,Credentials ts) throw IOException;

其中,jobName为该作业的ID,Client可通过getNewJobId()函数为作业获取一个唯一的ID,jobSubmitDir为作业文件所在目录,一般为HDFS上的一个目录lts是该作业分配到的密钥或者安全令牌。

2.作业控制

当用户提交作业后,可进一步控制该作业,主要有三个操作:修改其作业优先级(setJobPriority函数)、杀死一个作业(killJob函数)、杀死一个任务(killTask函数)。

3.查看系统状态和作业运行状态

该协议提供了一系列函数以供Client查看集群状态。

//获取集群当前状态
public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
//获取某个作业的运行状态
public JobStatus getJobStatus(JobID jobid) throws IOException;
//获取所有作业的运行状态
public JobStatus[] getAllJobs() throws IOException;

InterTrackerProtocol通信协议

该协议的最重要的一个方法是heartbeat。它周期地被调用,进而形成了TaskTracker与JobTracker之间的心跳。其定义如下:

HeartbeatResponse heartbeat(TaskTrackerStatus status,boolean restarted,boolean initialContact,boolean acceptNewTasks,short responseId) throws IOException;

status封装了所在节点的资源使用情况(物理内存和虚拟内存总量和使用量,CPU个数以及利用率等)和任务运行情况(每个任务运行进度,状态以及所处的阶段等)。

函数返回值类型中包含了一个TaskTrackerAction类型数组。该数组包含了JobTracker向TaskTracker传达各种命令,主要分为以下几种:

CommitTaskAction:Task运行完成,提交其产生的结果。

ReinitTrackerAction:重新对自己(TaskTracker)初始化.

KillJobAction:杀死某个作业,并清理其使用的资源。

KillTaskAction:杀死某个任务。

LaunchTaskAction:启动一个新任务。

TaskUmbilicalProtocol通信协议

TaskUmbilicalProtocol是Task与TaskTracker之间的通信协议。每个Task通过该协议向对应的TaskTracker汇报自己的运行状况或者出错信息。

可将该协议中的方法分为两类:一类是周期地被调用的方法,另一类是按需调用的方法。其中,第一类方法主要有以下两个:

//Task向TaskTracker汇报自己当前状态,状态信息被封装到TaskStatus中
boolean statusUpdate(TaskAttemptID taskId,TaskStatus taskStatus,JvmContext jvmCOntext) throws IOException,InterruptedException;
//Task周期性探测TaskTracker是否活着
boolean ping(TaskAttemptID taskid,JvmContext jvmContext) throws IOException;

这两个方法相互合作,共同完成Task状态汇报的任务。一般下,Task每隔3s会调用一次statusUpdate函数向TaskTracker汇报最新进度。然而,如果Task在3s内没有处理任务数据,则不再汇报进度,而是直接调用ping方法探测TaskTracker,以确保当前数据处理过程中它一直是活着的。

第二类方法在Task的不同运行阶段被调用,其调用时机依次为:

1.Task初始化

TaskTracker从JobTracker那里接收到一个启动新Task的命令后,首先创建一个子进程,并由该子进程调用getTask方法领取对应的Task。

2.Task运行中

汇报错误及异常:Task运行过程中可能会出现各种异常或者错误,而reportDiagonstiocInfo/fsError/fatalError方法则分别用以汇报出现的Exception/FsError/Throwabele异常和错误。对于Reduce Task而言,还提供了shuffleError方法汇报Shuffle阶段出现的错误。

汇报记录范围:Hadoop可通过跳过坏记录提高程序的容错性。为了便于定位坏记录的位置,Task需要通过reportNextRecordRecordRange方法不断向TaskTracker汇报将要处理的记录范围。

获取Map Task完成列表:该协议专门为Reduce Task提供了getMapCompletinEvent方法,以方便其从TaskTracker获取已经完成的Map Task列表,进而能够获取Map Task产生的临时数据存放位置,并远程读取这些数据。

3.Task运行完成
当Task处理完最后一条记录后,会依次调用commitPendingcanCommitdone三个方法完成最后收尾工作。

其他通信协议

其他三个通信协议,即RefreshUserMappingsProtocolRefreshAuthorizationPolicyProtocolAdminOperationsProtocol,均用于动态更新Hadoop MapReduce的相关配置文件。这些配置文件涉及对Hadoop某些模块的访问权限,因而往往只将其使用权限授予一些级别高的用户。

RefreshUserMappingProtocol协议

该协议有两个作用:更新用户-用户组映射关系和更新超级用户代理列表,分别对应以下两个方法:

public void refreshUserToGroupsMapping()

public void refreshSuperUserGroupsConfiguration() throws IOException;

Hadoop对其进行封装,用户直接使用相应的Shell命令就可以完成更新操作。

默认情况下,Hadoop使用UNIX/Linux系统自带的用户-用户组映射关系,且对其进行缓存。如果修改了某些映射关系,可以使用以下Shell命令更新到Hadoop MapReduce缓存中:

bin/hadoop mradmin -refreshUsserToGroupsMappings

Hadoop提供一种代理机制,允许某些用户伪装成其他用户执行某些操作。用户可在core-site.xml配置文件中添加以下配置选项:


    hadoop.proxyuser.oozie.groups
    group1,group2
    超级用户oozie可伪装成分组group1和group2中的任何用户


    hadoop.proxyuser.oozie.hosts
    host1,host2
    超级用户oozie可伪装成分组host1和host2中的任何用户

经过以上配置后,只要用户oozie拥有Hadoop Kerberos key,则host1和host2两个节点上group1和group2两个组中的所有用户可统一以用户oozie的身份提交作业。

可动态修改这种超级用户代理列表,并通过以下命令完成更新操作:
bin/hadoop mradmin -refreshSuperUserGroupsConfiguration

RefreshAuthorizationPloicyProtocol协议

该协议用于更新MapReduce服务级别访问控制列表,其中“服务级别访问控制列表”实际上是每个通信协议的访问控制列表。每种通信协议均对应一定的访问权限。每个协议的访问控制列表可在配置文件hadoop-policy.xml中配置。

管理员可以直接通过以下命令动态更新配置文件hadoop-policy.xml

bin/hadoop mradmin -refreshServiceAcl

该命令最终会调用RefreshAuthorizationPolicyProtocol协议中的refreshServiceAcl方法完成更新操作。

AdminOperationsProtocol协议

该协议用于更新队列访问控制列表(refreshQueues)和更新节点列表(refreshNodes)的两个方法。

更新队列访问控制列表:Hadoop以队列为单位管理用户,每个队列对应的用户或者用户组称为该队列的访问控制列表。在Hadoop中,队列访问控制列表信心可能存在于配置文件mapred-queue-acls.xml和调度配置文件中。在配置文件mapred-queue-acls.xml,管理员可为每个队列配置两种权限:提交作业权限和管理作业权限。对于用户而言,拥有某个队列的作业提交权限意味着该用户可向该队列提交作业并使用该队列中的计算资源,而拥有队列的作业管理权限意味着该用户可以改变任何作业的优先级、杀死任何作业等。

管理员可通过以下命令动态加载配置文件:
bin/hadoop mradmin -refreshQueues

更新节点列表:Hadoop允许管理员为节点建立白名单和黑名单。其中,白名单可通过配置选项mapred.hostsmapred-site.xml)指定,而黑名单可通过mapred.hosts.exclude配置选项指定(在mapred-site.xml中)。管理员可通过以下命令动态更新这两个名单:

bin/hadoop mradmin -refreshNodes