Hadoop其六,yarn,MapReduce和main传参

发布于:2024-12-18 ⋅ 阅读:(56) ⋅ 点赞:(0)

目录

一、Yarn

1、概念

2、Yarn的组成部分

3、Yarn如何进行配置和搭建

二、MapReduce任务有三种运行开发模式

1、local模式

2、local模式2

3、Yarn模式

三、Main方法如何传参

四、练习

1、数据清洗

2、自定义分区案例

3、排序


一、Yarn

1、概念

Hadoop三大件:HDFS、MapReduce、Yarn

Yarn其实就是一个类似于操作系统一样的东西。

Yarn是MapReduce运行的环境,Yarn可以管理程序运行所需要的东西(内存,CPU,带宽等资源)

Yarn诞生于Hadoop,但是现在已经脱离了Hadoop,变成了一个独立的软件,系统。

2、Yarn的组成部分

上面那个图是用在线作图工具做的:

Visio 、MindManager


ProcessOn思维导图流程图-在线画思维导图流程图_在线作图实时协作

我们的Yarn,其实有两大部分组成:

1、ResourceManager (BOSS):  1个
 他用来管理整个的Yarn平台,里面有一个资源调度器。
2、NodeManager (各个机器上的主管)  多个
   听从我们的ResouceManager的调遣。是每一台电脑的管家。
3、Container(容器)
   每一个NodeManager中,有一个或者多个这样的容器。是包含了一些资源的封装(CPU,内存,硬盘等),类似于我们熟悉的虚拟机。
4、AppMaster (项目经理)
   每一个MapReduce任务启动提交后,会有一个对应的AppMaster。这个主要作用是负责整个job任务的运行。

3、Yarn如何进行配置和搭建

/opt/installs/hadoop/etc/hadoop 文件夹下:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

指定mapreduce运行平台为yarn
<!--指定resourceManager启动的主机为第一台服务器-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>bigdata01</value>
    </property>


    <!--配置yarn的shuffle服务-->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value> 
    </property>

检查hadoop-env.sh 中是否配置了权限:

export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

继续配置:为了防止报AppMaster的错误,需要如下配置

<property>
        <name>yarn.application.classpath</name>
        <value>/opt/installs/hadoop/etc/hadoop:/opt/installs/hadoop/share/hadoop/common/lib/*:/opt/installs/hadoop/share/hadoop/common/*:/opt/installs/hadoop/share/hadoop/hdfs:/opt/installs/hadoop/share/hadoop/hdfs/lib/*:/opt/installs/hadoop/share/hadoop/hdfs/*:/opt/installs/hadoop/share/hadoop/mapreduce/*:/opt/installs/hadoop/share/hadoop/yarn:/opt/installs/hadoop/share/hadoop/yarn/lib/*:/opt/installs/hadoop/share/hadoop/yarn/*</value> 
    </property>

分发mapred-site.xml & yarn-site.xml 到另外两台电脑上。

cd /opt/installs/hadoop/etc/hadoop/

xsync.sh mapred-site.xml yarn-site.xml

启动和停止yarn平台:

启动: start-yarn.sh
停止: stop-yarn.sh

也可以使用web访问一下:

http://192.168.233.128:8088

4、关于启动和停止的命令

服务

命令

启动,停止hdfs

start-dfs.sh stop-dfs.sh

启动,停止yarn

start-yarn.sh stop-yarn.sh

停止和启动整个集群(包含hdfs,以及Yarn)

start-all.sh stop-all.sh

start-all.sh ==(start-dfs.sh start-yarn.sh )

stop-all.sh == (stop-dfs.sh stop-yarn.sh )

只启动resourcemanager

停止resourcemanager

yarn --daemon start resourcemanger

yarn --daemon stop resourcemanger

只启动nodemanager

停止nodemanager

yarn --daemon start nodemanger

yarn --daemon stop nodemanger

hdfs 中单独启动的命令:

namenode : hdfs --daemon start namenode

datanode : hdfs --daemon start datanode [ 这个命令只能启动一个 datanode]

5、使用yarn平台进行wordCount计算

将一个wc.txt 上传至hdfs平台,然后通过yarn平台进行计算

hadoop spark hello hadoop
spark hello flink world
scala python python scala

运行hadoop自带的workCount:

hadoop jar /opt/installs/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.1.jar wordcount /home/wc.txt /home/output

我们还可以在Yarn平台上查看运行的情况:

以上这个案例使用的就是yarn运行,数据必须在hdfs上,yarn也必须启动。代码虽然在本地,但是也会上传到hdfs的。
时间同步:
xcall.sh ntpdate time1.aliyun.com

二、MapReduce任务有三种运行开发模式

1、local模式

数据在本地,代码也在本地,使用本机的电脑的资源运行我们的MR

输入和输出路径指的都是本地路径,运行时耗费的资源也是本地资源。

2、local模式2

数据在hdfs上,代码在本地,使用本机的电脑的资源运行我们的MR

        System.setProperty("HADOOP_USER_NAME","root");    
        Configuration configuration = new Configuration();
        
        configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        configuration.set("mapreduce.framework.name","local");

这个里面的输入和输出路径指的是hdfs上的路径。

3、Yarn模式

数据在hdfs上,代码 跑 在yarn上。

        System.setProperty("HADOOP_USER_NAME","root");    
        Configuration configuration = new Configuration();
        
        configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
        
        configuration.set("mapreduce.framework.name","yarn");

        // 跨平台任务提交打开
        configuration.set("mapreduce.app-submission.cross-platform", "true");	

案例:使用Yarn运行自己编写的WordCount:

修改代码如下:

package com.bigdata.day12.workcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountDriver2 {
    public static void main(String[] args) throws Exception {

        System.setProperty("HADOOP_USER_NAME","root");
        Configuration configuration = new Configuration();
        // 使用本地的文件系统,而不是hdfs
        configuration.set("fs.defaultFS","hdfs://192.168.32.128:9820");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        configuration.set("mapreduce.framework.name","yarn");
        //要解决客户端在windows上运行,同时将MapTask和ReduceTask提交到linux集群上运行产生的冲突问题,就要修改下mapReduce的一些默认配置
        configuration.set("mapreduce.app-submission.cross-platform", "true");
        Job job = Job.getInstance(configuration, "在yarn上运行workCount");
        // map任务的设置

        // 这句话不要忘记添加
        job.setJarByClass(WordCountDriver2.class);

        job.setMapperClass(WordCountMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 指定分区的类是哪一个
        job.setPartitionerClass(WordCountPartitioner2.class);
        // 还要执行 reduce的数量  因为一个reduce 就会产生一个结果文件
     

        // reduce任务的设置
        job.setReducerClass(WordCountReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置要统计的数据的路径,结果输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        // ouput文件夹一定不要出现,否则会报错
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        // 等待任务执行结束
        boolean b = job.waitForCompletion(true);
        // 此处是一定要退出虚拟机的
        System.exit(b ? 0:-1);
    }
}

将程序达成jar包:

将我们的大好的Jar包上传至linux服务器,运行该jar包:

hadoop jar WC.jar com.bigdata.day12.workcount.WordCountDriver2 /input /oottpp2

jar包的名字最好短一点
com.bigdata.day12.workcount.WordCountDriver2   这个是Main方法所在的类的全路径
/input   hdfs上文件的路径
/oottpp2  hdfs上数据的统计的输出路径

如果出现web端查看错误:

记得dfs-site.xml中添加如下配置,重启集群:

<property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
</property>

接着在本地windows系统的hosts文件中,添加映射

配置浏览器所在系统的 hosts 文件
windows:
在 C:\Windows\System32\drivers\etc\hosts 末尾增加内容(Hadoop集群中各节点及主机名的映射)

三、Main方法如何传参

以问题驱动学习:

假如你到公司中,如何自己学习?你们公司正在使用的技术是什么就学什么,特别大的技术,不要学。

第一个途径:

第二个途径:

package com.bigdata;

public class TestMain {

    /**
     *  args 是一个字符串数组,谁给它可以赋值呢?
     * @param args
     */
    public static void main(String[] args) {

        System.out.println("参数打印开始");
        for (String str:args) {
            System.out.println(str);
        }
        System.out.println("参数打印完毕");
    }
}

思考:假如你的这个代码不在idea中如何传参?

任何java代码都可以打成jar包,jar包中的文件如何运行?

java -jar xxxx.jar 某个类的全路径 如果这个类有参数,直接跟在后面

java -jar hello.jar com.bigdata.TestMain 10 20 30

四、练习

1、数据清洗

# 数据清洗概念

通常情况下,大数据平台获得原始数据文件中,存在大量无效数据和缺失数据,需要再第一时间,对数据进行清洗,获得符合后续处理需求的数据内容和格式

# 需求

对手机流量原始数据,将其中的手机号为"null"和不完整的数据去除

数据格式:

# 源数据

id 手机号 手机mac ip地址 上传 下载 HTTP状态码

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200

1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200

1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200

1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200

1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200

1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200

1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200

1363157995052 13826544109 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0

1363157995052 null 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 240 0 200

1363157991076 13926435659 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 null null null

# 期望结果【删除其中手机号不符合要求,上传流量缺失和下载流量缺失的数据,并仅保留手机号 上传流量 下载流量。】

13726230503 2481 24681

13826544101 264 0

13926435656 132 1512

13926251106 240 0

13726230503 2481 24681

13826544101 264 0

13926435656 132 1512

13926251106 240 0

编码提示:

# 重点:

MapReduce整个流程中可以取消reduce阶段的程序执行,map输出的会直接作为结果输出到HDFS文件中。

# 编码实现

1. 删除job中有关reducer的相关设置:reducer类和输出的key value类型。

2. 手动设置reducetask的个数为0

job.setNumReduceTasks(0);//取消reducer

package com.bigdata.etl;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

class ETLMapper extends Mapper<LongWritable,Text, Text, NullWritable>{


    Text text = null;
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 1363157985066  13726230503  00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481  24681  200
        // 跳过第一行
        long num = key.get();
        if(num == 0){
            return;
        }
        String line = value.toString();
        String[] arr = line.split("\\s+");
        // 长度等于 9   手机号码要正确
        // 删除其中手机号不符合要求,上传流量缺失和下载流量缺失的数据,并仅保留手机号 上传流量 下载流量
        String phone = arr[1]; // null "null"

        if(arr.length == 9 && phone.matches("1[3-9][0-9]{9}")  ){

            String upFlow = arr[arr.length-3];
            String downFlow = arr[arr.length-2];
            if(upFlow.equals("null") || downFlow.equals("null")){
                return ;
            }
            String msg = phone +"\t" + upFlow + "\t" + downFlow;
            text.set(msg);
            context.write(text,NullWritable.get());
        }
    }
}
public class CleanDriver {

    public static void main(String[] args) throws Exception{
// 以下都是固定写法
        Configuration conf = new Configuration();

        // 使用本地的文件系统,而不是hdfs
        conf.set("fs.defaultFS","file:///");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        conf.set("mapreduce.framework.name","local");

        Job job = Job.getInstance(conf, "数据清洗");
        job.setMapperClass(ETLMapper.class);


        // bigdata 1  map类的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        // 设置reduce的任务数为0
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job,new Path("./clean/input"));

        // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
        // 是否可以判断一下,假如存在,直接删除
        Path path = new Path("./clean/output");

        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:-1);
    }
}

2、自定义分区案例

案例之·学生成绩统计分析

# 将学生成绩,按照各科成绩降序排序,各个科目成绩单独输出。

数据如下:

# 自定义partition

将下面数据分区处理:

人名 科目 成绩

张三 语文 10

李四 数学 30

王五 语文 20

赵6 英语 40

张三 数学 50

李四 语文 10

张三 英语 70

李四 英语 80

王五 英语 45

王五 数学 10

赵6 数学 10

赵6 语文 100

package com.bigdata.stu;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class StuWritable implements WritableComparable<StuWritable> {

    private String name;
    private String subject;
    private int score;

    public StuWritable() {
    }

    public StuWritable(String name, String subject, int score) {
        this.name = name;
        this.subject = subject;
        this.score = score;
    }

    @Override
    public String toString() {
        return "StuWritable{" +
                "name='" + name + '\'' +
                ", subject='" + subject + '\'' +
                ", score=" + score +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(subject);
        out.writeInt(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        name = in.readUTF();
        subject = in.readUTF();
        score = in.readInt();
    }

    @Override
    public int compareTo(StuWritable o) {
        return o.score - this.score;
    }
}
package com.bigdata.stu;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class StuPartitioner extends Partitioner<Text, StuWritable> {
    @Override
    public int getPartition(Text text, StuWritable stuWritable, int i) {
        String subject = text.toString();
        if(subject.equals("语文")){
            return 0;
        }else if(subject.equals("数学")){
            return 1;
        }
        return 2;
    }
}
package com.bigdata.stu;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

class StuMapper extends Mapper<LongWritable, Text, Text, StuWritable> {


    Text text = null;
    StuWritable stu = new StuWritable();
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, StuWritable>.Context context) throws IOException, InterruptedException {

        text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, StuWritable>.Context context) throws IOException, InterruptedException {
        // 跳过第一行的标题行
        if(key.get() == 0){
            return;
        }
        String line = value.toString();
        String[] arr = line.split("\\s+");
        String subject = arr[1];
        text.set(subject);

        stu.setName(arr[0]);
        stu.setSubject(subject);
        stu.setScore(Integer.valueOf(arr[2]));
        context.write(text,stu);
    }
}

class StuReducer extends Reducer<Text, StuWritable, NullWritable, Text> {


    @Override
    protected void reduce(Text key, Iterable<StuWritable> values, Reducer<Text, StuWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        ArrayList<StuWritable> stuList = new ArrayList<>();
        for (StuWritable stu : values) {
            StuWritable newStu = new StuWritable(stu.getName(),stu.getSubject(),stu.getScore());
            stuList.add(newStu);
        }
        // 你能够调用这个方法进行排序的原因是:list 中的元素可以自行排序,因为它实现了Comparable接口
        Collections.sort(stuList);

        for (StuWritable stuWritable : stuList) {
            String msg = stuWritable.getName()+"\t"+stuWritable.getSubject() +"\t" + stuWritable.getScore();
            context.write(NullWritable.get(),new Text(msg));
        }

    }
}
public class StuDriver {

    public static void main(String[] args) throws Exception {
// 以下都是固定写法
        Configuration conf = new Configuration();

        // 使用本地的文件系统,而不是hdfs
        conf.set("fs.defaultFS","file:///");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        conf.set("mapreduce.framework.name","local");

        Job job = Job.getInstance(conf, "成绩排名");
        job.setMapperClass(StuMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(StuWritable.class);


        job.setPartitionerClass(StuPartitioner.class);

        job.setReducerClass(StuReducer.class);

        // reduce类的输出类型  bigdata 10
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job,new Path("./student/input"));

        // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
        // 是否可以判断一下,假如存在,直接删除
        Path path = new Path("./student/output");

        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        boolean b = job.waitForCompletion(true);

        System.exit(b?0:-1);
    }
}

补充:Comparable 和 Comparator 两个接口的区别:

第一个:Comparator 案例:

package com.bigdata.java;

public class Student {

    private String name;
    private int age;


    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public Student() {
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
package com.bigdata.java;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class TestMain {

    public static void main(String[] args) {
        List<Student> list = new ArrayList<Student>();
        list.add(new Student("zs",19));
        list.add(new Student("ls",17));
        list.add(new Student("ww",21));
        list.add(new Student("zl",38));
        list.add(new Student("qq",16));
        list.add(new Student("z8",30));

        // 将list集合中的元素按照年龄排序
        Collections.sort(list, new Comparator<Student>() {
            @Override
            public int compare(Student o1, Student o2) {
                return o1.getAge() - o2.getAge();
            }
        });

        System.out.println(list);
    }
}

第二个 Comparable 接口

package com.bigdata.java;

public class Student2 implements Comparable<Student2>{

    private String name;
    private int age;


    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public Student2() {
    }

    public Student2(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public int compareTo(Student2 o) {
        return this.age - o.age;
    }
}
package com.bigdata.java;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestMain2 {

    public static void main(String[] args) {
        List<Student2> list = new ArrayList<Student2>();
        list.add(new Student2("zs",19));
        list.add(new Student2("ls",17));
        list.add(new Student2("ww",21));
        list.add(new Student2("zl",38));
        list.add(new Student2("qq",16));
        list.add(new Student2("z8",30));

        // 将list集合中的元素按照年龄排序
        Collections.sort(list);

        System.out.println(list);
    }
}

3、排序

主播数据按照观众人数降序排序,如果观众人数相同,按照直播时长降序

表 live
select * from live order by people_num desc,time_num desc;

# 案例数据

用户id 观众人数 直播时长

团团 300 1000

小黑 200 2000

哦吼 400 7000

卢本伟 100 6000

八戒 250 5000

悟空 100 4000

唐僧 100 3000

# 期望结果

哦吼 400 7000

团团 300 1000

八戒 250 5000

小黑 200 2000

卢本伟 100 6000

悟空 100 4000

唐僧 100 3000

关键代码:

package com.bigdata.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

class SortMapper extends Mapper<LongWritable, Text, NullWritable,Live > {


    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable,Live>.Context context) throws IOException, InterruptedException {
        String line = value.toString();

        if(key.get() == 0){
            return ;
        }

        String[] arr = line.split("\\s+");

        String name = arr[0];
        int peopleNum = Integer.parseInt(arr[1]);
        int liveTime = Integer.parseInt(arr[2]);
        Live live = new Live(name,peopleNum,liveTime);

        context.write(NullWritable.get(),live);
    }
}


class SortReducer extends Reducer<NullWritable,Live, NullWritable, Text> {


    @Override
    protected void reduce(NullWritable key, Iterable<Live> values, Reducer<NullWritable,Live, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        ArrayList<Live> stuList = new ArrayList<>();
        for (Live live : values) {
            stuList.add(new Live(live.getName(),live.getPeopleNum(),live.getLiveTime()));
        }

        Collections.sort(stuList, new Comparator<Live>() {
            @Override
            public int compare(Live o1, Live o2) {
                if(o1.getPeopleNum() == o2.getPeopleNum()){
                    return o2.getLiveTime() - o1.getLiveTime();
                }
                return o2.getPeopleNum() - o1.getPeopleNum();
            }
        });

        for (Live live : stuList) {
            context.write(NullWritable.get(),new Text(live.getName()+" "+live.getPeopleNum()+" "+live.getLiveTime()));
        }


    }
}


public class SortDriver {

    public static void main(String[] args) throws Exception {
// 以下都是固定写法
        Configuration conf = new Configuration();

        // 使用本地的文件系统,而不是hdfs
        conf.set("fs.defaultFS","file:///");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        conf.set("mapreduce.framework.name","local");

        Job job = Job.getInstance(conf, "成绩排名");
        job.setMapperClass(SortMapper.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Live.class);



        job.setReducerClass(SortReducer.class);

        // reduce类的输出类型  bigdata 10
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        FileInputFormat.setInputPaths(job,new Path("./sort/input"));

        // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
        // 是否可以判断一下,假如存在,直接删除
        Path path = new Path("./sort/output");

        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        boolean b = job.waitForCompletion(true);

        System.exit(b?0:-1);
    }
}
package com.bigdata.sort;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Live implements Writable {

    private String name;
    private int peopleNum;
    private int liveTime;


    @Override
    public String toString() {
        return "Live{" +
                "name='" + name + '\'' +
                ", peopleNum=" + peopleNum +
                ", liveTime=" + liveTime +
                '}';
    }

    public Live() {
    }

    public Live(String name, int peopleNum, int liveTime) {
        this.name = name;
        this.peopleNum = peopleNum;
        this.liveTime = liveTime;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPeopleNum() {
        return peopleNum;
    }

    public void setPeopleNum(int peopleNum) {
        this.peopleNum = peopleNum;
    }

    public int getLiveTime() {
        return liveTime;
    }

    public void setLiveTime(int liveTime) {
        this.liveTime = liveTime;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(peopleNum);
        out.writeInt(liveTime);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        name  = in.readUTF();
        peopleNum  = in.readInt();
        liveTime  = in.readInt();
    }
}

第二种写法:让自定义的数据类型当 key 值

package com.bigdata.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

class SortMapper2 extends Mapper<LongWritable, Text, Live2,NullWritable > {


    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Live2,NullWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();

        if(key.get() == 0){
            return ;
        }

        String[] arr = line.split("\\s+");

        String name = arr[0];
        int peopleNum = Integer.parseInt(arr[1]);
        int liveTime = Integer.parseInt(arr[2]);
        Live2 live = new Live2(name,peopleNum,liveTime);

        context.write(live,NullWritable.get());
    }
}


public class SortDriver2 {

    public static void main(String[] args) throws Exception {
// 以下都是固定写法
        Configuration conf = new Configuration();

        // 使用本地的文件系统,而不是hdfs
        conf.set("fs.defaultFS","file:///");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        conf.set("mapreduce.framework.name","local");

        Job job = Job.getInstance(conf, "排序");
        job.setMapperClass(SortMapper2.class);

        job.setMapOutputKeyClass(Live2.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 此处有一个坑,不要写这个,可以使用map的key进行排序,否则呢?map中的key值不排序
        //job.setNumReduceTasks(0);


        FileInputFormat.setInputPaths(job,new Path("./sort/input"));

        // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
        // 是否可以判断一下,假如存在,直接删除
        Path path = new Path("./sort/output");

        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        boolean b = job.waitForCompletion(true);

        System.exit(b?0:-1);
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到