Hadoop技术内幕之MapReduce编程模型(下)

云计算 来源:owen1190 20℃ 0评论

非Java API 解析

Hadoop Streaming的实现原理

Hadoop Streaming 是 Hadoop 为方便非 Java 用户编写 MapReduce 程序而设计的工具包。 它允许用户将任何可执行文件或者脚本作为 Mapper/Reducer, 这大大提高了程序员的开发效率。

Hadoop Streaming 要求用户编写的 Mapper/Reducer 从标准输入中读取数据, 并将结果写到标准数据中。

Hadoop Streaming 工具包实际上是一个使用 Java 编写的 MapReduce 作业。 当用户使用可执行文件或者脚本文件充当 Mapper 或者 Reducer 时, Java 端的 Mapper 或者 Reducer 充当了 wrapper 角色, 它们将输入文件中的 key 和 value 直接传递给可执行文件或者脚本文件进行处理, 并将处理结果写入 HDFS。

实现 Hadoop Streaming 的关键技术点是如何使用标准输入输出实现 Java 与其他可执行文件或者脚本文件之间的通信。 Hadoop Streaming 使用了 JDK 中的 java.lang.ProcessBuilder 类。 该类提供了一整套管理操作系统进程的方法, 包括创建、 启动和停止进程( 也就是应用程序) 等。 相比于 JDK 中的 Process 类, ProcessBuilder 允许用户对进程进行更多控制, 包括设置当前工作目录、 改变环境参数等。
Mapper的执行过程:Hadoop Streaming 使用 ProcessBuilder 以独立进程方式启动可执行文件 wc_mapper, 并创建该进程的输入输出流。

    // 将 wc_mapper 封装成一个进程
    ProcessBuilder builder = new ProcessBuilder("wc_mapper");
    builder.environment().putAll(childEnv.toMap()); // 设置环境变量
    sim = builder.start();
    // 创建标准输出流
    clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream(), BUFFER_SIZE));
    // 创建标准输入流
    clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream() , BUFFER_SIZE));
    // 创建标准错误流
    clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));

Hadoop Streaming 提 供 了 一 个 默 认 的 PipeMapper。 它 实 际 上 是 C++ 端 Mapper 的wrapper, 主要作用是向已经创建好的输出流 clientOut_ 中写入数据。

    public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
    …
    clientOut_.write(key, 0, keySize);
    clientOut_.write(mapInputFieldSeparator);
    clientOut_.write(value, 0, valueSize);
    clientOut_.write('\n');
    }

写入 clientOut_ 的数据直接成为 wc_mapper 的输入, 待数据被处理完后, 可直接从标准输入流 clientIn_ 中获取结果:

    // MROutputThread
    public void run() {
        lineReader = new LineReader((InputStream)clientIn_, job_);
        while (lineReader.readLine(line) > 0) {
        splitKeyVal(line, line.getLength(), key, val);
        output.collect(key, val);
        }
    }

Hadoop Streaming 使用分隔符定位一个完整的 key或 value, 因而只能支持文本格式数据, 不支持二进制格式。 在 0.21.0/0.22.X 系列版本中, Hadoop Streaming 增加了对二进制文件的支持 , 并添加了两种新的二进制文件格式 : RawBytes 和 TypedBytes。RawBytes 指 key 和 value 是原始字节序列, 而TypedBytes 指 key 和 value 可以拥有的数据类型,由于它们采用的是长度而不是某一种分隔符定位 key 和 value, 因而支持二进制文件格式。

RawBytes 传递给可执行文件或者脚本文件的内容编码格式为:

TypedBytes 允许用户为 key 和 value 指定数据类型。 对于长度固定的基本类型, 如byte、 bool、 int、 long 等, 其编码格式为:

对于长度不固定的类型, 如 byte array、 string 等, 其编码格式为:

当 key 和 value 大部分情况下为固定长度的基本类型时, TypedBytes 比 RawBytes 格式
更节省空间。

Hadoop Pipes 的实现原理

Hadoop Pipes 是 Hadoop 为方便 C/C++ 用户编写 MapReduce 程序而设计的工具。 其设计思想是将应用逻辑相关的 C++ 代码放在单独的进程中, 然后通过 Socket 让 Java 代码与C++ 代码通信以完成数据计算。

Hadoop Pipes 的一个缺点是调试不方便。 因为输入的数据是 Java 端代码通过 Socket 传到 C++ 应用程序的, 所以用户不能单独对 C++ 部分代码进行测试, 而需要连同 Java 端代码一起启动。

Hadoop Pipes也 使 用 Java 中 的ProcessBuilder 以单独进程方式启动可执行文件。 不同之处是 Java 代码与可执行文件( 或者脚本) 的通信方式: Hadoop Streaming 采用标准输入输出, 而 Hadoop Pipes 采用 Socket。

Hadoop Pipes 由两部分组成 : Java 端代码和 C++ 端代码。Java 端的 Mapper 或者 Reducer 实际上是C++ 端 Mapper 或者 Reducer 的封装器( wrapper), 它们通过 Socket 将输入的 key 和 value直接传递给可执行文件执行。

Hadoop Pipes具体流程:

步骤 1 用户提交 Pipes 作业后, Java 端启动一个 Socket server(等待 C++ 端接入),同时以独立进程方式运行 C++ 端代码。

步 骤 2 C++ 端 以 Client 身 份连 接 Java 端 的 Socket server, 连接成功后,Java 端依次发送一系列指令通知 C++ 端进行各项准备工作。

步 骤 3 Java 端 通 过 mapItem()函 数 不 断 向 C++ 端 传 送 key/value对, C++ 端将计算结果返回给 Java端, Java 端对结果进行保存。

步骤 4 所有数据处理完毕后,Java 端通知 C++ 端终止计算,并关闭 C++ 端进程。

接下来深入分析 Hadoop,Pipes 内 部 实 现 原 理。 如 下图 所 示, Java 端 用 PipesMapRunner 实 现 了 MapRunner,在MapRunner 内部, 借助两个协议类 DownwardProtocol 和 UpwardProtocol 向 C++ 端发送数据和从 C++ 端接收数据, 而 C++ 端也有两个类与之对应, 分别是 Protocol 和 UpwardProtocol。Protocol 将收到的数据传给用户编写的 Mapper, 经 Mapper、 Combiner 和 Partitioner 处理后,由 UpwardProtocol 返回给 Java 端的 UpwardProtocol, 由它写到本地磁盘上。

Hadoop工作流

很多情况下, 用户编写的作业比较复杂, 相互之间存在依赖关系, 这种依赖关系可以用有向图表示, 我们称之为“工作流”。

JobControl的实现原理

对于各个作业之间有依赖关系的MapReduce程序中,传统做法就是为每个作业创建相应的 JobConf 对象,并按照依赖关系依次( 串行) 提交各个作业。

如果采用JobControl,用户只需使用 addDepending() 函数添加作业依赖关系接口,JobControl 会按照依赖关系调度各个作业

JobControl 由两个类组成 : Job 和 JobControl。 其中, Job 类封装了一个 MapReduce 作业及其对应的依赖关系, 主要负责监控各个依赖作业的运行状态, 以此更新自己的状态,其状态转移图如下图 所示。 作业刚开始处于 WAITING 状态。 如果没有依赖作业或者所有依赖作业均已运行完成, 则进入 READY 状态。 一旦进入 READY 状态, 则作业可被提交到 Hadoop 集群上运行, 并进入 RUNNING 状态。 在 RUNNING 状态下, 根据作业运行情况, 可能进入 SUCCESS 或者 FAILED 状态。 需要注意的是, 如果一个作业的依赖作业失败, 则该作业也会失败。

JobControl 封装了一系列 MapReduce 作业及其对应的依赖关系。 它将处于不同状态的作业放入不同的哈希表中, 并按照图 3-26 所示的状态转移作业, 直到所有作业运行完成。在实现的时候, JobControl 包含一个线程用于周期性地监控和更新各个作业的运行状态,调度依赖作业运行完成的作业, 提交处于 READY 状态的作业等。 同时, 它还提供了一些API 用于挂起、 恢复和暂停该线程。

ChainMapper/ChainReducer 的实现原理

ChainMapper/ChainReducer 主要为了解决线性链式 Mapper 而提出的。

下图展示了一个典型的ChainMapper/ChainReducer应用场景。
在Map 阶段, 数据依次经过 Mapper1 和 Mapper2 处理 ; 在 Reduce 阶段, 数据经过 shuffle 和sort 后 ; 交由对应的 Reducer 处理, 但 Reducer 处理之后并没有直接写到 HDFS 上, 而是交给另外一个 Mapper 处理, 它产生的结果写到最终的 HDFS 输出目录中。

但是,对于任意一个 MapReduce 作业, Map 和 Reduce 阶段可以有无限个Mapper, 但 Reducer 只能有一个。如果遇到下图这种情况,只能将分解为两个MapReduce作业。

用户通过 addMapper 在 Map/Reduce 阶段添加多个 Mapper。 该函数带有 8 个输入参数, 分别是作业的配置、 Mapper 类、 Mapper 的输入 key 类型、 输入 value 类型、 输出 key类型、 输出 value 类型、 key/value 是否按值传递和 Mapper 的配置。 其中, Hadoop MapReduce 有一个约定, 函数 OutputCollector.collect(key, value) 执行期间不应改变 key 和 value 的值。 这主要是因为函数 Mapper.map() 调用完 OutputCollector.collect(key, value) 之后, 可能会再次使用 key 和 value 值, 如果被改变, 可能会造成潜在的错误。 为了防止 OutputCollector 直接对 key/value 修改, ChainMapper 允许用户指定key/value 传递方式。 如果用户确定 key/value 不会被修改, 则可选用按引用传递, 否则按值传递。 需要注意的是, 引用传递可避免对象拷贝, 提高处理效率, 但需要确保 key/value 不会被修改。

ChainMapper/ChainReducer 实 现 的 关 键 技 术 点 是 修 改 Mapper 和 Reducer 的 输 出
流, 将本来要写入文件的输出结果重定向到另外一个 Mapper 中。而输出结果由 OutputCollector 管理, 因而, ChainMapper/ChainReducer 需要重新实现一个OutputCollector 完成数据重定向功能。

当用户调用 addMapper 添加Mapper 时, 可能会为新添加的每个 Mapper 指定一个特有的 JobConf, 为此,ChainMapper/ChainReducer 将这些 JobConf 对象序列化后, 统一保存到作业的 JobConf 中。

当链式作业开始执行的时候, 首先将各个 Mapper 的 JobConf 对象反序列化, 并构造对
应的 Mapper 和 Reducer 对象, 添加到数据结构 mappers( List 类型) 和 reducer
( Reducer 类型) 中。

Hadoop 工作流引擎

JobControl 和 ChainMapper/ChainReducer 仅可具备最简单的工作流引擎功能。

为了增强Hadoop 支持工作流的能力, 在 Hadoop 之上出现了很多开源的工作流引擎, 主要可概括为两类: 隐式工作流引擎和显式工作流引擎。

隐式工作流引擎在 MapReduce 之上添加了一个语言抽象层, 允许用户使用更简单的方式编写应用程序,用户无须关注 MapReduce 的任何细节, 降低了用户的学习成本, 并可大大提高开发效率。

功能描述层 : 直接面向用户提供了一种简单的应用程序编写方法
作业生成器: 作业生成器主要将上层的应用程序转化成一批 MapReduce 作业。 这一批 MapReduce 存在相互依赖关系, 实际上是一个 DAG。
调度引擎 : 调度引擎直接构建于 MapReduce 环境之上, 将作业生成器生成的 DAG 按照依赖关系提交到MapReduce 上运行。

显式工作流引擎直接面向 MapReduce 应用程序开发者, 提供了一种作业依赖关系描述方式, 并能够按照这种描述方式进行作业调度。

工作流描述语言 : 工作流描述语言用于描述作业的依赖关系。 Oozie 采用了 XML, 而 Azkaban 采用了key/value 格式的文本文件。 需要注意的是, 这里的作业不仅仅是指 MapReduce 作业, 还包括 Shell 命令、 Pig 脚本等。
调度引擎 : 同隐式工作流引擎的调度引擎功能相同,即根据作业的依赖关系完成作业调度。