WordCount案例分析
给定一个路径,统计这个路径下所有的文件中的每一个单词的出现次数。
其中,需要我们去实现代码的部分是:map函数和reduce函数。它们各自的作用是:
map函数的入参是kv结构,k是偏移量,v是一行的具体内容。map函数的返回值格式也是kv结构,k是每个单词,v是数字1。
uce函数的入参是kv结构,k是单词,v是集合,每个元素值都是1。reduce函数的返回值格式也是kv结构,k是每个单词,v是汇总之后的数字。
WordCount案例实操-编码实现
准备maven工程,具体要求和之前的一致。具体操作如下:
1.新建一个空白项目
2.配置maven
3.创建三个类。
每个类的具体内容如下。
1.编写Mapper类
核心要点是:
- 继承Mapper类。约定泛型<keyIn,ValueIn,KeyOut,ValueOut>
- 重写map方法(keyIn, ValueIn,Content<KeyOut, Key>)
我们来看下代码。
package com.example.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// 继承Mapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行数据,用空格拆分为一个个单词
String[] words = value.toString().split(" ");
// 遍历单词,设置键值对,值为1
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}
代码说明:
- LongWritable是固定写法。它表示读取到一行的偏移量。
- LongWritable, Text是hadoop的数据类型。
编写Reducer类
核心要点是:
- 继承Reducer类。约定泛型<keyIn,ValueIn,KeyOut,ValueOut>
- 重写reduce方法(keyIn, Iterable<ValueIn>,Content<KeyOut, Key>)
代码如下:
package com.example.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// 继承 reducer类
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
// 对 values中的值进行累加求和
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
// 输出结果
context.write(key, new LongWritable(sum));
}
}
请注意Mapper的来源,它是mapreduce.Mapper,而不是mapred.Mapper。后者是hadoop的老版本用法。
编写Driver驱动类
Driver类负责提交job。它的核心代码有7个步骤,属于固定写法。这七个步骤分别如下:
- 获取job对象
- 关联本地Driver类的jar
- 关联map和reduce
- 设置map的输出kv类型
- 设置reduce的输出kv类型
- 设置输入数据和输出结果的地址
- 提交job。
下面我们一起来编写这份代码。
参考代码如下。
package com.example.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
// mapreduce的Driver
// 提交job
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 关联本地的jar
job.setJarByClass(WordCountDriver.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. 设置Mapper输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 5. 设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 6. 设置输入和输出路径。请确保wcinput是存在的,并且下面有待统计词频的单词文件。
// output1会自动被创建,如果它已经存在,程序会报错!
FileInputFormat.setInputPaths(job, new Path("D://vm//wcinput"));
FileOutputFormat.setOutputPath(job, new Path("D://vm//output1"));
// 7. 提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
WordCount代码运行
在本地运行代码,在本地查看效果。此时要注意,我们的程序并没有使用集群中的资源,在yarn中看不到运行的任务,我们也没有把结果保存在hdfs中。