头歌 MapReduce基础编程

发布于:2025-06-13 ⋅ 阅读:(13) ⋅ 点赞:(0)

头歌 MapReduce基础编程:

第1关:合并去重
        本关任务:编程实现文件合并和去重操作。

相关知识
        为了实现文件的合并去重,我们首先要知道文件中都有些什么,将文件内容进行“拆解”,映射(Map)到不同节点;然后在通过归约(Reduce)去除重复部分。

Map过程
用法如下:
        重载map函数,直接将输入中的value复制到输出数据的key上。

public static class Map extends Mapper<Object, Text, Text, Text>{}

Reduce过程:

重载reduce函数,直接将输入中的key复制到输出数据的key上。

public static class Reduce extends Reducer<Text, Text, Text, Text>{}

编程要求:
        对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:

第一列按学号排列;
学号相同,按x,y,z排列。
测试说明
        程序会对你编写的代码进行测试:
输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。
下面是输入文件和输出文件的一个样例供参考。
输入文件A的样例如下:

20150101     x
20150102     y
20150103     x
20150104     y
20150105     z
20150106     x

输入文件B的样例如下:

20150101      y
20150102      y
20150103      x
20150104      z
20150105      y

根据输入文件A和B合并得到的输出文件C的样例如下:

20150101      x
20150101      y
20150102      y
20150103      x
20150104      y
20150104      z
20150105      y
20150105      z
20150106      x

上面三个都不是需要输入的代码,下面才是!!!

需要输入代码行:

import java.io.IOException;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge {
    /**  
     * @param args  
     * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C  
     */  
    //重载map函数,直接将输入中的value复制到输出数据的key上  
    /********** Begin **********/  
    public static class Map extends Mapper<Object, Text, Text, Text>{  
        private static Text text = new Text();  
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{  
            text = value;  
            context.write(text, new Text(""));  
        }  
    }  
    /********** End **********/  
    //重载reduce函数,直接将输入中的key复制到输出数据的key上  
    /********** Begin **********/  
    public static class Reduce extends Reducer<Text, Text, Text, Text>{  
        public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException,InterruptedException{  
            context.write(key, new Text(""));  
        }  
    }  
    /********** End **********/  
    public static void main(String[] args) throws Exception{  
        // TODO Auto-generated method stub  
        Configuration conf = new Configuration();  
        conf.set("fs.default.name","hdfs://localhost:9000");  
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: wordcount <in> <out>");  
            System.exit(2);  
            }  
        Job job = Job.getInstance(conf,"Merge and duplicate removal");  
        job.setJarByClass(Merge.class);  
        job.setMapperClass(Map.class);  
        job.setCombinerClass(Reduce.class);  
        job.setReducerClass(Reduce.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }
}

第2关:整合排序

本关任务:编写程序实现对输入文件的排序。

相关知识

        为了实现文件的整合排序,我们首先要知道文件中都有些什么,将文件内容进行“拆解”,映射(Map)到不同节点;然后在通过归约(Reduce)的过程中进行排序输出。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Partitioner;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;
public class MergeSort {
    /**  
     * @param args  
     * 输入多个文件,每个文件中的每行内容均为一个整数  
     * 输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数  
     */  
    //map函数读取输入中的value,将其转化成IntWritable类型,最后作为输出key  
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
        private static IntWritable data = new IntWritable();  
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{  
    /********** Begin **********/  
            String text = value.toString();  
            data.set(Integer.parseInt(text));  
            context.write(data, new IntWritable(1));  
    /********** End **********/  
        }  
    }  
    //reduce函数将map输入的key复制到输出的value上,然后根据输入的value-list中元素的个数决定key的输出次数,定义一个全局变量line_num来代表key的位次  
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
        private static IntWritable line_num = new IntWritable(1);  
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{  
    /********** Begin **********/  
            for(IntWritable val : values){  
                context.write(line_num, key);  
                line_num = new IntWritable(line_num.get() + 1);  
            }  
    /********** End **********/  
        }  
    }  
    //自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和边界的关系返回对应的Partiton ID  
    public static class Partition extends Partitioner<IntWritable, IntWritable>{  
        public int getPartition(IntWritable key, IntWritable value, int num_Partition){  
    /********** Begin **********/  
            int Maxnumber = 65223;//int型的最大数值  
            int bound = Maxnumber/num_Partition+1;  
            int keynumber = key.get();  
            for (int i = 0; i<num_Partition; i++){  
                if(keynumber<bound * (i+1) && keynumber>=bound * i){  
                    return i;  
                }  
            }  
            return -1;  
    /********** End **********/  
        }  
    }  
    public static void main(String[] args) throws Exception{  
        // TODO Auto-generated method stub  
        Configuration conf = new Configuration();  
        conf.set("fs.default.name","hdfs://localhost:9000");  
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: wordcount <in> <out>");  
            System.exit(2);  
            }  
        Job job = Job.getInstance(conf,"Merge and sort");  
        job.setJarByClass(MergeSort.class);  
        job.setMapperClass(Map.class);  
        job.setReducerClass(Reduce.class);  
        job.setPartitionerClass(Partition.class);  
        job.setOutputKeyClass(IntWritable.class);  
        job.setOutputValueClass(IntWritable.class);  
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

第3关:信息挖掘
本关任务:对给定的表格进行信息挖掘。


Map过程:

        Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志;
在Map阶段,将父子关系与相反的子父关系,同时在各个value前补上前缀-与+标识此key-value中的value是正序还是逆序产生的,之后进入context。

import java.io.IOException;  
import java.util.*;
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;
public class simple_data_mining {  
    public static int time = 0;
    /**  
     * @param args  
     * 输入一个child-parent的表格  
     * 输出一个体现grandchild-grandparent关系的表格  
     */  
    //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志  
    public static class Map extends Mapper<Object, Text, Text, Text>{  
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{  
    /********** Begin **********/  
            String child_name = new String();  
            String parent_name = new String();  
            String relation_type = new String();  
            String line = value.toString();  
            int i = 0;  
            while(line.charAt(i) != ' '){  
                i++;  
            }  
            String[] values = {line.substring(0,i),line.substring(i+1)};  
            if(values[0].compareTo("child") != 0){  
                child_name = values[0];  
                parent_name = values[1];  
                relation_type = "1";//左右表区分标志  
                context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));  
                //左表  
                relation_type = "2";  
                context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));  
                //右表  
            }  
    /********** End **********/  
        }  
    }  
    public static class Reduce extends Reducer<Text, Text, Text, Text>{  
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{  
    /********** Begin **********/  
            if(time == 0){   //输出表头  
                context.write(new Text("grand_child"), new Text("grand_parent"));  
                time++;  
            }  
            int grand_child_num = 0;  
            String grand_child[] = new String[10];  
            int grand_parent_num = 0;  
            String grand_parent[]= new String[10];  
            Iterator ite = values.iterator();  
            while(ite.hasNext()){  
                String record = ite.next().toString();  
                int len = record.length();  
                int i = 2;  
                if(len == 0) continue;  
                char relation_type = record.charAt(0);  
                String child_name = new String();  
                String parent_name = new String();  
                //获取value-list中value的child  
                while(record.charAt(i) != '+'){  
                    child_name = child_name + record.charAt(i);  
                    i++;  
                }  
                i=i+1;  
                //获取value-list中value的parent  
                while(i<len){  
                    parent_name = parent_name+record.charAt(i);  
                    i++;  
                }  
                //左表,取出child放入grand_child  
                if(relation_type == '1'){  
                    grand_child[grand_child_num] = child_name;  
                    grand_child_num++;  
                }  
                else{//右表,取出parent放入grand_parent  
                    grand_parent[grand_parent_num] = parent_name;  
                    grand_parent_num++;  
                }  
            }
            if(grand_parent_num != 0 && grand_child_num != 0 ){  
                for(int m = 0;m<grand_child_num;m++){  
                    for(int n=0;n<grand_parent_num;n++){  
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));  
                        //输出结果  
                    }  
                }  
            }  
    /********** End **********/  
        }  
    }  
    public static void main(String[] args) throws Exception{  
        // TODO Auto-generated method stub  
        Configuration conf = new Configuration();  
        conf.set("fs.default.name","hdfs://localhost:9000");  
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: wordcount <in> <out>");  
            System.exit(2);  
            }  
        Job job = Job.getInstance(conf,"Single table join ");  
        job.setJarByClass(simple_data_mining.class);  
        job.setMapperClass(Map.class);  
        job.setReducerClass(Reduce.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }
}

创作不易,如果能解决您的问题,麻烦您点赞、收藏+关注,一键三连!!!