Hadoop技术内幕之MapReduce编程模型(上)

云计算 来源:owen1190 85℃ 0评论

MapReduce 应用广泛的原因之一在于它的易用性。 它提供了一个因高度抽象化而变得
异常简单的编程模型。

MapReduce 编程模型概述

适用的应用场景往往具有一个共同的特点 : 任务可被分解成相互独立的子问题。

MapReduce 编程模型给出了其分布式编程方法, 共分 5 个步骤:

  1. 迭代( iteration)。 遍历输入数据, 并将之解析成 key/value 对。
  2. 将输入 key/value 对映射( map) 成另外一些 key/value 对。
  3. 依据 key 对中间数据进行分组( grouping)。
  4. 以组为单位对数据进行归约( reduce)。
  5. 迭代。 将最终产生的 key/value 对保存到输出文件中。

MapReduce 编程接口体系结构

整个编程模型位于应用程序层和 MapReduce 执行器之间, 可以分为两层。

第一层是最基本的 Java API, 主要有 5 个可编程组件, 分别是 InputFormat、 Mapper、 Partitioner、 ReducerOutputFormat

第二层是工具层, 位于基本 Java API 之上, 主要是为了方便用户编写复杂的 MapReduce 程序和利用其他编程语言增加 MapReduce 计算平台的兼容性而提出来的。

在该层,主要提供4个编程工具包

  • JobControl: 方便用户编写有依赖关系的作业, 这些作业往往构成一个有向图, 所以通常称为 DAG( Directed Acyclic Graph) 作业。
  • ChainMapper/ChainReducer: 方便用户编写链式作业, 即在 Map 或者 Reduce 阶段存在多个 Mapper, 形式如下:
    [MAPPER+ REDUCER MAPPER*]
  • Hadoop Streaming : 方便用户采用非 Java 语言编写作业, 允许用户指定可执行文件或者脚本作为 Mapper/Reducer。
  • Hadoop Pipes: 专门为 C/C++ 程序员编写 MapReduce 程序提供的工具包。

新旧 MapReduce API 比较

1.存放位置

旧版 API 放在 org.apache.hadoop.mapred 包中, 而新版 API 则放在 org.apache.hadoop.mapreduce 包及其子包中。

2.接口变为抽象类

当需要为抽象类添加新的方法时, 只要新添加的方法提供了默认实现, 用户之前的代码就不必修改了。
新API将InputFormatOutputFormatMapperReducerPartitioner由接口变为抽象类。

3.上下文封装

新版API将变量和函数封装成各种上下文(Context)类,使得 API 具有更好的易用性和扩展性。 首先, 函数参数列表经封装后变短,使得函数更容易使用;其次,当需要修改或添加某些变量或函数时,只需修改封装后的上下文类即可。

MapReduce API基本概念

序列化

序列化的主要作用有两个: 永久存储和进程间通信。

MapReduce 编程模型要求用户输入和输出数据中的 key 和 value 必须是可序列化的。 在Hadoop MapReduce中,使一个Java对象可序列化的方法是让其对应的类实现 Writable 接口。key 对应类需实现WritableComparable接口。

Reporter参数

应 用 程 序 可 使 用Reporter 中的方法报告完成进度(progress)、 设定状态消息(setStatus) 以及更新计数器(incrCounter)。

MapReduce 对外提供的大部分组件, 包括 InputFormatMapperReducer 等, 均在其主要方法中添加了该参数。

回调机制

它将工作流内的某个功能按照约定的接口暴露给外部使用者, 为外部使用者提供数据, 或要求外部使用者提供数据。
Hadoop MapReduce的InputFormatMapperParitionerReducerOutputFormat属于回调接口,当用户按照约定实现这几个接口后, MapReduce运行时环境会自动调用它们。

Java API解析

Hadoop 的主要编程语言是 Java, 因而 Java API 是最基本的对外编程接口。

作业配置与提交

Hadoop配置文件介绍

配置文件主要分为系统默认配置文件和管理员自定义配置文件。 其中, 系统默认配置文件分别是core-default.xmlhdfs-default.xmlmapred-default.xml, 它们包含了所有可配置属性的默认值。 而管理员自定义配置文件分别是 core-site.xmlhdfs-site.xmlmapred-site.xml。 它们由管理员设置, 主要用于定义一些新的配置属性或者覆盖系统默认配置文件中的默认值。

core-default.xmlcore-site.xml 属于公共基础库的配置文件, 默认情况下, Hadoop 总会优先加载它们。

每个配置属性主要包括三个配置参数 : namevaluedescription, 分别表示属性名、 属性值和属性描述。属性描述仅仅用来帮助用户理解属性的含义。

Hadoop 为配置文件添加了两个新的特性 : final 参数和变量扩展。

  • final参数:
    如果管理员不想让用户程序修改某些属性的属性值, 可将该属性的 final参数置为 true。
    一般在 XXX-site.xml 配置文件中为某些属性添加 final 参数, 以防止用户在应用程序中修改这些属性的属性值。

  • 变量扩展:
    当读取配置文件时, 如果某个属性存在对其他属性的引用, 则 Hadoop 首先会查找引用的属性是否为。 如果是其他已经定义的属性或Java中System.getProperties()函数可以获取属性, 则进行扩展。

MapReduce作业配置与提交

在 MapReduce 中, 每个作业由两部分组成 : 应用程序和作业配置。 其中, 作业配置内容包括环境配置和用户自定义配置两部分。 环境配置由 Hadoop 自动添加, 主要由mapreddefault.xmlmapred-site.xml 两个文件中的配置选项组合而成 ; 用户自定义配置则由用户自己根据作业特点个性化定制而成。

旧API作业配置实例:

JobConf job = new JobConf(new Configuration(), MyJob.class);
job.setJobName("myjob");
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
JobClient.runJob(job);

新API作业配置实例:

Configuration conf = new Configuration();
Job job = new Job(conf, "myjob ");
job.setJarByClass(MyJob.class);
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);

从以上例子可以看出,新版 API 用 Job 类代替了 JobConfJobClient 两个类, 这样仅使用一个类的同时可完成作业配置和作业提交相关功能。

旧API中的作业配置

org.apache.hadoop.conf 中 的Configuration 类是配置模块最底层的类

该类支持以下两种:

● 序列化 : 序列化是将结构化数据转换成字节流, 以便于传输或存储。 Java 实现了自己的一套序列化框架。 凡是需要支持序列化的类, 均需要实现 Writable 接口。

● 迭代: 为了方便遍历所有属性, 它实现了 Java 开发包中的 Iterator 接口

Configuration 类总会依次加载 core-default.xmlcore-site.xml 两个基础配置文件。
相关代码如下:

addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");

addDefaultResource 函数的参数为 XML 文件名, 它能够将 XML 文件中的 name/value加载到内存中。 当连续调用多次该函数时, 对于同一个配置选项, 其后面的值会覆盖前面的值。

Configuration 类中有大量针对常见数据类型的 getter/setter 函数, 用于获取或者设置某种数据类型属性的属性值。

Configuration 类中还有一个非常重要的函数:
void writeXml(OutputStream out)
该函数能够将当前 Configuration 对象中所有属性及属性值保存到一个 XML 文件中,以便于在节点之间传输。

JobConf 类描述了一个 MapReduce 作业运行时需要的所有信息, 而 MapReduce 运行时环境正是根据 JobConf 提供的信息运行作业的。
JobConf 继承了 Configuration 类, 并添加了一些设置 / 获取作业属性的 setter/getter 函数, 以方便用户编写 MapReduce 程序。

JobConf 会自动加载配置文件 mapred-default.xmlmapred-site.xml
相关代码如下:

static{
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}

新API中的作业配置

Job类同时具有作业配置和作业提交的功能。

Job 类继承了一个新类 JobContext, 而 Context 自身则包含一个JobConf 类型的成员。 注意, JobContext 类仅提供了一些 getter 方法, 而 Job 类中则提供了一些 setter 方法。

InputFormat 接口的设计与实现

InputFormat 主要用于描述输入数据的格式,主要有以下功能:

● 数据切分 : 按照某个策略将输入数据切分成若干个 split, 以便确定 Map Task 个数以及对应的 split。

● 为 Mapper 提供输入数据: 给定某个 split, 能将其解析成一个个 key/value 对。

旧版 API 的 InputFormat 解析

旧版InputFormat为一个接口,包含以下方法

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;

getSplits 方法主要完成数据切分的功能, 它会尝试着将输入数据切分成 numSplits 个InputSplit。

逻辑分片: 它只是在逻辑上对输入数据进行分片, 并不会在磁盘上将其切分成分片进行存储。

● **可序列化:**InputSplit 支持序列化操作主要是为了进程间通信。 作业被提交到 JobTracker 之前, Client 会调用作业 InputFormat 中的 getSplits 函数, 并将得到的 InputSplit 序列化到文件中。 这样, 当作业提交到 JobTracker 端对作业初始化时, 可直接读取该文件, 解析出所有 InputSplit, 并创建对应的 Map Task。

getRecordReader方法返回一个RecordReader对象, 该对象可将输入的InputSplit解析成若干个key/value 对。MapReduce框架在MapTask执行过程中,会不断调用RecordReader 对象中的方法, 迭代获取 key/value 对并交给 map() 函数处理。

由公共基类 FileInputFormat 采用统一的方法对各种输入文件进行切分, 而由各个派生 InputFormat 自己提供机制将进一步解析InputSplit。 对应到具体的实现是, 基类 FileInputFormat 提供 getSplits 实现, 而派生类提供getRecordReader 实现。

FileInputFormat:它最重要的功能是为各种 InputFormat 提供统一的 getSplits 函数。 该函数实现中最核心的两个算法是文件切分算法和 host 选择算法。

1.文件切分算法

文件切分算法主要用于确定 InputSplit 的个数以及每个 InputSplit 对应的数据段,由以下三个属性值来确定其对应的InputSplit的个数

globalSize:它是根据用户期望的 InputSplit 数目计算出来的, 即 totalSize/numSplits。其中, totalSize 为文件总大小; numSplits 为用户设定的 Map Task 个数, 默认情况下是 1。

● **minSize:**InputSplit 的最小值, 由配置参数 mapred.min.split.size 确定, 默认是 1。

blockSize:文件在 HDFS 中存储的 block 大小, 不同文件可能不同, 默认是 64 MB。
最终其计算公式如下:

splitSize = max{minSize, min{goalSize, blockSize}}

一旦确定 splitSize 值后, FileInputFormat 将文件依次切成大小为 splitSize 的 InputSplit,最后剩下不足 splitSize 的数据块单独成为一个 InputSplit。
如果想让 InputSplit 尺寸大于 block 尺寸, 则直接增大配置参数 mapred.min.split.size 即可。

2.host选择算法

下一步要确定每个 InputSplit 的元数据信息。 这通常由四部分组成 : <file, start, length, hosts>, 分别表示 InputSplit 所在的文件、 起始位置、 长度以及所在的 host( 节点) 列表。

Hadoop 将数据本地性按照代价划分成三个等级 : node localityrack localitydatacenter locality( Hadoop 还未实现该 locality 级别)。 在进行任务调度时, 会依次考虑这 3 个节点的 locality, 即优先让空闲资源处理本节点上的数据, 如果节点上没有可处理的数据,则处理同一个机架上的数据, 最差情况是处理其他机架上的数据。

考虑到任务调度的效率, 通常选择包含( 该 InputSplit) 数据总量最大的前几个节点( Hadoop 限制最多选择 10 个, 多余的会过滤掉), 以作为任务调度时判断任务是否具有本地性的主要凭证。

为此, FileInputFormat 设计了一个简单有效的启发式算法 :首先按照 rack 包含的数据量对 rack 进行排序, 然后在 rack 内部按照每个 node 包含的数据量对 node 排序, 最后取前 N 个 node 的 host 作为 InputSplit 的 host 列表, 这里的 N 为 block副本数。 这样, 当任务调度器调度 Task 时, 只要将 Task 调度给位于 host 列表的节点, 就认为该 Task 满足本地性。

当使用基于 FileInputFormat 实现 InputFormat 时, 为了提高 Map Task 的数据本地性, 应尽量使 InputSplit 大小与 block 大小相同。

接下来分析派生类 TextInputFormat 与 SequenceFileInputFormat 的实现。在具体实现时,RecordReader 应考虑以下两点。

定位记录边界

为了能够识别一条完整的记录, 记录之间应该添加一些同步标识。对于 TextInputFormat, 每两条记录之间存在换行符; 对于 SequenceFileInputFormat,每隔若干条记录会添加固定长度的同步字符串。
由于 FileInputFormat 仅仅按照数据量多少对文件进行切分, 因而 InputSplit 的第一条记录和最后一条记录可能会被从中间切开。 为了解决这种记录跨越 InputSplit 的读取问题, RecordReader 规定每个InputSplit 的第一条不完整记录划给前一个 InputSplit 处理。

解析key/value

定位到一条新的记录后, 需将该记录分解成 key 和 value 两部分。对于 TextInputFormat, 每一行的内容即为 value, 而该行在整个文件中的偏移量为key。 对于 SequenceFileInputFormat, 每条记录的格式为:
[record length] [key length] [key] [value]
其中, 前两个字段分别是整条记录的长度和 key 的长度, 均为 4 字节, 后两个字段分别是 key 和 value 的内容。 知道每条记录的格式后, 很容易解析出 key 和 value。

新版API的InputFormat解析

InputSplit 划分算法不再考虑用户设定的 Map Task 个数, 而用 mapred.max.split.size( 记为 maxSize) 代替,计算公式如下:

splitSize = max{minSize, min{maxSize, blockSize}}

OutputFormat接口的设计与实现

OutputFormat 主要用于描述输出数据的格式, 它能够将用户提供的 key/value 对写入特定格式的文件中。

旧版API的OutputFormat解析

在旧版中,OutputFormat是一个接口,包含了两个方法:

RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress)throws IOException;
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

checkOutputSpecs 方法一般在用户作业被提交到 JobTracker 之前, 由 JobClient 自动调用,以检查输出目录是否合法。

getRecordWriter 方法返回一个 RecordWriter 类对象。 该类中的方法 write 接收一个key/value 对, 并将之写入文件。

所有基于文件的 OutputFormat 实现的基类为 FileOutputFormat, 并由此派生出一些基于文本文件格式、 二进制文件格式的或者多输出的实现。

基类 FileOutputFormat 需要提供所有基于文件的 OutputFormat 实现的公共功能。

  1. 实现 checkOutputSpecs 接口
    该接口在作业运行之前被调用, 默认功能是检查用户配置的输出目录是否存在, 如果存在则抛出异常, 以防止之前的数据被覆盖。
  2. 处理 side-effect file
    任务的 side-effect file是具有特殊用途的任务专属文件。 它的典型应用是执行推测式任务。
    为了对那些由于硬件老化、网络故障造成的“慢任务”进行优化, Hadoop 会为之在另外一个节点上启动一个相同的任务, 该任务便被称为推测式任务, 最先完成任务的计算结果便是这块数据对应的处理结果。为防止这两个任务同时往一个输出文件中写入数据时发生写冲突, FileOutputFormat会为每个 Task 的数据创建一个 side-effect file, 并将产生的数据临时写入该文件, 待 Task完成后, 再移动到最终输出目录中。

这些文件的相关操作, 比如创建、 删除、 移动等, 均由 OutputCommitter 完成。 它是一个接口, Hadoop 提供了默认实现 FileOutputCommitter,用 户 也 可 以 根 据 自 己 的 需 求 编 写 OutputCommitter 实 现, 并 通 过 参 数 mapred.output.committer.class 指定。


默认情况下, 当作业成功运行完成后, 会在最终结果目录 ${mapred.out.dir} 下生成空文件_SUCCESS。 该文件主要为高层应用提供作业运行完成的标识。

新版API的OutputFormat解析

与旧版相比,主要有以下改动:接口变为抽象类,增加了一个新的方法: getOutputCommitter, 以允许用户自己定制合适的 OutputCommitter 实现。

Mapper与Reducer解析

旧版API的Mapper与Reducer解析

MapReduce 要求所有存储在底层分布式文件系统上的数据均要解释成 key/value 的形式, 并交给 Mapper/Reducer 中的 map/reduce 函数处理, 产生另外一些 key/value。
Mapper与Reducer类体系比较相似,Mapper类图如下,包括初始化、Map操作和清理三个部分。

1. 初始化
Mapper 继承了 JobConfigurable 接口。 该接口中的 configure 方法允许通过 JobConf 参数对 Mapper 进行初始化。
2. Map操作
MapReduce 框 架 会 通 过 InputFormat 中 RecordReader 从 InputSplit 获 取 一 个 个 key/
value 对, 并交给下面的 map() 函数处理:

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)throws IOException;

OutputCollector 和 Reporter 两个类型的参数, 分别用于输出结果和修改 Counter 值。
3. 清理
Mapper 通过继承 Closeable 接口( 它又继承了 Java IO 中的 Closeable 接口) 获得 close
方法, 用户可通过实现该方法对 Mapper 进行清理。

MapReduce提供大量Map/Reduce实现,它们对应功能为:

  • ChainMapper/ChainReducer: 用于支持链式作业。
  • IdentityMapper/IdentityReducer: 对于输入 key/value 不进行任何处理, 直接输出。
  • InvertMapper: 交换 key/value 位置。
  • RegexMapper: 正则表达式字符串匹配。
  • TokenMapper: 将字符串分割成若干个 token( 单词), 可用作 WordCount 的 Mapper。
  • LongSumReducer: 以 key 为组, 对 long 类型的 value 求累加和。

MapReduce 框架提供了比 Mapper 更通用的接口 : MapRunnable。
用户可以实现该接口以定制Mapper 的调用方式或者自己实现 key/value 的处理逻辑。
提供该接口的另外一个好处是允许用户实现多线程 Mapper。

如下图,MapReduce 提 供 了 两 个 MapRunnable 实 现, 分 别 是 MapRunner 和MultithreadedMapRunner, 其中 MapRunner 为默认实现。 MultithreadedMapRunner 实现了一种多线程的 MapRunnable。 默认情况下, 每个 Mapper 启动 10 个线程, 通常用于非 CPU类型的作业以提供吞吐率。

新版 API 的 Mapper/Reducer 解析

  • Mapper 由接口变为抽象类, 且不再继承 JobConfigurable 和 Closeable 两个接口, 而是直接在类中添加了 setup 和 cleanup 两个方法进行初始化和清理工作。
  • 将参数封装到 Context 对象中, 这使得接口具有良好的扩展性。
  • 去掉 MapRunnable 接口, 在 Mapper 中添加 run 方法, 以方便用户定制 map() 函数的调用方法, run 默认实现与旧版本中 MapRunner 的 run 实现一样。
  • 新 API 中 Reducer 遍历 value 的迭代器类型变为 java.lang.Iterable, 使得用户可以采用“foreach”形式遍历所有 value, 如下所示:

    void reduce(KEYIN key, Iterable values, Context context) throws IOException, InterruptedException {    
        for(VALUEIN value: values) { // 注意遍历方式
        context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
    

Partitioner 接口的设计与实现

Partitioner 的作用是对 Mapper 产生的中间结果进行分片, 以便将同一分组的数据交给同一个 Reducer 处理, 它直接影响 Reduce 阶段的负载均衡。

旧版的Partitioner继承了 JobConfigurable, 可通过 configure 方法初始化。 它本身只包含一个待实现的方法 getPartition。 该方法包含三个参数, 均由框架自动传入, 前面两个参数是key/value, 第三个参数 numPartitions 表示每个 Mapper 的分片数, 也就是 Reducer 的个数。
MapReduce 提供了两个 Partitioner 实现 : HashPartitionerTotalOrderPartitioner。其中HashPartitioner 是默认实现,是一种基于哈希值的分片方法。

public int getPartition(K2 key, V2 value,int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

TotalOrderPartitioner 提供了一种基于区间的分片方法,为了提高全局排序的性能和扩展性,MapReduce 提供了 TotalOrderPartitioner。 它能够按照大小将数据分成若干个区间( 分片),并保证后一个区间的所有数据均大于前一个区间数据,步骤如下:

步骤1:数据采样。 在 Client 端通过采样获取分片的分割点。

步骤2:Map 阶段。 本阶段涉及两个组件, 分别是 Mapper 和 Partitioner。 其中, Mapper 可采用 IdentityMapper, 直接将输入数据输出, 但 Partitioner 必须选用 TotalOrderPartitioner,它将步骤 1 中获取的分割点保存到 trie 树中以便快速定位任意一个记录所在的区间, 这样,每个 Map Task 产生 R( Reduce Task 个数) 个区间, 且区间之间有序。
TotalOrderPartitioner 通过 trie 树查找每条记录所对应的 Reduce Task 编号。

步骤3:Reduce 阶段。 每个 Reducer 对分配到的区间数据进行局部排序, 最终得到全排序数据。

基于 TotalOrderPartitioner 全排序的效率跟 key 分布规律和采样算法有直接关系 ; key 值分布越均匀且采样越具有代表性, 则 Reduce Task 负载越均衡, 全排序效率越高。

新版API中的Partitioner不再实现 JobConfigurable 接口。 当用户需要让 Partitioner 通过某个 JobConf 对象初始化时, 可自行实现 Configurable 接口。