【大数据】MapReduce 编程-- PageRank--网页排名算法,用于衡量网页“重要性”-排序网页

发布于:2025-05-20 ⋅ 阅读:(15) ⋅ 点赞:(0)

PageRank 是 Google 创始人拉里·佩奇(Larry Page)和谢尔盖·布林(Sergey Brin)在 1998 年提出的一种网页排名算法,用于衡量网页“重要性”的一种方式。它是搜索引擎中用于排序网页的一种基础算法

一个网页越是被其他重要网页链接,它就越重要

PageRank 的计算流程

  1. 初始化:假设总共 N 个网页,每个网页初始 PR 值为 1/N。

  2. 迭代计算:通过 MapReduce 不断迭代更新 PR 值,直到值趋于稳定。

  3. 结果输出:PR 值越大,说明该网页越重要,排名越靠前



A 0.25 B C D
B 0.25 A D
C 0.25 C
D 0.25 B C
  • 第一列:网页编号(如 A)

  • 第二列:初始 PageRank 值(例如 0.25)

  • 后续列:该网页链接到的其他网页

迭代的计算PageRank值,每次MapReduce 的输出要和输入的格式是一样的,这样才能使得Mapreduce 的输出用来作为下一轮MapReduce 的输入


Map过程

解析输入行,提取:

  • 当前网页 ID

  • 当前网页的 PR 值

  • 当前网页链接的其他网页列表

计算出要链接到的其他网友的个数,然后求出当前网页对其他网页的贡献值。

第一种输出的< key ,value>中的key 表示其他网页,value 表示当前网页对其他网页的贡献值

为了区别这两种输出

出链网页贡献值(标记为 @):<出链网页, @贡献值>

第二种输出的< key ,value>中的key 表示当前网页,value 表示所有其他网页。

网页链接列表(标记为 &):<当前网页, &链接网页列表>
 

B @0.0833
C @0.0833
D @0.0833
A &B C D

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*map过程*/
public class MyMapper extends Mapper<Object,Text,Text,Text>{        
    private String id;
    private float pr;       
    private int count;
    private float average_pr;       
    public void map(Object key,Text value,Context context)
	    throws IOException,InterruptedException{            
        StringTokenizer str = new StringTokenizer(value.toString());//对value进行解析
        id =str.nextToken();//id为解析的第一个词,代表当前网页
        pr = Float.parseFloat(str.nextToken());//pr为解析的第二个词,转换为float类型,代表PageRank值
	count = str.countTokens();//count为剩余词的个数,代表当前网页的出链网页个数
	average_pr = pr/count;//求出当前网页对出链网页的贡献值


	String linkids ="&";//下面是输出的两类,分别有'@'和'&'区分
	while(str.hasMoreTokens()){
	    String linkid = str.nextToken();
	    context.write(new Text(linkid),new Text("@"+average_pr));//输出的是<出链网页,获得的贡献值>
	    linkids +=" "+ linkid;
	}       
	context.write(new Text(id), new Text(linkids));//输出的是<当前网页,所有出链网页>
    }       
}

输入数据格式(value):网页ID  PageRank值  出链网页1  出链网页2 ...

输出键值对:

  1. <出链网页ID, "@贡献值">(表示这个网页从别的网页获得了多少贡献)

  2. <当前网页ID, "& 出链网页列表">(保留网页结构)

String id;         // 当前网页ID
float pr;          // 当前网页的PageRank值
int count;         // 出链网页的数量
float average_pr;  // 当前网页对每个出链网页的平均贡献值
StringTokenizer str = new StringTokenizer(value.toString());是把整行字符串(比如 "A 1.0 B C D")按照空格分割成一个个小单元(token)

id = str.nextToken();  // 第一个token是当前网页ID------取出第一个单词(比如 A),表示当前正在处理的网页 ID,赋值给 id

pr = Float.parseFloat(str.nextToken());   // 第二个token是当前网页的PageRank值
取出第二个单词(比如 "1.0"),将其转为 float 类型,就是当前网页的 PageRank 值,赋值给 pr

count = str.countTokens();// 剩下的token是出链网页数量----统计剩余 token 的数量
average_pr = pr / count; //把当前网页的 PageRank 值平均分配给所有它链接的网页

贡献值输出:

while(str.hasMoreTokens()) {
    String linkid = str.nextToken(); // B, 然后 C, 然后 D
    context.write(new Text(linkid), new Text("@" + average_pr));
    linkids += linkid + " "; // 把 B、C、D 加入 linkids 中
}

str.hasMoreTokens() 只要还有未读取的 token(即还有出链网页没处理完),就继续执行循环体

网页结构输出(带 & 开头):

String linkids记录当前网页的所有出链网页 ID 

context.write(new Text(id), new Text(linkids));


Shuffle 是指 Map 阶段输出的数据按照 key 进行分组,并将具有相同 key 的数据发送到同一个 Reduce 任务中处理的过程 

每个网页 Map 阶段都会:

  • 向它出链的网页发 PageRank 贡献(加@前缀)

  • 自己保留一份出链结构

Shuffle 阶段:按网页ID归并聚合

  • 对 Map 输出的 key(网页 ID)进行排序

  • 将相同 key 的所有 value 合并成一个列表

Reducer 接收到的格式为:<网页ID, [贡献值, 出链结构]>

<网页ID, 列表[@贡献1, @贡献2, ..., &出链结构]>


Reduce过程

  • 求每个网页的新 PageRank 值

  • 保留该网页的出链结构

  • 输出格式为网页ID 新的PR值 出链网页列表

shuffule的输出也即是reduce的输入。

reduce输入的key 直接作为输出的key

reduce输入的value 进行解析,它是一个列表

a.若列表里的值里包含`@`,就把该值`@`后面的字符串转化成`float`型加起来

b.若列表里的值里包含`&`,就把该值`&`后面的字符串提取出来

c.把所有贡献值的加总,和提取的字符串进行连接,作为`reduce`的输出`value`

public class MyReducer extends Reducer<Text,Text,Text,Text>{

继承 Hadoop 提供的 Reducer 类,泛型参数说明:

  • Text, Text:输入的 key 和 value 类型

  • Text, Text:输出的 key 和 value 类型

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
为每一个网页 key 传入一个 values 列表,里面是 Shuffle 过程收集到的所有值

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce过程:计算每个网页的新PageRank值,并保留出链网页结构。
 * 输入:<网页ID, [@贡献值, @贡献值, ..., &出链网页列表]>
 * 输出:<网页ID, 新PageRank值 + 出链网页列表>
 */
public class MyReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        String lianjie = ""; // 用于保存当前网页的出链网页列表(结构信息)
        float pr = 0;        // 用于累加当前网页从其他网页获得的PageRank贡献值

        // 遍历所有传入的值:包含两类信息,分别通过首字符判断
        for (Text val : values) {
            String strVal = val.toString(); // 当前值转换为字符串

            if (strVal.substring(0, 1).equals("@")) {
                // 以@开头,表示这是从其他网页传来的PageRank贡献值
                // 取出@后面的数值并累加
                pr += Float.parseFloat(strVal.substring(1));
            } else if (strVal.substring(0, 1).equals("&")) {
                // 以&开头,表示这是本网页的出链结构信息
                // 将&后面的网页列表保留下来
                lianjie += strVal.substring(1); // 注意可能是多个网页用空格分隔
            }
        }

        // 平滑处理(加入跳转因子d = 0.8)
        // 假设网页总数为4,(1 - d) / N = 0.2 * 0.25 = 0.05
        // 新PageRank = d * 贡献值总和 + (1 - d)/N
        pr = 0.8f * pr + 0.2f * 0.25f;

        // 构造输出字符串:新PR值 + 出链网页列表
        String result = pr + lianjie;

        // 输出结果:<当前网页ID, 新的PageRank值 + 出链网页列表>
        context.write(key, new Text(result));
    }
}

遍历所有值,分类处理

pr += Float.parseFloat(val.toString().substring(1));

如果是 @ 开头,就从第 1 个字符开始截取字符串(去掉 @),再把它转换成浮点数,并累加到 pr

lianjie += val.toString().substring(1);

如果是 & 开头,就把 & 后面的出链网页字符串加到变量 lianjie

  • @ 开头:表示来自其他网页的 PageRank 贡献值,提取并累加。

  • & 开头:表示这是该网页自身的 出链网页结构,保留下来。

pr = 0.8f * pr + 0.2f * 0.25f;

 PageRank 中的阻尼系数模型

  • 0.8f:阻尼系数 d(表示 80% 用户点击链接)

  • 0.2f:1 - d,有 20% 用户会随机跳转

  • 0.25f:假设网页总数是 4 个,随机跳转概率均分为 0.25

PR(A) = d × 所有贡献值之和 + (1 - d) / N
 



import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class MyRunner {
    public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {

        // 创建 Hadoop 配置对象
        Configuration conf = new Configuration();

        // 使用控制台输入获取初始输入路径和输出路径
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();      // 第一次输入的 HDFS 输入路径,如:/pagerank/input
        System.out.print("outputPath:");
        String outputPath = sc.next();     // 第一次输出的 HDFS 路径,如:/pagerank/output

        // 进行 PageRank 的迭代计算,这里迭代 5 次
        for (int i = 1; i <= 5; i++) {
            // 创建新的 MapReduce 作业
            Job job = Job.getInstance(conf);

            // 设置 Job 的主类,用于打包 Jar
            job.setJarByClass(MyRunner.class);

            // 设置 Map 和 Reduce 的处理类
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            // 设置 Map 阶段输出键值对类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // 设置 Reduce 阶段输出键值对类型(最终输出)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // 设置输入数据路径(每轮迭代输入路径是上一轮的输出)
            FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));

            // 设置输出数据路径(每轮迭代输出不同路径)
            FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));

            // 更新下一轮迭代的输入输出路径
            inputPath = outputPath;        // 当前输出变为下一轮的输入
            outputPath = outputPath + i;   // 每次输出加上数字以区分路径(如 output1, output2,...)

            // 提交作业并等待执行完成
            job.waitForCompletion(true);
        }

        // 读取最终输出文件内容并打印到控制台
        try {
            // 获取 Hadoop 文件系统
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());

            // 拼接最终输出文件的路径(最后一轮输出的 part-r-00000)
            Path srcPath = new Path(outputPath.substring(0, outputPath.length() - 1) + "/part-r-00000");

            // 打开输出文件
            FSDataInputStream is = fs.open(srcPath);

            // 打印最终结果到控制台
            System.out.println("Results:");
            while (true) {
                String line = is.readLine(); // 读取一行结果
                if (line == null) break;     // 如果到文件末尾,结束循环
                System.out.println(line);    // 打印当前行
            }
            is.close(); // 关闭输入流
        } catch (Exception e) {
            e.printStackTrace(); // 如果读取输出失败,打印错误
        }
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到