I. YARN
1. Concepts
Hadoop has three core components: HDFS, MapReduce, and YARN.
YARN is essentially something like an operating system for the cluster.
YARN is the environment in which MapReduce runs: it manages the resources a program needs (memory, CPU, bandwidth, and so on).
YARN was born inside Hadoop, but it has since become a standalone resource-management system in its own right.
2. The components of YARN
YARN is made up of two main daemons, plus two related concepts:
1. ResourceManager (the "boss"): one per cluster.
It manages the entire YARN platform and contains the resource scheduler.
2. NodeManager (the supervisor on each machine): many.
It takes orders from the ResourceManager and acts as the manager of its own machine.
3. Container
Each NodeManager hosts one or more containers. A container is an encapsulation of resources (CPU, memory, disk, etc.), conceptually similar to a small virtual machine.
4. AppMaster (the "project manager")
Every MapReduce job that is submitted gets its own AppMaster, which is responsible for driving the execution of that whole job.
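As an illustration of how these pieces fit together: the ResourceManager keeps track of every NodeManager and every running application, and this can be queried through YARN's Java client API. The following is only a minimal sketch, not part of the course setup; it assumes the Hadoop YARN client jars are on the classpath, and the package and class names are placeholders.
package com.bigdata.yarn;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
public class YarnInfo {
    public static void main(String[] args) throws Exception {
        // point the client at the ResourceManager configured in yarn-site.xml below
        YarnConfiguration conf = new YarnConfiguration();
        conf.set("yarn.resourcemanager.hostname", "bigdata01");
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();
        // the ResourceManager knows about every NodeManager in the cluster
        for (NodeReport node : yarnClient.getNodeReports()) {
            System.out.println(node.getNodeId() + " capability=" + node.getCapability());
        }
        // ... and about every application (each one driven by its own AppMaster)
        for (ApplicationReport app : yarnClient.getApplications()) {
            System.out.println(app.getApplicationId() + " " + app.getName() + " " + app.getYarnApplicationState());
        }
        yarnClient.stop();
    }
}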
3. How to configure and set up YARN
In the /opt/installs/hadoop/etc/hadoop directory, edit mapred-site.xml:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
This sets the execution framework for MapReduce to yarn.
Next, edit yarn-site.xml:
<!-- Run the ResourceManager on the first server -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>bigdata01</value>
</property>
<!-- Configure YARN's auxiliary shuffle service -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
Check that hadoop-env.sh contains the user settings:
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
Continue configuring: to avoid AppMaster classpath errors, add the following property to yarn-site.xml (the value is the Hadoop classpath; it can also be printed with the hadoop classpath command):
<property>
<name>yarn.application.classpath</name>
<value>/opt/installs/hadoop/etc/hadoop:/opt/installs/hadoop/share/hadoop/common/lib/*:/opt/installs/hadoop/share/hadoop/common/*:/opt/installs/hadoop/share/hadoop/hdfs:/opt/installs/hadoop/share/hadoop/hdfs/lib/*:/opt/installs/hadoop/share/hadoop/hdfs/*:/opt/installs/hadoop/share/hadoop/mapreduce/*:/opt/installs/hadoop/share/hadoop/yarn:/opt/installs/hadoop/share/hadoop/yarn/lib/*:/opt/installs/hadoop/share/hadoop/yarn/*</value>
</property>
Distribute mapred-site.xml and yarn-site.xml to the other two machines:
cd /opt/installs/hadoop/etc/hadoop/
xsync.sh mapred-site.xml yarn-site.xml
Start and stop the YARN platform:
Start: start-yarn.sh
Stop: stop-yarn.sh
You can also check the web UI:
http://192.168.233.128:8088
4. Commands for starting and stopping services
| Service | Command |
| --- | --- |
| Start / stop HDFS | start-dfs.sh / stop-dfs.sh |
| Start / stop YARN | start-yarn.sh / stop-yarn.sh |
| Start / stop the whole cluster (HDFS and YARN) | start-all.sh / stop-all.sh (start-all.sh = start-dfs.sh + start-yarn.sh; stop-all.sh = stop-dfs.sh + stop-yarn.sh) |
| Start / stop only the ResourceManager | yarn --daemon start resourcemanager / yarn --daemon stop resourcemanager |
| Start / stop only a NodeManager | yarn --daemon start nodemanager / yarn --daemon stop nodemanager |
Commands for starting HDFS daemons individually:
namenode: hdfs --daemon start namenode
datanode: hdfs --daemon start datanode   (this only starts the DataNode on the machine where the command is run)
5. Running WordCount on the YARN platform
Upload a wc.txt file to HDFS, then run the computation on YARN. Sample contents of wc.txt:
hadoop spark hello hadoop
spark hello flink world
scala python python scala
Run the WordCount example that ships with Hadoop:
hadoop jar /opt/installs/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.1.jar wordcount /home/wc.txt /home/output
You can also watch the job running on the YARN web UI.
This example runs on YARN: the input data must be on HDFS and YARN must be running. Although the code sits locally, it is uploaded to HDFS as part of job submission.
Synchronize the time across the cluster:
xcall.sh ntpdate time1.aliyun.com
II. The three development/run modes of a MapReduce job
1. Local mode
Data is local, code is local, and the job runs with your own machine's resources.
Both the input and output paths are local paths, and the resources consumed are local as well.
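No special settings are needed for this mode; a minimal sketch of the corresponding Configuration (the same settings used by the exercise drivers later in this document) looks like this:
Configuration configuration = new Configuration();
// use the local file system instead of HDFS
configuration.set("fs.defaultFS", "file:///");
// use local resources (CPU, memory, etc.) via the local runner instead of YARN
configuration.set("mapreduce.framework.name", "local");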
2. Local mode 2
Data is on HDFS, the code is local, and the job still runs with your local machine's resources.
System.setProperty("HADOOP_USER_NAME","root");
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
// use local resources (CPU, memory, etc.); the job could also be run on YARN instead
configuration.set("mapreduce.framework.name","local");
Here the input and output paths refer to paths on HDFS.
3. YARN mode
Data is on HDFS and the code runs on YARN.
System.setProperty("HADOOP_USER_NAME","root");
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
configuration.set("mapreduce.framework.name","yarn");
// enable cross-platform job submission
configuration.set("mapreduce.app-submission.cross-platform", "true");
Example: run your own WordCount on YARN.
Modify the code as follows:
package com.bigdata.day12.workcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver2 {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME","root");
Configuration configuration = new Configuration();
// use HDFS as the default file system
configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
// run the job on the YARN platform rather than with local resources
configuration.set("mapreduce.framework.name","yarn");
// to let a client running on Windows submit MapTasks and ReduceTasks to a Linux cluster without conflicts, some MapReduce defaults must be changed
configuration.set("mapreduce.app-submission.cross-platform", "true");
Job job = Job.getInstance(configuration, "wordCount on yarn");
// map task settings
// do not forget this line
job.setJarByClass(WordCountDriver2.class);
job.setMapperClass(WordCountMapper2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// specify which partitioner class to use
job.setPartitionerClass(WordCountPartitioner2.class);
// also set the number of reducers, because each reducer produces one result file
// reduce task settings
job.setReducerClass(WordCountReducer2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// set the input path of the data to be counted and the output path for the results
FileInputFormat.setInputPaths(job,new Path(args[0]));
// the output folder must not exist in advance, otherwise the job fails
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// wait for the job to finish
boolean b = job.waitForCompletion(true);
// be sure to exit the JVM here with an appropriate status code
System.exit(b ? 0:-1);
}
}
Package the program into a jar.
Upload the built jar to the Linux server and run it:
hadoop jar WC.jar com.bigdata.day12.workcount.WordCountDriver2 /input /oottpp2
It is best to keep the jar name short.
com.bigdata.day12.workcount.WordCountDriver2 is the fully qualified name of the class containing the main method.
/input is the input path on HDFS.
/oottpp2 is the output path on HDFS for the results.
If the HDFS web UI reports errors when browsing files:
add the following to hdfs-site.xml and restart the cluster:
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
Then add hostname mappings to the hosts file of the machine the browser runs on.
On Windows, append the mappings for every node in the Hadoop cluster (IP address and hostname) to the end of C:\Windows\System32\drivers\etc\hosts.
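For example, if the ResourceManager node bigdata01 is the machine reachable at 192.168.233.128 (the address used for the web UI above), the entry would be a line of the form 192.168.233.128 bigdata01; substitute the real IP addresses and hostnames of your own nodes.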
III. How to pass arguments to the main method
Learning driven by questions:
Once you join a company, how do you keep learning on your own? Learn whatever technology your company is actually using; don't chase every huge new technology.
The first way:
The second way:
package com.bigdata;
public class TestMain {
/**
* args is a String array; who can assign values to it?
* @param args
*/
public static void main(String[] args) {
System.out.println("参数打印开始");
for (String str:args) {
System.out.println(str);
}
System.out.println("参数打印完毕");
}
}
Think about it: if this code is not running inside IDEA, how do you pass arguments?
Any Java code can be packaged into a jar. How do you run a class that lives inside a jar?
If the jar's manifest names a Main-Class, run it with java -jar and put the arguments straight after the jar: java -jar xxxx.jar 10 20 30
Otherwise put the jar on the classpath and name the class explicitly, with the arguments after the class: java -cp xxxx.jar <fully qualified class name> 10 20 30
For example, java -cp hello.jar com.bigdata.TestMain 10 20 30 prints 10, 20 and 30 between the start and end markers.
IV. Exercises
1. Data cleaning
# What data cleaning is
The raw data files a big-data platform receives usually contain a large number of invalid and incomplete records. They need to be cleaned right away so that the content and format match what downstream processing expects.
# Requirement
From the raw mobile-traffic data, remove the records whose phone number is "null" and the records that are incomplete.
Data format:
# Source data
id   phone number   device MAC   IP address   up packets   down packets   upload traffic   download traffic   HTTP status code
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157995052 13826544109 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0
1363157995052 null 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 240 0 200
1363157991076 13926435659 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 null null null
# Expected result: drop the records whose phone number is invalid or whose upload/download traffic is missing, keeping only the phone number, upload traffic and download traffic.
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
Coding hints:
# Key point:
The reduce phase can be removed from the MapReduce pipeline entirely; the map output is then written directly to the result files on HDFS.
# Implementation
1. Remove the reducer-related settings from the job: the reducer class and its output key/value types.
2. Manually set the number of reduce tasks to 0:
job.setNumReduceTasks(0); // disable the reducer
package com.bigdata.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
class ETLMapper extends Mapper<LongWritable,Text, Text, NullWritable>{
Text text = null;
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
// skip the header line (byte offset 0)
long num = key.get();
if(num == 0){
return;
}
String line = value.toString();
String[] arr = line.split("\\s+");
// a valid record has 9 fields and a well-formed phone number
// drop records whose phone number is invalid or whose upload/download traffic is missing; keep only phone number, upload traffic, download traffic
String phone = arr[1]; // may be the literal string "null"
if(arr.length == 9 && phone.matches("1[3-9][0-9]{9}") ){
String upFlow = arr[arr.length-3];
String downFlow = arr[arr.length-2];
if(upFlow.equals("null") || downFlow.equals("null")){
return ;
}
String msg = phone +"\t" + upFlow + "\t" + downFlow;
text.set(msg);
context.write(text,NullWritable.get());
}
}
}
public class CleanDriver {
public static void main(String[] args) throws Exception{
// the following is mostly boilerplate
Configuration conf = new Configuration();
// use the local file system instead of HDFS
conf.set("fs.defaultFS","file:///");
// use local resources (CPU, memory, etc.); the job could also be run on YARN
conf.set("mapreduce.framework.name","local");
Job job = Job.getInstance(conf, "data cleaning");
job.setMapperClass(ETLMapper.class);
// output key/value types of the map phase
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// set the number of reduce tasks to 0
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job,new Path("./clean/input"));
// if the output folder already exists the job fails, which is annoying during repeated runs
// so check whether it exists and delete it if it does
Path path = new Path("./clean/output");
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
System.exit(b?0:-1);
}
}
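Because the number of reduce tasks is 0, this is a map-only job: the mapper's output is written directly to part-m-xxxxx files in the ./clean/output folder.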
2. Custom partitioner example
Case study: analyzing student scores
# Sort the scores within each subject in descending order, and write each subject's scores to a separate output.
The data is as follows:
# Custom Partitioner
Partition the following data:
name   subject   score
张三 语文 10
李四 数学 30
王五 语文 20
赵6 英语 40
张三 数学 50
李四 语文 10
张三 英语 70
李四 英语 80
王五 英语 45
王五 数学 10
赵6 数学 10
赵6 语文 100
package com.bigdata.stu;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class StuWritable implements WritableComparable<StuWritable> {
private String name;
private String subject;
private int score;
public StuWritable() {
}
public StuWritable(String name, String subject, int score) {
this.name = name;
this.subject = subject;
this.score = score;
}
@Override
public String toString() {
return "StuWritable{" +
"name='" + name + '\'' +
", subject='" + subject + '\'' +
", score=" + score +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeUTF(subject);
out.writeInt(score);
}
@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
subject = in.readUTF();
score = in.readInt();
}
@Override
public int compareTo(StuWritable o) {
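// descending order: the record with the larger score sorts first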
return o.score - this.score;
}
}
package com.bigdata.stu;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class StuPartitioner extends Partitioner<Text, StuWritable> {
@Override
public int getPartition(Text text, StuWritable stuWritable, int i) {
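// text holds the subject: 语文 goes to partition 0, 数学 to partition 1, everything else (英语) to partition 2, matching the 3 reduce tasks set in the driver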
String subject = text.toString();
if(subject.equals("语文")){
return 0;
}else if(subject.equals("数学")){
return 1;
}
return 2;
}
}
package com.bigdata.stu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
class StuMapper extends Mapper<LongWritable, Text, Text, StuWritable> {
Text text = null;
StuWritable stu = new StuWritable();
@Override
protected void setup(Mapper<LongWritable, Text, Text, StuWritable>.Context context) throws IOException, InterruptedException {
text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, StuWritable>.Context context) throws IOException, InterruptedException {
// skip the header row
if(key.get() == 0){
return;
}
String line = value.toString();
String[] arr = line.split("\\s+");
String subject = arr[1];
text.set(subject);
stu.setName(arr[0]);
stu.setSubject(subject);
stu.setScore(Integer.valueOf(arr[2]));
context.write(text,stu);
}
}
class StuReducer extends Reducer<Text, StuWritable, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<StuWritable> values, Reducer<Text, StuWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
ArrayList<StuWritable> stuList = new ArrayList<>();
for (StuWritable stu : values) {
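// Hadoop reuses the same StuWritable instance while iterating over the values, so copy each record before caching it in the list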
StuWritable newStu = new StuWritable(stu.getName(),stu.getSubject(),stu.getScore());
stuList.add(newStu);
}
// Collections.sort can be called here because the list elements know how to order themselves: StuWritable implements Comparable (via WritableComparable)
Collections.sort(stuList);
for (StuWritable stuWritable : stuList) {
String msg = stuWritable.getName()+"\t"+stuWritable.getSubject() +"\t" + stuWritable.getScore();
context.write(NullWritable.get(),new Text(msg));
}
}
}
public class StuDriver {
public static void main(String[] args) throws Exception {
// the following is mostly boilerplate
Configuration conf = new Configuration();
// use the local file system instead of HDFS
conf.set("fs.defaultFS","file:///");
// use local resources (CPU, memory, etc.); the job could also be run on YARN
conf.set("mapreduce.framework.name","local");
Job job = Job.getInstance(conf, "score ranking");
job.setMapperClass(StuMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(StuWritable.class);
job.setPartitionerClass(StuPartitioner.class);
job.setReducerClass(StuReducer.class);
// output key/value types of the reduce phase
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(3);
FileInputFormat.setInputPaths(job,new Path("./student/input"));
// if the output folder already exists the job fails, which is annoying during repeated runs
// so check whether it exists and delete it if it does
Path path = new Path("./student/output");
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
System.exit(b?0:-1);
}
}
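Since the driver sets 3 reduce tasks and the partitioner routes each subject to its own partition, the job produces three output files (part-r-00000, part-r-00001, part-r-00002), one per subject, each sorted in descending order of score.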
Supplement: the difference between the Comparable and Comparator interfaces.
Comparable is implemented by the class itself (compareTo defines its natural ordering), while a Comparator is a separate object passed to the sort, so it can impose an ordering without modifying the class.
First, a Comparator example:
package com.bigdata.java;
public class Student {
private String name;
private int age;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public Student() {
}
public Student(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
package com.bigdata.java;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class TestMain {
public static void main(String[] args) {
List<Student> list = new ArrayList<Student>();
list.add(new Student("zs",19));
list.add(new Student("ls",17));
list.add(new Student("ww",21));
list.add(new Student("zl",38));
list.add(new Student("qq",16));
list.add(new Student("z8",30));
// sort the elements of the list by age
Collections.sort(list, new Comparator<Student>() {
@Override
public int compare(Student o1, Student o2) {
return o1.getAge() - o2.getAge();
}
});
System.out.println(list);
}
}
Second, the Comparable interface:
package com.bigdata.java;
public class Student2 implements Comparable<Student2>{
private String name;
private int age;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public Student2() {
}
public Student2(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public int compareTo(Student2 o) {
return this.age - o.age;
}
}
package com.bigdata.java;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class TestMain2 {
public static void main(String[] args) {
List<Student2> list = new ArrayList<Student2>();
list.add(new Student2("zs",19));
list.add(new Student2("ls",17));
list.add(new Student2("ww",21));
list.add(new Student2("zl",38));
list.add(new Student2("qq",16));
list.add(new Student2("z8",30));
// sort the elements of the list by age (using Student2's natural ordering)
Collections.sort(list);
System.out.println(list);
}
}
3. Sorting
Sort the streamer data by viewer count in descending order; when viewer counts are equal, sort by streaming duration in descending order.
Table: live
The equivalent SQL: select * from live order by people_num desc, time_num desc;
# Sample data
user id   viewer count   streaming duration
团团 300 1000
小黑 200 2000
哦吼 400 7000
卢本伟 100 6000
八戒 250 5000
悟空 100 4000
唐僧 100 3000
# Expected result
哦吼 400 7000
团团 300 1000
八戒 250 5000
小黑 200 2000
卢本伟 100 6000
悟空 100 4000
唐僧 100 3000
Key code:
package com.bigdata.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
class SortMapper extends Mapper<LongWritable, Text, NullWritable,Live > {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable,Live>.Context context) throws IOException, InterruptedException {
String line = value.toString();
if(key.get() == 0){
return ;
}
String[] arr = line.split("\\s+");
String name = arr[0];
int peopleNum = Integer.parseInt(arr[1]);
int liveTime = Integer.parseInt(arr[2]);
Live live = new Live(name,peopleNum,liveTime);
context.write(NullWritable.get(),live);
}
}
class SortReducer extends Reducer<NullWritable,Live, NullWritable, Text> {
@Override
protected void reduce(NullWritable key, Iterable<Live> values, Reducer<NullWritable,Live, NullWritable, Text>.Context context) throws IOException, InterruptedException {
ArrayList<Live> stuList = new ArrayList<>();
for (Live live : values) {
stuList.add(new Live(live.getName(),live.getPeopleNum(),live.getLiveTime()));
}
Collections.sort(stuList, new Comparator<Live>() {
@Override
public int compare(Live o1, Live o2) {
if(o1.getPeopleNum() == o2.getPeopleNum()){
return o2.getLiveTime() - o1.getLiveTime();
}
return o2.getPeopleNum() - o1.getPeopleNum();
}
});
for (Live live : stuList) {
context.write(NullWritable.get(),new Text(live.getName()+" "+live.getPeopleNum()+" "+live.getLiveTime()));
}
}
}
public class SortDriver {
public static void main(String[] args) throws Exception {
// the following is mostly boilerplate
Configuration conf = new Configuration();
// use the local file system instead of HDFS
conf.set("fs.defaultFS","file:///");
// use local resources (CPU, memory, etc.); the job could also be run on YARN
conf.set("mapreduce.framework.name","local");
Job job = Job.getInstance(conf, "live data ranking");
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Live.class);
job.setReducerClass(SortReducer.class);
// output key/value types of the reduce phase
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path("./sort/input"));
// if the output folder already exists the job fails, which is annoying during repeated runs
// so check whether it exists and delete it if it does
Path path = new Path("./sort/output");
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
System.exit(b?0:-1);
}
}
package com.bigdata.sort;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Live implements Writable {
private String name;
private int peopleNum;
private int liveTime;
@Override
public String toString() {
return "Live{" +
"name='" + name + '\'' +
", peopleNum=" + peopleNum +
", liveTime=" + liveTime +
'}';
}
public Live() {
}
public Live(String name, int peopleNum, int liveTime) {
this.name = name;
this.peopleNum = peopleNum;
this.liveTime = liveTime;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPeopleNum() {
return peopleNum;
}
public void setPeopleNum(int peopleNum) {
this.peopleNum = peopleNum;
}
public int getLiveTime() {
return liveTime;
}
public void setLiveTime(int liveTime) {
this.liveTime = liveTime;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(peopleNum);
out.writeInt(liveTime);
}
@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
peopleNum = in.readInt();
liveTime = in.readInt();
}
}
The second approach: make the custom data type the key
package com.bigdata.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
class SortMapper2 extends Mapper<LongWritable, Text, Live2,NullWritable > {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Live2,NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
if(key.get() == 0){
return ;
}
String[] arr = line.split("\\s+");
String name = arr[0];
int peopleNum = Integer.parseInt(arr[1]);
int liveTime = Integer.parseInt(arr[2]);
Live2 live = new Live2(name,peopleNum,liveTime);
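// the Live2 object is the map output key; during the shuffle the keys are sorted using Live2's compareTo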
context.write(live,NullWritable.get());
}
}
public class SortDriver2 {
public static void main(String[] args) throws Exception {
// the following is mostly boilerplate
Configuration conf = new Configuration();
// use the local file system instead of HDFS
conf.set("fs.defaultFS","file:///");
// use local resources (CPU, memory, etc.); the job could also be run on YARN
conf.set("mapreduce.framework.name","local");
Job job = Job.getInstance(conf, "sorting");
job.setMapperClass(SortMapper2.class);
job.setMapOutputKeyClass(Live2.class);
job.setMapOutputValueClass(NullWritable.class);
// Pitfall: do NOT set the number of reduce tasks to 0 here. The map output keys are only sorted during the shuffle when a reduce phase exists; with 0 reducers the keys are not sorted at all.
//job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job,new Path("./sort/input"));
// if the output folder already exists the job fails, which is annoying during repeated runs
// so check whether it exists and delete it if it does
Path path = new Path("./sort/output");
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
System.exit(b?0:-1);
}
}
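The Live2 class used by SortMapper2 and SortDriver2 is not shown above. A minimal sketch of what it might look like, assuming it mirrors the Live class but also implements WritableComparable so that the shuffle can sort the keys (viewer count descending, ties broken by streaming duration descending):
package com.bigdata.sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Live2 implements WritableComparable<Live2> {
    private String name;
    private int peopleNum;
    private int liveTime;
    public Live2() {
    }
    public Live2(String name, int peopleNum, int liveTime) {
        this.name = name;
        this.peopleNum = peopleNum;
        this.liveTime = liveTime;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(peopleNum);
        out.writeInt(liveTime);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        peopleNum = in.readInt();
        liveTime = in.readInt();
    }
    @Override
    public int compareTo(Live2 o) {
        // viewer count descending; ties broken by streaming duration descending
        if (this.peopleNum != o.peopleNum) {
            return o.peopleNum - this.peopleNum;
        }
        return o.liveTime - this.liveTime;
    }
    @Override
    public String toString() {
        // with the default (identity) reducer and NullWritable values, the output file contains key.toString()
        return name + " " + peopleNum + " " + liveTime;
    }
}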