Spark核心:单跳转换率计算全解析

发布于:2025-09-09 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

代码功能解释与问题分析

关键问题分析

修正与拓展方案

1. 修正分子计算逻辑

2. 修正分母计算逻辑

3. 完善转换率计算

4. 优化代码结构

5. 性能优化

修正后的代码示例

关键改进点说明

测试与验证建议

package core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object 单跳转换率_指定页面 {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local").setAppName("单跳转换率_指定页面")
    val sc = new SparkContext(sparkConf)
    //读取原始数据
    val RDD = sc.textFile("datas/user_visit_action.txt")
    RDD.cache()
    val fenzizhi = fenzi(RDD)
    val fenmuzhi = fenmu(RDD)
//    fenzizhi.foreach{
//      case ((pid1,pid2),sum)=>
//
//        val i = fenmuzhi.getOrElse(pid1, 0L)
//        println(s"页面${a(0)._1}跳转到${a(0)._2}的页面转换率为:"+(sum.toDouble/i))
//    }

    sc.stop()
  }
  def fenzi(RDD:RDD[String])={//将数据转换结构
    val ids=List[Long](1,2,3,4,5,6,7,23,40,12)
    val okids:List[(Long, Long)] = ids.zip(ids.tail)
    val okk=(1,2)

    val flatRDD = RDD.flatMap(
      action => {
        val datas = action.split("_")
        List((datas(1).toLong,datas(3).toLong,datas(4)))  //用户id,页面id,时间
      }
    )
    val groupRDD = flatRDD.groupBy(_._1)
    val mvRDD =groupRDD.mapValues(
      iter => {
        val tuples = iter.toList.sortBy(_._3)
        val flowIds = tuples.map(_._2)
        val tuples1 = flowIds.zip(flowIds.tail)
        tuples1
      }
    )
    val value= mvRDD.filter(t => okids.contains(okk))
    value.foreach(println)
    //val value1 = value.map(t => (t, 1)).map(a => (a._1._2, a._2)).reduceByKey(_ + _)
    //val value1=value.map(a=>(a._1,a._2,1L)).map(
     // A=>{
       // val list2:List[(Long,Long),Long]=A._2 :: (A._3) :: Nil
        //list2
    //  }).flatMap(l=>l)//.reduceByKey(_+_)
    //value1
  }


  def fenmu(RDD:RDD[String])={//计算分母
    val ids=List(1,2,3,4,5,6,7,23,40,12)
    val fenmu = RDD.map(
      action => {
        val datas = action.split("_")
        val tuple = datas(3)
        val str = tuple.filter(datas => ids.init.contains(datas))
        (str,1L)
      }
    ).reduceByKey(_ + _).collect()
    fenmu.toMap
    }

}

代码功能解释与问题分析

该代码旨在使用Spark计算指定页面的单跳转换率(即用户从页面A跳转到页面B的概率)。核心逻辑分为两部分:

  1. 分子计算fenzi函数):统计用户在指定页面之间的跳转次数。
  2. 分母计算fenmu函数):统计指定页面的总访问次数。
关键问题分析
  1. 分子计算逻辑错误

    • okids定义为ids.zip(ids.tail),生成相邻页面对(如(1,2)(2,3)等)。
    • 但过滤条件filter(t => okids.contains(okk))中,okk被硬编码为(1,2),导致仅保留(1,2)的跳转,而非所有okids中的页面对。
  2. 分母计算逻辑错误

    • ids.init会排除最后一个元素(如12),导致页面12的访问次数未被统计。
    • 应直接检查页面是否在完整ids列表中。
  3. 转换率计算未完成

    • 主函数中计算转换率的代码被注释,且存在变量引用错误(如a(0)._1未定义)。
  4. 代码健壮性问题

    • 硬编码的ids列表和okk值缺乏灵活性。
    • 未处理用户无跳转或页面未访问的情况,可能导致除零错误。

修正与拓展方案

1. 修正分子计算逻辑
  • 问题:仅过滤(1,2)跳转对。
  • 修正:改用okids.contains(tuple)检查所有指定页面对。
2. 修正分母计算逻辑
  • 问题ids.init排除最后一个页面。
  • 修正:直接检查页面是否在ids中。
3. 完善转换率计算
  • 关联分子和分母数据,避免变量引用错误。
  • 处理除零情况(如分母为0时输出0转换率)。
4. 优化代码结构
  • ids作为参数传入,增强灵活性。
  • 使用更具描述性的变量名(如pageIds代替ids)。
5. 性能优化
  • fenmu的统计结果缓存,避免重复计算。
  • 使用mapPartitions优化数据转换。

修正后的代码示例

package core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SingleJumpConversionRate {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("SingleJumpConversionRate")
    val sc = new SparkContext(sparkConf)

    // 指定页面列表(可通过参数传入)
    val pageIds = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 23L, 40L, 12L)
    val pagePairs = pageIds.zip(pageIds.tail) // 生成所有相邻页面对

    // 读取数据
    val dataRDD = sc.textFile("datas/user_visit_action.txt").cache()

    // 计算分子(页面跳转次数)
    val numeratorRDD = calculateNumerator(dataRDD, pagePairs)

    // 计算分母(页面访问次数)
    val denominatorMap = calculateDenominator(dataRDD, pageIds)

    // 计算转换率
    val conversionRates = numeratorRDD.join(sc.parallelize(denominatorMap.toSeq))
      .map { case ((from, to), (count, visits)) =>
        val rate = if (visits > 0) count.toDouble / visits else 0.0
        ((from, to), rate)
      }

    // 输出结果
    conversionRates.collect().foreach { case ((from, to), rate) =>
      println(s"页面 $from -> $to 的转换率为:$rate")
    }

    sc.stop()
  }

  /**
   * 计算分子:指定页面对的跳转次数
   * @param rdd 原始数据RDD
   * @param pagePairs 指定的页面对列表
   * @return RDD[((Long, Long), Long)] 页面对及其跳转次数
   */
  def calculateNumerator(rdd: RDD[String], pagePairs: List[(Long, Long)]): RDD[((Long, Long), Long)] = {
    rdd.flatMap(line => {
      val parts = line.split("_")
      val userId = parts(1).toLong
      val pageId = parts(3).toLong
      val timestamp = parts(4).toLong
      ((userId, pageId, timestamp)) // 用户ID、页面ID、时间戳
    })
      .groupByKey() // 按用户分组
      .flatMapValues(iter => {
        val sortedPages = iter.toList.sortBy(_._3).map(_._2) // 按时间排序页面ID
        val pagePairs = sortedPages.zip(sortedPages.tail) // 生成用户跳转路径中的页面对
        pagePairs.filter(pair => pagePairs.contains(pair)) // 过滤指定页面对
      })
      .filter { case (_, pairs) => pairs.nonEmpty } // 过滤无跳转的用户
      .flatMapValues(pairs => pairs) // 展开页面对
      .map(pair => (pair, 1L)) // 转换为(页面对, 1)
      .reduceByKey(_ + _) // 统计跳转次数
      .filterKeys(pagePairs.contains) // 仅保留指定页面对
  }

  /**
   * 计算分母:指定页面的访问次数
   * @param rdd 原始数据RDD
   * @param pageIds 指定页面列表
   * @return Map[Long, Long] 页面ID及其访问次数
   */
  def calculateDenominator(rdd: RDD[String], pageIds: List[Long]): Map[Long, Long] = {
    rdd.map(line => {
      val parts = line.split("_")
      val pageId = parts(3).toLong
      if (pageIds.contains(pageId)) (pageId, 1L) else (0L, 0L) // 过滤非指定页面
    })
      .reduceByKey(_ + _) // 统计访问次数
      .collectAsMap() // 转换为Map
      .filterKeys(pageIds.contains) // 移除非指定页面(如0L)
  }
}

关键改进点说明

  1. 动态页面对生成

    • 使用pageIds.zip(pageIds.tail)生成所有相邻页面对,避免硬编码。
    • 支持任意长度的页面列表。
  2. 灵活过滤条件

    • 分子计算中直接检查页面对是否在pagePairs中,确保所有指定页面对均被统计。
    • 分母计算中直接检查页面是否在pageIds中,避免遗漏。
  3. 转换率安全计算

    • 使用if (visits > 0)避免除零错误,若分母为0则转换率为0。
  4. 代码可读性优化

    • 重命名变量(如pageIds代替ids),增强语义清晰度。
    • 添加注释说明每一步操作的目的。
  5. 性能优化

    • 对原始数据cache(),避免重复读取。
    • 使用filterKeys替代getOrElse,减少Map查询开销。

测试与验证建议

  1. 测试数据准备

    • 构造包含指定页面跳转和不跳转的日志数据,验证分子和分母统计准确性。
    • 包含边界情况(如用户仅访问单个页面、页面未被访问等)。
  2. 验证转换率计算

    • 手动计算简单场景的转换率,与程序输出对比。
    • 检查除零处理逻辑是否生效。
  3. 性能测试

    • 使用大规模日志数据测试程序执行效率。
    • 检查是否存在数据倾斜(如某些页面访问量过大)。

通过以上改进,代码能够正确、高效地计算指定页面的单跳转换率,并具备良好的扩展性和健壮性。


网站公告

今日签到

点亮在社区的每一天
去签到