Writing a WordCount Program with MapReduce

Published: 2022-10-30

Environment: IDEA, Maven

Preparation:

1. Create a new Maven project.

2. Modify pom.xml, adding the following:

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

However, when the project was built later on, the build failed with an error.

Some research revealed that the JDK version Maven uses by default is too outdated, so we add the following snippet to temporarily pin the JDK version:

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>

So the complete set of additions to pom.xml becomes:

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
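
For reference, a complete minimal pom.xml with these additions might look like the sketch below (the groupId, artifactId, and version here are placeholders to adjust to your project; the artifactId determines the jar's file name):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>      <!-- placeholder -->
  <artifactId>untitled</artifactId>   <!-- placeholder -->
  <version>1.0-SNAPSHOT</version>     <!-- placeholder -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>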

At this point, the preparation work is done.

        Since the wordcount program counts how many times each word appears in a text file, the map phase's input types can be set to <LongWritable, Text> (the byte offset of a line and the line's content) and its output types to <Text, IntWritable>; the reduce phase's input and output types are both <Text, IntWritable>.
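
As a quick worked example of this data flow: for the input line hello world hello, the map phase emits

(hello, 1)
(world, 1)
(hello, 1)

The shuffle then groups the values by key, so the reduce phase receives (hello, [1, 1]) and (world, [1]) and outputs

(hello, 2)
(world, 1)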

Now let's write the program; straight to the code.

I. Map phase

package firsr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//LongWritable: the byte offset of the line within the input file (input key)
//Text: the content of the line (input value)
//Text: output key (the word)
//IntWritable: output value (the count, always 1 at this stage)
public class wordcountmapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    //reused across map() calls to avoid allocating a new object per record
    private Text out=new Text();
    private IntWritable i=new IntWritable(1);
    @Override
    //context is the map-side context, used to emit output records
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //get the content of one line
        String line=value.toString();
        String[] datas=line.split(" "); //split the line on spaces
        //iterate over the words
        for(String data:datas){
            out.set(data);
            context.write(out,i);  //emit key->out, value->i
        }
    }
}
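
One caveat worth noting here (my addition, not part of the original code): split(" ") produces empty tokens when words are separated by several spaces or by tabs, and those empty strings would then be counted as a word. A more robust variant is:

String[] datas = value.toString().trim().split("\\s+"); //split on any run of whitespace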

II. Reduce phase

package firsr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class wordcountreducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable out=new IntWritable();
    @Override
    //context is the reduce-side context
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int total=0;
        //sum up all the 1s emitted for this word
        for(IntWritable value:values){
            total+=value.get();
        }
        out.set(total);
        context.write(key,out);  //emit (word, total count)
    }
}
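
A Hadoop detail worth knowing about this loop: the values iterator reuses a single IntWritable instance across iterations, so reading value.get() immediately (as done here) is correct, but storing the value objects themselves for later use would not work.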

III. Main entry point (standard boilerplate)

package firsr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class wordcount {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //create the Job (the entry point for the whole task)
        Job job=Job.getInstance(new Configuration());
        job.setJarByClass(wordcount.class);//the class containing the main method
        //specify the job's mapper and the map output types
        job.setMapperClass(wordcountmapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //specify the job's reducer and the final output types
        job.setReducerClass(wordcountreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //specify the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
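
One small optional improvement (my suggestion, not in the original): waitForCompletion returns whether the job succeeded, so the driver can propagate that as the process exit code:

System.exit(job.waitForCompletion(true) ? 0 : 1); //exit 0 on success, 1 on failure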

(Be careful not to import the wrong packages while writing the code; several Hadoop classes have identically named counterparts in the old org.apache.hadoop.mapred API and the new org.apache.hadoop.mapreduce API.)

Build the executable jar by running the following in IDEA's terminal:

mvn clean package

If the build succeeds, the jar is written to the target/ directory (named <artifactId>-<version>.jar by default).

Next, upload the jar and run the job (your cluster should be running at this point).
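
The input file must already be in HDFS. If it is not there yet, a command along these lines uploads it (the local file name and HDFS path are assumptions matching the run command below):

hdfs dfs -put data.txt /data.txt

Then submit the job: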

hadoop jar /untitled.jar firsr.wordcount /data.txt /output/wc

(data.txt is the text file whose words you want to count; /output/wc is the output directory, which must not already exist!)
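
Once the job finishes, the result can be checked by reading the reducer output files; with the default single reducer, the counts land in part-r-00000:

hdfs dfs -cat /output/wc/part-r-00000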

At this step, the job may hang at INFO mapreduce.Job: Running job. After searching online, I found this workaround: in mapred-site.xml, change

<property>
       <name>mapreduce.framework.name</name>
       <value>yarn</value>
</property>

to the following:

<property>
      <name>mapreduce.job.tracker</name>
      <value>hdfs://ip:8001</value>       <!-- ip is your master node's IP -->
      <final>true</final>
</property>

Running it again succeeded. (Most likely this works because, with mapreduce.framework.name no longer set to yarn, it falls back to its default value local, so the job runs in the local job runner instead of being submitted to YARN; mapreduce.job.tracker itself is a legacy MR1-style property.)

