MapReduce 执行流程深度解析
第一部分:客户端与集群的分工 (getSplits
vs. createRecordReader
)
客户端(Client)
job.submit()
/job.waitForCompletion(true)
触发分片计算:客户端首先实例化作业指定的InputFormat
(如TextInputFormat
),然后调用其getSplits()
方法。- 提交资源:计算出的分片元数据(
InputSplit
列表)、作业配置和 JAR 包等资源被提交给 YARN 的ResourceManager
。
- 源码佐证:
FileInputFormat.java
中核心的getSplits(JobContext job)
方法,通过遍历文件、计算splitSize
来生成FileSplit
对象列表,这正是分片逻辑的实现。// ... existing code ... public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); // ... 循环遍历文件并创建分片 ... return splits; } // ... existing code ...
- 在大量的测试用例中(如
TestFixedLengthInputFormat.java
,TestCombineTextInputFormat.java
),都是在测试代码中直接调用format.getSplits(job, ...)
,这模拟了客户端的行为。
集群(YARN)
ResourceManager
接收到作业后,启动ApplicationMaster
(AM)。ApplicationMaster
根据InputSplit
的数量向ResourceManager
申请相应数量的容器(Container)来执行 Map 任务。NodeManager
在容器中启动MapTask
进程(YarnChild
)。MapTask
进程内部:- 反序列化获取分配给自己的那个
InputSplit
。 - 加载
InputFormat
类,并调用createRecordReader(split, context)
方法创建RecordReader
。 - 使用
RecordReader
从分片中读取<key, value>
对,并传递给用户实现的map()
方法。
- 反序列化获取分配给自己的那个
- 源码佐证:
InputSampler.java
是一个在客户端对输入进行采样的工具,它完美地模拟了 Map 任务的行为:它先获取分片,然后为每个分片创建RecordReader
并读取数据。// ... existing code ... RecordReader<K,V> reader = inf.createRecordReader( splits.get(i), samplingContext); reader.initialize(splits.get(i), samplingContext); while (reader.nextKeyValue()) { samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), null)); // ... existing code ...
MapReduceTutorial.md
也明确指出:RecordReader
的职责是从InputSplit
提供的面向字节的视图中,转换为Mapper
需要的面向记录的视图。
第二部分:MapReduce 完整流程
1. 输入(Input)和分片(Split)
- 描述: 由
InputFormat
的getSplits()
方法在客户端执行。它将输入数据源逻辑切分为多个InputSplit
,每个InputSplit
对应一个 Map 任务。分片是逻辑概念,包含位置和长度,而非物理切割。 - 源码佐证:
InputFormat.java
接口的 Javadoc 清晰地描述了getSplits
的作用是“Logically split the set of input files for the job”。
2. 映射(Map)
- 描述: 每个 Map 任务在一个
InputSplit
上执行。它使用RecordReader
读取数据,并执行用户定义的map()
方法,产出中间<key, value>
对。
3. Shuffle(洗牌)
这是 MapReduce 的“心脏”。
Map 端 Shuffle:
- 分区(Partition):
Partitioner
决定中间键值对被发送到哪个 Reducer。 - 排序和溢写(Sort & Spill): 数据首先写入环形内存缓冲区,在缓冲区内排序。当缓冲区满(由
mapreduce.map.sort.spill.percent
控制)时,数据被溢写(Spill)到磁盘上的临时文件。 - 合并(Merge): 如果产生了多个溢写文件,Map 任务结束前,会将这些文件合并排序成一个单一的、已分区且内部有序的输出文件。这是为 Reduce 端的拉取做准备。
- Combiner (可选): 在排序之后,合并之前对数据进行本地聚合,减少 I/O。
- 分区(Partition):
源码佐证 (Map端Merge):
MapTask.java
中有非常明确的合并逻辑。它会收集所有溢写文件(Spill)对应的Segment
,然后调用Merger.merge
方法。// ... existing code ... //create the segments to be merged List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); for(int i = 0; i < numSpills; i++) { IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); // ... existing code ... //merge @SuppressWarnings("unchecked") RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP); // ... existing code ...
Reduce 端 Shuffle:
- 复制(Copy): Reduce 任务启动后,主动从各个 Map 任务的输出位置拉取(Copy)属于自己分区的数据。
- 合并(Merge/Sort): 在拉取数据的同时,在内存中进行归并排序。如果数据量过大,也会溢写到磁盘。最终,所有相关的 Map 输出被合并成一个统一的、有序的数据集。
4. 规约(Reduce)
- 描述: 框架将 Shuffle 阶段排序好的数据,以
<key, (list of values)>
的形式喂给reduce()
方法。对于每一个唯一的 key,reduce()
方法被调用一次。
5. 输出(Output)
- 描述:
Reducer
的输出通过OutputFormat
(及其包含的RecordWriter
)写入最终目的地,如 HDFS。
Mapper 和 Reducer 的核心联系:中间键值对
Mapper 和 Reducer 的核心联系是中间键值对 (intermediate key/value pairs)。
- Mapper 的产出: Mapper 的主要职责是处理输入的记录,并生成一系列中间的
<key, value>
对。这些是 Shuffle 过程的原材料。 - Reducer 的输入: Reducer 的输入是经过 Shuffle 和 Sort 之后,将来自所有 Mapper 的、具有相同 key 的中间值聚合在一起形成的
<key, (list of values)>
。
正如 Mapper.java
的注释所说:
All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a {@link Reducer} to determine the final output.
// ... existing code ...
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
// ... existing code ...
Mapper 把数据发往哪里?—— Partitioner
的决策
Mapper 并不直接“知道”要把数据发给哪个具体的 Reducer。这个决策是由 Partitioner
做出的。
- 分区(Partitioning): 对于 Mapper 输出的每一个
<key, value>
对,框架都会调用Partitioner
的getPartition
方法。 - 决定 Reducer 索引:
getPartition
方法会根据key
(或者 key 的一部分)计算出一个整数,这个整数就是目标 Reducer 的索引(从 0 到numReduceTasks - 1
)。 - 写入本地分区文件: Mapper 会将这个键值对写入自己本地磁盘上对应分区的一个缓冲区,最终合并成一个大的、已分区的输出文件。
所以,Mapper 的输出首先是写到自己所在的 Worker Node 的本地磁盘上,而不是直接通过网络发送给 Reducer。它只是根据 Partitioner
的逻辑,将数据整理好,等待 Reducer 来拉取。
MapReduceTutorial.md
对此有清晰的描述:
Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
MapReduceTutorial.md
[Partitioner](../../api/org/apache/hadoop/mapreduce/Partitioner.html) partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a *hash function*. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the `m` reduce tasks the intermediate key (and hence the record) is sent to for reduction.
网络在何时以及如何表示?—— Reduce 端的拉取 (Copy/Fetch)
网络传输在 MapReduce 中主要用于 Reduce 端的 Shuffle 阶段。
当一个 Reduce 任务启动后,它知道自己需要处理哪个分区的数据(例如,分区 i
)。于是,它会向 ApplicationMaster
查询所有已完成的 Map 任务的地址,然后通过 HTTP 协议,从每一个 Map 任务所在的 NodeManager
上拉取(fetch/copy)属于分区 i
的那部分数据。
这个过程可以总结为:
- Reduce 任务启动: Reducer 知道自己的分区号。
- 查询 Map 输出位置: Reducer 向 ApplicationMaster 询问所有 Map 任务的输出位置。
- HTTP 拉取: Reducer 启动多个“复制线程”(copier threads),并行地通过 HTTP GET 请求从各个 Map 任务所在的节点上下载属于自己分区的数据块。
- 内存/磁盘合并: 拉取来的数据块首先在 Reducer 的内存中进行合并和排序,如果数据量太大,也会溢写到 Reducer 所在节点的本地磁盘,最终合并成一个有序的数据集,供
reduce()
方法使用。
MapReduceTutorial.md
在描述 Reduce 端的 Shuffle 阶段时提到了这一点:
Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
##### Shuffle
Input to the `Reducer` is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
总结
- 联系: Mapper 和 Reducer 通过中间键值对联系起来。
- Mapper 发送目标: Mapper 的输出目标由
Partitioner
决定,数据被写入本地磁盘的不同分区中。 - 网络表示: 网络主要用于 Reduce 任务通过 HTTP 协议主动拉取各个 Mapper 节点上属于自己分区的数据。这是一个“拉”(Pull)模型,而不是“推”(Push)模型。
这种设计将计算(Map)和网络传输(Reduce Shuffle)解耦,并通过本地磁盘作为缓冲,提高了整个系统的鲁棒性和效率。
Mapper
Mapper
是 MapReduce 编程模型中至关重要的一环,它负责数据处理的第一个阶段,即“Map(映射)”阶段。它的核心职责是将输入的数据集(通常是原始数据)转换成一系列中间的键值对(Key-Value pairs),为后续的“Reduce(规约)”阶段做准备。
下面我们从用法、核心方法、执行流程以及与框架的交互等几个方面来深入分析。
Mapper 的基本用法
正如其 Javadoc 注释和类定义所示,开发者通常不直接实例化 Mapper
类,而是通过继承它来创建自己的 Mapper 实现。
// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ... existing code ...
Mapper
是一个泛型类,它有四个泛型参数,分别定义了输入和输出的键值对类型:
KEYIN
: 输入键(Key)的数据类型。例如,LongWritable
,通常表示文件中一行的偏移量。VALUEIN
: 输入值(Value)的数据类型。例如,Text
,通常表示文件中的一行文本。KEYOUT
: 中间输出键的数据类型。由你的业务逻辑决定,例如在单词计数的例子中是Text
(单词)。VALUEOUT
: 中间输出值的数据类型。同样由业务逻辑决定,例如在单词计数的例子中是IntWritable
(计数值 1)。
开发者最主要的工作就是重写 Mapper
中的一个或多个核心方法来实现具体的业务逻辑:
setup()
: 可选。在任务开始前执行一次,用于初始化。map()
: 必须。 对输入的每个键值对执行一次,是 Mapper 的核心逻辑所在。cleanup()
: 可选。在任务结束后执行一次,用于清理资源。
代码注释中提供了一个经典的“单词计数”(Word Count)示例,非常清晰地展示了如何使用 Mapper
:
public class TokenCounterMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one); // 输出 <单词, 1>
}
}
}
在这个例子中:
TokenCounterMapper
继承了Mapper<Object, Text, Text, IntWritable>
。- 输入是
<Object, Text>
,即行的偏移量和行的内容。 - 输出是
<Text, IntWritable>
,即单词和数字 1。 map
方法将一行文本 (value
) 切分成多个单词,然后对每个单词,通过context.write()
方法输出一个<单词, 1>
的键值对。
核心方法与执行流程剖析
Mapper
的执行由其生命周期方法 run()
控制。这个方法定义了一个标准的执行流程模板。
// ... existing code ...
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
setup(Context context)
:- 调用时机: 在
run
方法中,map
循环开始之前被调用一次。 - 作用: 执行初始化工作。例如,在
TokenCounterMapper
示例中,可以在setup
方法里初始化one
和word
对象,避免在map
方法中重复创建,提高性能。
- 调用时机: 在
map(KEYIN key, VALUEIN value, Context context)
:- 调用时机: 在
run
方法的while
循环中,每当context.nextKeyValue()
返回true
时被调用。框架会为输入分片(InputSplit)中的每一条记录调用一次此方法。 - 作用: 这是 Mapper 的心脏,用于实现核心的转换逻辑。
- 默认实现:
Mapper
提供了一个默认的map
实现:context.write((KEYOUT) key, (VALUEOUT) value);
。这是一个“恒等映射”,即将输入原封不动地作为输出。这在某些场景下很有用,比如当 MapReduce 作业只用于排序或数据格式转换时。
- 调用时机: 在
cleanup(Context context)
:- 调用时机: 在
run
方法的finally
块中,确保在任务(无论是正常完成还是异常退出)的最后阶段被调用一次。 - 作用: 执行清理工作,如关闭在
setup
中打开的文件句柄或网络连接,确保资源被正确释放。
- 调用时机: 在
run(Context context)
:- 作用: 这是驱动整个 Mapper 任务执行的模板方法。它定义了 Mapper 的标准生命周期:
setup -> map (循环) -> cleanup
。 - 执行流程:
- 调用
setup()
。 - 进入
try
块,循环调用context.nextKeyValue()
来读取新的键值对。 - 如果读取成功,则调用
map()
方法处理该键值对。 - 循环结束后(或发生异常时),
finally
块保证cleanup()
方法一定会被执行。
- 调用
- 高级定制: 如注释所说,高级用户可以重写
run
方法来实现更复杂的逻辑,例如启动多个线程来并行处理map
任务,以充分利用多核 CPU 资源。
- 作用: 这是驱动整个 Mapper 任务执行的模板方法。它定义了 Mapper 的标准生命周期:
Mapper.Context
的作用
Context
对象是 Mapper
与 Hadoop 框架之间沟通的桥梁。它是一个在 Mapper
类内部定义的抽象类,框架会为其提供一个具体的实现。Context
为 Mapper
提供了执行所需的所有上下文信息和功能。
// ... existing code ...
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
// ... existing code ...
它的主要功能包括:
- 获取输入:
context.getCurrentKey()
和context.getCurrentValue()
。 - 迭代控制:
context.nextKeyValue()
,用于在run
方法中驱动循环。 - 输出结果:
context.write(KEYOUT key, VALUEOUT value)
,将中间键值对写入缓冲区。 - 获取配置:
context.getConfiguration()
,可以访问作业的Configuration
对象,读取配置信息。 - 报告进度和状态:
context.setStatus()
和context.progress()
。 - 使用计数器:
context.getCounter()
,用于统计自定义的指标,方便调试和监控。
Mapper 与框架的交互
一个 Mapper
实例并非独立运行,而是作为整个 MapReduce 作业流的一部分,与框架的其他组件紧密协作:
- 输入: 框架通过
InputFormat
读取数据源,将其切分为多个InputSplit
。每个InputSplit
会被分配给一个 Map Task 来处理。 - 分区 (Partitioning):
Mapper
输出的中间键值对会经过Partitioner
。Partitioner
根据KEYOUT
决定这条记录应该发送给哪个Reducer
。默认的分区器是HashPartitioner
,它通过计算 key 的哈希值来分配。 - 排序 (Sorting): 在写入磁盘和传给 Reducer 之前,框架会对
Mapper
的输出在缓冲区内按KEYOUT
进行排序。 - 合并 (Combining): 用户可以指定一个
Combiner
。它在Mapper
端对排序后的输出进行一次本地的“预聚合”(mini-reduce)。这可以极大地减少从 Mapper 传输到 Reducer 的数据量,是 MapReduce 中一个非常重要的性能优化点。 - 零 Reducer: 如果作业设置的 Reducer 数量为 0,那么
Mapper
的输出将不会经过分区、排序和聚合,而是直接由OutputFormat
写入到最终的输出文件中。这种作业被称为 "Map-Only Job"。
Reducer
与 Mapper
类似,开发者通过继承 Reducer
抽象类并重写其方法来定义自己的业务逻辑。
// ... existing code ...
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
// ... existing code ...
Reducer
类同样有四个泛型参数,定义了其输入和输出的键值类型:
KEYIN
: 输入键的类型。这个类型必须与Mapper
输出的键类型KEYOUT
一致。VALUEIN
: 输入值的类型。这个类型必须与Mapper
输出的值类型VALUEOUT
一致。KEYOUT
: 最终输出结果的键类型。VALUEOUT
: 最终输出结果的值类型。
Javadoc 中提供了一个非常经典的 IntSumReducer
示例,用于对 Mapper
产生的数字进行求和。这在单词计数的场景中非常常见。
// ... existing code ...
* <p>Example:</p>
* <p><blockquote><pre>
* public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
* Key,IntWritable>; {
* private IntWritable result = new IntWritable();
*
* public void reduce(Key key, Iterable<IntWritable> values,
* Context context) throws IOException, InterruptedException {
* int sum = 0;
* for (IntWritable val : values) {
* sum += val.get();
* }
* result.set(sum);
* context.write(key, result);
* }
* }
* </pre></blockquote>
// ... existing code ...
在这个例子中:
IntSumReducer
继承了Reducer
。- 输入是
<Key, IntWritable>
,例如<"hello", 1>
。 - 输出是
<Key, IntWritable>
,例如<"hello", 100>
。 reduce
方法接收一个键(如 "hello")和与该键关联的所有值的集合(一个包含很多1
的Iterable
)。它遍历这个集合,将所有值累加起来,最后通过context.write()
输出最终的键和总和。
Reducer 的核心阶段
Reducer
的工作过程比 Mapper
要复杂一些,其 Javadoc 中明确地将其划分为三个主要阶段:Shuffle、Sort 和 Reduce。
a. Shuffle (混洗)
- 作用: 这是 Reduce 任务的第一个阶段。在此阶段,
Reducer
任务通过网络(HTTP)从所有已完成的Mapper
任务中拉取(copy)属于自己的那部分中间输出数据。 - 分区 (Partitioning):
Mapper
的输出在发送前会经过Partitioner
。Partitioner
会根据 key 计算出一个分区号,确保所有相同的 key 会被发送到同一个Reducer
任务。这就是为什么Reducer
能收到一个 key 对应的所有 values。
b. Sort (排序)
- 作用: 当
Reducer
任务拉取数据时,框架会在后台对这些数据进行归并排序(merge sort)。这个排序是根据键 (KEYIN
) 来进行的。 - 分组 (Grouping): 排序的最终目的是为了分组。排序完成后,所有具有相同 key 的 value 自然地聚集在一起,形成一个
<key, (list of values)>
的结构,这正是reduce
方法的输入形式。 - Shuffle 和 Sort 的并发: Javadoc 中提到,这两个阶段是同时进行的。也就是说,框架一边从 Mapper 拉取数据,一边就在内存和磁盘上进行归并排序,以提高效率。
c. Reduce (规约)
- 作用: 这是
Reducer
的核心阶段,也是用户定义业务逻辑的地方。 - 调用: 当 Shuffle 和 Sort 阶段完成后,框架会开始遍历排好序的键。对于每一个唯一的键及其关联的值列表,框架会调用一次用户实现的
reduce
方法。 - 输出: 在
reduce
方法中,用户通过Context.write(Object, Object)
将最终结果写入到输出文件系统(如 HDFS)。注意:Reducer
的输出是不会被再次排序的。
核心方法与执行流程
Reducer
的生命周期由其 run
方法控制,这个方法定义了一个标准的执行模板。
// ... existing code ...
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
// ... existing code ...
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
setup(Context context)
: 在任务开始时调用一次,用于初始化操作。reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
:- 调用时机: 在
run
方法的while
循环中,每当context.nextKey()
成功移动到下一个唯一的 key 时,该方法被调用一次。 - 参数:
key
是当前处理的键,values
是一个迭代器,包含了所有与该key
关联的值。 - 默认实现: 默认的
reduce
方法是一个恒等函数,它会遍历values
迭代器,并将每个<key, value>
对原样输出。
- 调用时机: 在
cleanup(Context context)
: 在任务结束时调用一次,用于资源清理。run(Context context)
:- 执行流程:
- 调用
setup()
。 - 进入
try
块,循环调用context.nextKey()
来遍历所有唯一的、排好序的键。 - 如果
nextKey()
返回true
,则调用reduce()
方法,并传入当前的键context.getCurrentKey()
和对应的值迭代器context.getValues()
。 - 循环结束后(或发生异常),
finally
块保证cleanup()
方法一定会被执行。
- 调用
- 执行流程:
Reducer.Context
的作用
Context
对象是 Reducer
与 Hadoop 框架沟通的桥梁,它提供了 Reducer
运行所需的所有上下文信息和功能。
- 迭代控制:
context.nextKey()
用于驱动run
方法的循环,移动到下一个键。 - 获取输入:
context.getCurrentKey()
获取当前键,context.getValues()
获取与当前键关联的值的Iterable
。 - 输出结果:
context.write(KEYOUT key, VALUEOUT value)
将最终结果写入输出。 - 其他功能: 与
Mapper.Context
类似,它也提供了获取配置 (getConfiguration
)、获取计数器 (getCounter
)、报告状态 (setStatus
) 等功能。
二次排序 (Secondary Sort)
Javadoc 中还提到了一个高级用法:二次排序。 当我们需要对同一个 key 对应的 values 列表也进行排序时,就需要用到二次排序。 实现方式是:
- 自定义组合键: 创建一个包含主键和次键的自定义
WritableComparable
类。 - 自定义排序比较器:
Job.setSortComparatorClass()
。让框架在排序阶段按照整个组合键(主键+次键)进行排序。 - 自定义分组比较器:
Job.setGroupingComparatorClass()
。这个比较器告诉框架,哪些键应该被分到同一个reduce
调用中。它只比较组合键中的主键部分。
这样,框架在排序时会考虑次键,但在调用 reduce
方法时,会将主键相同的所有记录(尽管次键不同)都分到同一次 reduce
调用中,而此时 values
迭代器中的值就是按照次键排好序的了。
Partitioner
Partitioner
(分区器)是 MapReduce 流程中一个非常关键的组件。它的核心职责是在 Map 阶段结束、Reduce 阶段开始之前,决定 Mapper 输出的每一个键值对(key-value pair)应该被发送到哪一个 Reducer 任务去处理。
Partitioner
是一个抽象类,我们通常必须通过继承它来创建自定义的分区逻辑。
我们来看一下它的定义:
// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
/**
* Get the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on a all or a subset of the key.</p>
*
* @param key the key to be partioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the <code>key</code>.
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
从代码中可以看出:
- 泛型: 它有两个泛型参数
<KEY, VALUE>
,这对应了Mapper
输出的键值类型。 - 核心抽象方法: 它只定义了一个核心的抽象方法
getPartition
。任何子类都必须实现这个方法。
getPartition
方法详解
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
key
:Mapper
输出的键。value
:Mapper
输出的值。numPartitions
: 分区的总数,这个值等于你为这个 Job 设置的Reducer
任务的数量 (job.setNumReduceTasks(int)
)。- 返回值: 一个整数,范围必须是
0
到numPartitions - 1
。这个返回值就是分区号,它直接决定了这条记录会被发送到哪个 Reducer(例如,返回0
就发送给第一个 Reducer,返回1
就发送给第二个,以此类推)。
注意: 正如 Javadoc 中提到的,只有当你设置的 Reducer 数量大于 1 时,Partitioner
才会被创建和使用。如果只有一个 Reducer 或者没有 Reducer,分区是没有意义的。
默认的 Partitioner: HashPartitioner
在你的工程中,如果你不通过 job.setPartitionerClass(...)
来指定一个自定义的 Partitioner
,Hadoop 会使用默认的 HashPartitioner
。从MapReduceTutorial.md
中也可以看到这一点。
HashPartitioner
的逻辑非常简单:
- 获取
key
的hashCode()
。 - 用一个很大的正数(
Integer.MAX_VALUE
)进行按位与操作,确保结果为正。 - 用
numPartitions
(Reducer 的数量) 取模。
// HashPartitioner 的核心逻辑伪代码
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
这种默认方式在大多数情况下能很好地工作,它可以相对均匀地将键分散到不同的 Reducer 中,实现负载均衡。
为什么要自定义 Partitioner?
既然有默认的实现,为什么我们还需要自定义呢?主要有以下几个原因:
数据倾斜 (Data Skew): 默认的
hashCode
方法可能无法均匀地分布你的特定数据集。例如,某些 key 的哈希值可能恰好都聚集在少数几个结果上,导致少数 Reducer 任务过重,而其他 Reducer 很空闲,拖慢整个作业的执行效率。通过自定义分区逻辑,可以根据数据特点进行更均匀的分配。业务逻辑要求: 有些业务场景要求具有相同特征的 key 被发送到同一个 Reducer。例如,假设你正在处理订单数据,你可能希望所有来自同一个省份的订单都由同一个 Reducer 处理。这时,你可以自定义一个
Partitioner
,它不根据整个 key(可能是订单 ID)来分区,而是根据 key 对象中的“省份”字段来分区。
如何实现自定义 Partitioner
TeraSort.java
提供了一个很好的例子:
// ... existing code ...
/**
* A total order partitioner that assigns keys based on their first
* PREFIX_LENGTH bytes, assuming a flat distribution.
*/
public static class SimplePartitioner extends Partitioner<Text, Text>
implements Configurable {
int prefixesPerReduce;
private static final int PREFIX_LENGTH = 3;
private Configuration conf = null;
public void setConf(Configuration conf) {
this.conf = conf;
prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) /
(float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));
}
// ... existing code ...
这个 SimplePartitioner
继承了 Partitioner
,并且还实现了 Configurable
接口。实现 Configurable
接口是为了让 Partitioner 能够获取到 Job 的 Configuration
对象,从而读取一些配置信息(比如这里的 NUM_REDUCES
)。
高级 Partitioner: TotalOrderPartitioner
还存在一个更高级的实现:TotalOrderPartitioner
。它用于实现全局排序。普通的 MapReduce 作业只能保证 Reducer 的输出在各自的文件内部是有序的,但 Reducer 0 的输出和 Reducer 1 的输出之间没有顺序关系。
TotalOrderPartitioner
通过读取一个预先生成的分区文件(包含了 key 的分割点),来保证所有被发送到 Reducer i
的 key 都小于被发送到 Reducer i+1
的 key。这样,当所有 Reducer 完成后,将它们的输出文件按顺序拼接起来,就得到了一个全局有序的大文件。
总结
Partitioner
是连接 Map 和 Reduce 阶段的桥梁,负责数据分发。- 它是一个抽象类,你必须继承它并实现
getPartition
方法来创建自定义分区器。 - Hadoop 默认使用
HashPartitioner
,它基于 key 的哈希值进行分区,适用于大多数场景。 - 当需要解决数据倾斜或根据业务逻辑对数据进行分组时,就需要自定义
Partitioner
。 - 更高级的
TotalOrderPartitioner
可以用来实现输出结果的全局排序。