大数据架构师必知必会系列:图计算

发布于:2023-09-28 ⋅ 阅读:(88) ⋅ 点赞:(0)

作者:禅与计算机程序设计艺术

1.简介

图计算是一种用来处理复杂网络结构数据的快速、高效的方式。它广泛应用于社交网络分析、推荐系统、金融风险管理、科学信息发现等领域。GraphX是Spark中用于处理图形结构数据的API。本专栏将介绍GraphX在Spark中的主要特性及其用法,帮助读者更好地理解并掌握图计算相关技术的运用。

图数据模型的定义: 图(Graph)由一个顶点集合V和边集合E组成,其中每条边关联两个顶点。不同的图模型基于图的形式、权重或其他的特征不同而定义。如:

1)简单图:表示无向图和有向图 2)加权图:所有边均带有权值或长度信息 3)带属性图:每个顶点和边都可以有属性标签

图的存储方式:

1)邻接矩阵:将图视为一个二维数组,数组的行列分别对应着图中的顶点;数组元素表示两节点间是否存在边。对于稀疏图来说,该方法比较适用,但对于稠密图来说,耗费空间过多。 2)基于列表的存储:将图视为顶点集合与边集列表的组合。这样可以灵活地插入、删除和修改图的结构。这种方法比较适合稠密图。 3)分块邻接表:针对稠密图的优化方法。把图划分成多个小块,每个块仅包含相连的顶点。可以降低查询的时间复杂度。

Spark GraphX是在Spark上实现图计算的API。主要特性包括:

1)支持多种图模型:包括简单图、加权图、带属性图,这些模型的表示方法也是不同的。 2)内置图分析算法:提供了丰富的图分析算法,比如PageRank、Connected Components、Triangle Counting、Strongly Connected Components、Single Source Shortest Path等。 3)分布式执行:通过RDD(Resilient Distributed Datasets)的容错机制,使得图计算框架具有高可用性。 4)高性能:采用了Spark的并行计算框架,并利用了Spark平台上的优势,比如自动调度、内存管理、任务切分等。

本专栏将以页面级为主题,逐一介绍GraphX中各项功能的使用方法和原理。希望通过本专栏的学习,读者能够充分了解图计算相关知识,提升自己的工作能力、解决实际问题。

2.图数据模型

(一)图的表示形式

GraphX目前支持两种图的表示形式:

  • RDD[(VertexId, (VD))]

    • VertexId: 每个顶点的唯一标识符
    • VD: 顶点的数据类型,即VD可以是任何用户定义的数据类型
  • RDD[(Edge[ED], Iterable[(VertexId, Attributes)])]

    • Edge[ED]: 表示一条边,ED是边的数据类型
    • Iterable[(VertexId, Attributes)]: 指向该边的顶点ID及属性

一般来说,我们习惯用边的形式表示图,因为边比顶点更直观。对于边,需要包含起点顶点的ID,终点顶点的ID,以及边的属性。图数据模型的另一种表达形式为顶点邻接表,即对于每个顶点,记录它的直接邻居顶点以及它们之间的关系。

(二)图的查询

GraphX提供了三种查询图的方法:

  • 查找顶点
    • graph.vertices
    • 根据顶点ID查找指定的顶点,返回结果为RDD[VertexId, VD],RDD中包含所有满足条件的顶点。
  • 查找边
    • graph.edges
    • 根据边的起始顶点ID和目标顶点ID查找指定的边,返回结果为RDD[Edge[ED]],RDD中包含所有满足条件的边。
  • 查找顶点的邻居
    • graph.neighbors
    • 查询指定顶点的所有邻居顶点,返回结果为RDD[(VertexId, VD)], RDD中包含所有邻居顶点。

另外,还提供了一些可选参数:

  • direction: 指定查询方向,可以是In或者Out,默认为Both。
  • edges: 可以指定一个子集的边进行查询,返回结果为RDD[Edge[ED]].
  • coalesceEdges: 当图具有许多冗余的边时,可以对边进行合并,以减少磁盘和内存开销。

(三)图的变换

GraphX允许对图进行变换,包括创建子图、过滤顶点、投影、连接、Union等。

创建子图

可以通过顶点ID集合或者顶点属性条件创建子图。

例如,假设graph是一个包含若干顶点的图对象,我们要创建一个只包含具有特定标签(如“user”)的子图:

val subgraph = graph.subgraph(vpred = (vid, vd) => vd._2 == "user")

这里使用的vpred参数是一个函数,它接收到每个顶点的ID和属性,返回布尔值表示是否应该包含这个顶点。

过滤顶点

可以使用filterVertices()方法来过滤掉图中不需要的顶点。传入一个判断函数,若判断函数返回true则保留顶点,否则过滤掉。

例如,假设graph是一个包含若干顶点的图对象,我们要过滤掉所有年龄超过一定阈值的用户:

val filteredSubgraph = graph.filterVertices((vid, vd) => vd("age") < threshold)

这里使用的filterVertices()方法接受一个判断函数,函数的参数是顶点ID和属性,返回布尔值表示是否保留顶点。

投影

使用projectVertices()方法可以选择需要的顶点的属性,返回结果为RDD[(VertexId, A)], A表示选择的属性类型。

例如,假设graph是一个包含若干顶点的图对象,我们要选择其中的“name”和“gender”作为新图的顶点属性:

val projectedGraph = graph.projectVertices((vid, vd) => (vd("name"), vd("gender")))

这里使用的projectVertices()方法接受一个转换函数,函数的参数是顶点ID和属性,返回一个新的属性。

连接

使用joinVertices()方法可以对两个图进行连接,生成一个包含两个图顶点的新图。

例如,假设graph1和graph2都是包含若干顶点的图对象,我们想得到一个新的图,其中所有顶点都来自graph1,且其对应的顶点的属性是来源于graph1和graph2的共同属性:

val connectedGraph = graph1.joinVertices(graph2)((vid, attr1), (_, attr2))

这里使用的joinVertices()方法接受一个转换函数,该函数接收到两个顶点的属性,返回一个新的属性。

Union

使用union()方法可以将两个图合并为一个新图,两个图必须具有相同类型的顶点属性。

例如,假设graph1和graph2都是包含若干顶点的图对象,我们要得到一个新的图,其所有顶点来自graph1和graph2,但是顶点属性是分别来自graph1和graph2的:

val unionedGraph = graph1.union(graph2)

注意,两个图中的顶点不能有重复的ID。

3.图计算算法

(一)PageRank

PageRank算法是搜索引擎最著名的排名算法之一,用来评估网页的重要性。GraphX也内置了PageRank算法。

PageRank算法的计算过程如下:

1)给定一个随机概率值d(通常取0.85),初始分配给每个顶点一个等概率的分数(权重)。 2)重复以下步骤,直至收敛: a) 将每个顶点的入射链接(从当前页面指向其他页面的超链接数量)乘以该顶点的权重,获得该顶点的当前点引力。 b) 对所有顶点按照其获得的引力进行加权平均,获得新的权重。 c) 检查新的权重是否与之前的权重一致,若一致则停止迭代。 3)输出每个顶点的最终权重,其值反映了其在整个互联网中所占的重要性。

为了使用PageRank算法,首先需要导入GraphX包:

import org.apache.spark.graphx.{Graph, VertexRDD}

然后,可以调用pageRank()方法计算出每个顶点的重要性分数:

val pageRankGraph = graph.pageRank(0.0001) // 设置精度参数

这里设置的参数表示精度,值越小精度越高。

也可以自定义图的连接方式和度量标准。

(二)Connected Components

计算图的连接组件是一个非常重要的问题,由于图数据的复杂性,它难以直接解决。然而,GraphX提供了一种有效的算法——ConnectedComponents。

该算法的计算过程如下:

1)初始化每个顶点都属于单独的连通分量。 2)对图的每条边进行一次迭代,根据顶点之间的边,将它们所在的连通分量合并。 3)重复第2步,直至图不再变化。 4)输出每个顶点的连通分量ID。

为了使用Connected Components算法,首先需要导入GraphX包:

import org.apache.spark.graphx.{Graph, VertexRDD}

然后,可以调用connectedComponents()方法计算出每个顶点所在的连通分量ID:

val ccGraph = graph.connectedComponents()

默认情况下,它不会输出连通分量的大小。如果需要的话,可以调用groupCount()方法统计连通分量的大小:

ccGraph.groupCount().collectAsMap()

它将返回一个map,键是连通分量ID,值是该连通分量中顶点的个数。

此外,也可以使用pregel()方法实现类似的算法,当然前提是图的边没有方向。

(三)Triangle Counting

Triangle Counting是用来计算图中三角形数量的算法。在计算图的三角计数时,GraphX提供了一种有效的算法——TriangleCounting。

该算法的计算过程如下:

1)遍历图中的每条边。 2)对于每条边,检查是否存在与该边共享三个顶点的三角形,若存在则计数器+1。 3)输出计数器的值。

为了使用Triangle Counting算法,首先需要导入GraphX包:

import org.apache.spark.graphx.{Graph, Pregel}

然后,可以调用triangleCount()方法计算出图的三角计数:

val triangleCounts = graph.triangleCount()

该方法可以计算出顶点的三角计数,也可以同时计算出三角形的计数。

(四) stronglyConnectedComponents

Tarjan算法是用于计算图的强连通分量的经典算法。然而,GraphX提供了一种基于DFS的改进算法——stronglyConnectedComponents。

该算法的计算过程如下:

1)初始化每个顶点都属于单独的连通分量。 2)对图的每个顶点进行DFS,记录前序编号和后序编号。 3)对图的每个边进行一次迭代,根据顶点的前序编号和后序编号,将它们所在的连通分量合并。 4)重复第2、3步,直至图不再变化。 5)输出每个顶点的连通分量ID。

为了使用stronglyConnectedComponents算法,首先需要导入GraphX包:

import org.apache.spark.graphx.{Graph, VertexRDD}

然后,可以调用stronglyConnectedComponents()方法计算出每个顶点所在的连通分量ID:

val sccGraph = graph.stronglyConnectedComponents()

默认情况下,它不会输出连通分量的大小。如果需要的话,可以调用groupCount()方法统计连通分量的大小:

sccGraph.groupCount().collectAsMap()

它将返回一个map,键是连通分量ID,值是该连通分量中顶点的个数。

4.代码示例

下面是几个GraphX API的使用示例:

(一)创建图

假设有一个图的文本文件vertex.txt和edge.txt,里面记录了图中的顶点信息和边信息。第一行为顶点数和边数,后面每行分别记录了一个顶点的ID、属性、邻居数和邻居ID。

用Scala语言读取这两个文件并创建图:

// 创建顶点RDD
case class NodeAttr(id: Long, age: Int, gender: String)
val nodes = sc.textFile("vertex.txt").map{line=>
    val fields = line.split("\t")
    val nodeId = fields(0).toLong
    val attrs = Array(fields(1), fields(2)).asInstanceOf[Array[String]]
    (nodeId, NodeAttr(nodeId, attrs(0).toInt, attrs(1)))
}.cache()

// 创建边RDD
case class EdgeAttr(count: Double)
val edges = sc.textFile("edge.txt").flatMap{line=>
    val fields = line.split("\t")
    val srcNodeId = fields(0).toLong
    val neighborIds = fields(2).split(",").map(_.toLong).filter(_!= srcNodeId)
    if(neighborIds.length > 0){
        neighborIds.map { dstNodeId =>
            ((srcNodeId, dstNodeId), EdgeAttr(1.0))
        }
    }else{
        Iterator.empty
    }
}.cache()

// 创建图
val graph = Graph(nodes, edges)

(二)创建子图

假设有一个图,我们想要创建只包含年龄小于等于30岁的用户的子图:

val subgraph = graph.subgraph(vpred = (vid, vd) => vd.age <= 30)

这里使用了vpred参数来指定顶点属性的过滤条件,只有符合条件的顶点才会被包含在子图中。

(三)过滤顶点

假设有一个图,我们想要过滤掉所有年龄大于等于30岁的用户:

val filteredSubgraph = graph.filterVertices((vid, vd) => vd.age < 30)

这里使用filterVertices()方法来过滤掉顶点,参数是一个判断函数,接收到每个顶点的ID和属性,返回布尔值表示是否保留这个顶点。

(四)投影

假设有一个图,我们想要选择其中的名字和性别作为新的图的顶点属性:

val projectedGraph = graph.projectVertices((vid, vd) => (vd.name, vd.gender))

这里使用projectVertices()方法来选择顶点属性,参数是一个转换函数,接收到每个顶点的ID和属性,返回一个新的属性。

(五)连接

假设有两个图,我们想要得到一个新的图,其中所有顶点都来自graph1,且其对应的顶点的属性是来源于graph1和graph2的共同属性:

val connectedGraph = graph1.joinVertices(graph2)((vid, attr1), (_, attr2))

这里使用joinVertices()方法来连接两个图,参数是一个转换函数,接收到两个顶点的属性,返回一个新的属性。

(六)Union

假设有两个图,我们想要得到一个新的图,其所有顶点来自graph1和graph2,但是顶点属性是分别来自graph1和graph2的:

val unionedGraph = graph1.union(graph2)

注意,两个图中的顶点不能有重复的ID。

5.未来发展方向

随着图计算技术的不断发展,GraphX也正在逐渐演化。GraphX将会继续持续增长,并加入更多的功能,比如动态图、Bipartite Graph、GraphFrames等。

最后,推荐阅读下面的参考文献:

1. , et al. "Graph computation at big data scale." Proceedings of the VLDB Endowment. Vol. 7. No. 9. 2013.

2. . "Distributed computing with graphs: GraphLab and graph computation frameworks." Foundations and Trends in Computer Science. Vol. 4. No. 2. 2013.

3.https://en.wikipedia.org/wiki/Graph_theory


网站公告

今日签到

点亮在社区的每一天
去签到