PySpark与GraphFrames的安装与使用环境搭建过程
目录
- PySpark环境搭建
- 配置hadoop
- 安装pyspark与Java
- graphframes安装
- 使用方法
- 启动spark并读取数据
- 启动hive支持
- Spark的DataFrame与RDD
- DataFrame的基础api
- RDD的简介
- RDD的API概览
- 各类RDD
- cache&checkpoint
- graphframes的用法
- Motiffinding(模式发现)
- Subgraphs子图
- GraphFrames支持的GraphX算法
- PySpark3.X与pandas融合
- 自定义UDF和UDAF
- 分组聚合与JOIN
- map迭代
- Pyspark与Pandas的交互
PySpark环境搭建
配置hadoop
spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我们必须配置好Hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils
gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils
我选择了使用这个仓库提供的最高的Hadoop版本3.0.0将其解压到D:\deploy\hadoop-3.0.0目录下,然后配置环境变量:
文章图片
我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:
copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32
不过这步需要拥有管理员权限才可以操作。
为了能够在任何地方使用winutils命令工具,将
%HADOOP_HOME%\bin
目录加入环境变量中:文章图片
安装pyspark与Java
首先,我们安装spark当前(2022-2-17)的最新版本:
pip install pyspark==3.2.1
需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出
java.lang.IllegalArgumentException: Unsupported class file major version 55
的错误。这是因为pyspark内置了Scala,而Scala是基于jvm的编程语言,Scala与jdk的版本存在兼容性问题,JDK与scala的版本兼容性表:
JDK version | Minimum Scala versions | Recommended Scala versions |
---|---|---|
17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
这里我依然选择安装jdk8的版本:
文章图片
测试一下:
>java -versionjava version "1.8.0_201"Java(TM) SE Runtime Environment (build 1.8.0_201-b09)Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):
绿化版Java11的环境配置与Python调用Java
https://xxmdmst.blog.csdn.net/article/details/118366166
graphframes安装
pip安装当前最新的graphframes:
pip install graphframes==0.6
然后在官网下载graphframes的jar包。
下载地址:https://spark-packages.org/package/graphframes/graphframes
由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:
文章图片
然后将该jar包放入pyspark安装目录的jars目录下:
文章图片
pyspark安装位置可以通过pip查看:
C:\Users\ASUS>pip show pysparkName: pysparkVersion: 3.2.1Summary: Apache Spark Python APIHome-page: https://github.com/apache/spark/tree/master/pythonAuthor: Spark DevelopersAuthor-email: dev@spark.apache.orgLicense: http://www.apache.org/licenses/LICENSE-2.0Location: d:\miniconda3\lib\site-packagesRequires: py4jRequired-by:
使用方法 学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html
在下面的网页,官方提供了在线jupyter:
https://spark.apache.org/docs/latest/api/python/getting_started/index.html
文章图片
启动spark并读取数据
本地模式启动spark:
from pyspark.sql import SparkSession, Rowspark = SparkSession \.builder \.appName("Python Spark") \.master("local[*]") \.getOrCreate()sc = spark.sparkContextspark
文章图片
SparkSession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。
【PySpark与GraphFrames的安装与使用环境搭建过程】点击Environment选项卡可以查看当前环境中的变量:
文章图片
启动hive支持
找到pyspark的安装位置,例如我的电脑在D:\Miniconda3\Lib\site-packages\pyspark
手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了MySQL作为原数据库,则还需要将MySQL对应的驱动jar包放入spark的jars目录下。
创建spark会话对象时可通过
enableHiveSupport()
开启hive支持:from pyspark.sql import SparkSessionfrom pyspark.sql import Rowspark = SparkSession \.builder \.appName("Python Spark SQL Hive integration example") \.enableHiveSupport() \.getOrCreate()sc = spark.sparkContextspark
spark访问hive自己创建的表有可能会出现如下的权限报错:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s是因为当前用户不具备对\tmp\hive的操作权限:
hould be writable. Current permissions are: rwx------
>winutils ls \tmp\hivedrwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May4 2020 \tmp\hive
把\tmp\hive目录的权限改为777即可顺利访问:
>winutils chmod 777 \tmp\hive>winutils ls \tmp\hivedrwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May4 2020 \tmp\hive
Spark的DataFrame与RDD
从spark2.x开始将RDD和DataFrame的API统一抽象成dataset,DataFrame就是Dataset[Row],RDD则是Dataset.rdd。可以将DataFrame理解为包含结构化信息的RDD。
将含row的RDD转换为DataFrame只需要调用toDF方法或SparkSession的createDataFrame方法即可,也可以传入schema覆盖类型或名称设置。
DataFrame的基础api DataFrame默认支持DSL风格语法,例如:
//查看DataFrame中的内容df.show()//查看DataFrame部分列中的内容df.select(df['name'], df['age'] + 1).show()df.select("name").show()//打印DataFrame的Schema信息df.printSchema()//过滤age大于等于 21 的df.filter(df['age'] > 21).show()//按年龄进行分组并统计相同年龄的人数personDF.groupBy("age").count().show()
将DataFrame注册成表或视图之后即可进行纯SQL操作:
df.createOrReplaceTempView("people")//df.createTempView("t_person")//查询年龄最大的前两名spark.sql("select * from t_person order by age desc limit 2").show()//显示表的Schema信息spark.sql("desc t_person").show()
Pyspark可以直接很方便的注册udf并直接使用:
strlen = spark.udf.register("len", lambda x: len(x))print(spark.sql("SELECT len('test') length").collect())print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())
执行结果:
[Row(length='4')]
[Row(length='3')]
RDD的简介 DataFrame的本质是对RDD的包装,可以理解为DataFrame=RDD[Row]+schema。
RDD(A Resilient Distributed Dataset)叫做弹性可伸缩分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。
在每一个RDD内部具有五大属性:
- 具有一系列的分区
- 一个计算函数操作于每一个切片
- 具有一个对其他RDD的依赖列表
- 对于 key-value RDDs具有一个Partitioner分区器
- 存储每一个切片最佳计算位置
**一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
**RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
**一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
**一个列表,存储存取每个Partition的优先位置(preferred location)。**对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
RDD的API概览 RDD包含Transformation API和 Action API,Transformation API都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行Action API时这些转换才会真正运行。
Transformation API产生的两类RDD最重要,分别是MapPartitionsRDD和ShuffledRDD。
产生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何产生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex实现。
产生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。
combineByKey到groupByKey 底层均是调用combineByKeyWithClassTag方法:
@Experimentaldef combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners,defaultPartitioner(self))}def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)}def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {val createCombiner = (v: V) => CompactBuffer(v)val mergeValue = https://www.it610.com/article/(buf: CompactBuffer[V], v: V) => buf += vval mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2val bufs = combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}
三个重要参数的含义:
- createCombiner:根据每个分区的第一个元素操作产生一个初始值
- mergeValue:对每个分区内部的元素进行迭代合并
- mergeCombiners:对所有分区的合并结果进行合并
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)a.groupByKey.collectres9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey:每个分区使用zeroValue作为初始值,迭代每一个元素用seqOp进行合并,对所有分区的结果用combOp进行合并。例如:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collectres6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collectres7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
reduceByKey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)val b = a.map(x => (x.length, x))b.reduceByKey(_ + _).collect
Action API有:
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement*,*num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 排序并取前N个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将RDD中的元素用NullWritable作为key,实际元素作为value保存为sequencefile格式 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
object MapreduceWordcount {def main(args: Array[String]): Unit = {import org.apache.spark._val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))sc.setLogLevel("WARN")import org.apache.hadoop.io.{LongWritable, Text}import org.apache.hadoop.mapred.TextInputFormatimport org.apache.spark.rdd.HadoopRDDimport scala.collection.mutable.ArrayBufferdef map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {for (word <- v.toString.split("\\s+"))collect += ((word, 1))}def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {collect += ((key, value.sum))}val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2).asInstanceOf[HadoopRDD[LongWritable, Text]].mapPartitionsWithInputSplit((split, it) =>{val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()it.foreach(kv => map(kv._1, kv._2, collect))collect.toIterator}).repartitionAndSortWithinPartitions(new HashPartitioner(2)).mapPartitions(it => {val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()var lastKey: String = ""var values: ArrayBuffer[Int] = ArrayBuffer[Int]()for ((currKey, value) <- it) {if (!currKey.equals(lastKey)) {if (values.length != 0)reduce(lastKey, values.toIterator, collect)values.clear()}values += valuelastKey = currKey}if (values.length != 0) reduce(lastKey, values.toIterator, collect)collect.toIterator})rdd.foreach(println)}}
各类RDD
文章图片
- ShuffledRDD :表示需要走Shuffle过程的网络传输
- CoalescedRDD :用于将一台机器的多个分区合并成一个分区
- CartesianRDD :对两个RDD的所有元素产生笛卡尔积
- MapPartitionsRDD :用于对每个分区的数据进行特定的处理
- CoGroupedRDD :用于将2~4个rdd,按照key进行连接聚合
- SubtractedRDD :用于对2个RDD求差集
- UnionRDD和PartitionerAwareUnionRDD :用于对2个RDD求并集
- ZippedPartitionsRDD2:zip拉链操作产生的RDD
- ZippedWithIndexRDD:给每一个元素标记一个自增编号
- PartitionwiseSampledRDD:用于对rdd的元素按照指定的百分比进行随机采样
from pyspark.sql.types import StructType, LongTypeschema = data.schema.add(StructField("id", LongType()))rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))data = https://www.it610.com/article/rowRDD.toDF(schema)data.show()
API用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
cache&checkpoint RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
rdd.persist()
checkpoint的源码注释可以看到:
- 标记该RDD作为检查点。
- 它将被保存在通过SparkContext#setCheckpointDir方法设置的检查点目录中
- 它所引用的所有父RDD引用将全部被移除
- 这个方法在这个RDD上必须在所有job执行前运行。
- 强烈建议将这个RDD缓存在内存中,否则这个保存文件的计算任务将重新计算。
sc.setCheckpointDir("checkpoint")rdd.cache()rdd.checkpoint()
graphframes 的用法
GraphFrame是将Spark中的Graph算法统一到DataFrame接口的Graph操作接口,为Scala、Java和Python提供了统一的图处理API。
Graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes
可以参考:
- 官网:https://graphframes.github.io/graphframes/docs/_site/index.html
- GraphFrames用户指南-Python — Databricks文档:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html
- 顶点DataFrame:必须包含列名为“id”的列,用于作为顶点的唯一标识
- 边DataFrame:必须包含列名为“src”和“dst”的列,根据唯一标识id标识关系
from graphframes import GraphFramevertices = spark.createDataFrame([("a", "Alice", 34),("b", "Bob", 36),("c", "Charlie", 30),("d", "David", 29),("e", "Esther", 32),("f", "Fanny", 36),("g", "Gabby", 60)], ["id", "name", "age"])edges = spark.createDataFrame([("a", "b", "friend"),("b", "c", "follow"),("c", "b", "follow"),("f", "c", "follow"),("e", "f", "follow"),("e", "d", "friend"),("d", "a", "friend"),("a", "e", "friend")], ["src", "dst", "relationship"])# 生成图g = GraphFrame(vertices, edges)
GraphFrame提供三种视图:
print("顶点表视图:")graph.vertices.show() # graph.vertices 就是原始的verticesprint("边表视图:")graph.edges.show() # graph.edges 就是原始的 edgesprint("三元组视图:")graph.triplets.show()
获取顶点的度、入度和出度:
# 顶点的度graph.degrees.show()# 顶点的入度graph.inDegrees.show()# 顶点的出度graph.outDegrees.show()
Motif finding (模式发现) 示例:
# 多个路径条件motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")# 在搜索的结果上进行过滤motif.filter("b.age > 30")# 不需要返回路径中的元素时,可以使用匿名顶点和边motif = graph.find("(start)-[]->()")# 设置路径不存在的条件motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")
假设我们要想给用户推荐关注的人,可以找出这样的关系:A关注B,B关注C,但是A未关注C。找出这样的关系就可以把C推荐给A:
# Motif: A->B->C but not A->Cresults = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")# 排除自己results = results.filter("A.id != C.id")# 选择需要的列results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))results.show()
结果:
+---+---+Motif在查找路径过程的过程中,还可以沿着路径携带状态。例如我们想要找出关系链有4个顶点,而且其中3条边全部都是"friend"关系:
|A|C|
+---+---+
|e|c|
|e|a|
|d|b|
|a|d|
|f|b|
|d|e|
|a|f|
|a|c|
+---+---+
from pyspark.sql.functions import col, lit, whenfrom functools import reducechain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")def sumFriends(cnt, relationship):"定义下一个顶点更新状态的条件:如果关系为friend则cnt+1"return when(relationship == "friend", cnt+1).otherwise(cnt)# 将更新方法应用到整个链的,链上每有一个关系是 friend 就加一,链上共三个关系。condition = reduce(lambda cnt, e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))chainWith2Friends2 = chain4.where(condition >= 3)chainWith2Friends2.show()
结果:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|a|ab|b|bc|c|cd|d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|{b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
Subgraphs 子图 可以直接过滤其顶点或边,
dropIsolatedVertices()
方法用于删除孤立没有连接的点:graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
还可以基于模式发现获取到的边创建Subgraphs :
paths = graph.find("(a)-[e]->(b)")\.filter("e.relationship = 'follow'")\.filter("a.age < b.age")# 抽取边信息e2 = paths.select("e.src", "e.dst", "e.relationship")e2 = paths.select("e.*")# 创建Subgraphsg2 = GraphFrame(graph.vertices, e2)
GraphFrames支持的GraphX算法
- PageRank:查找图中的重要顶点。
- 广度优先搜索(BFS):查找从一组顶点到另一组顶点的最短路径
- 连通组件(ConnectedComponents):为具备连接关系的顶点分配相同的组件ID
- 强连通组件(StronglyConnectedConponents):根据每个顶点的强连通分量分配SCC。
- 最短路径(Shortest paths):查找从每个顶点到目标顶点集的最短路径。
- 三角形计数(TriangleCount):计算每个顶点所属的三角形的数量,经常用于确定组的稳定性(相互连接的数量代表了稳定性)或作为其他网络度量(如聚类系数)的一部分,在社交网络分析中用来检测社区。
- 标签传播算法(LPA):检测图中的社区。
results = graph.pageRank(resetProbability=0.15, maxIter=10)results.vertices.sort("pagerank", ascending=False).show()
结果:
+---+-------+---+-------------------+可以设置起始顶点:
| id|name|age|pagerank|
+---+-------+---+-------------------+
|b|Bob| 36| 2.7025217677349773|
|c|Charlie| 30| 2.6667877057849627|
|a|Alice| 34| 0.4485115093698443|
|e| Esther| 32| 0.3613490987992571|
|f|Fanny| 36|0.32504910549694244|
|d|David| 29|0.32504910549694244|
|g|Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
广度优先搜索BFS:
搜索从姓名叫Esther到年龄小于32的最小路径:
paths = graph.bfs("name = 'Esther'", "age < 32")paths.show()
+--------------+--------------+---------------+|from|e0|to|+--------------+--------------+---------------+|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|+--------------+--------------+---------------+
可以指定只能在指定的边搜索:
graph.bfs("name = 'Esther'","age < 32",edgeFilter="relationship != 'friend'",maxPathLength=4).show()
+---------------+--------------+--------------+--------------+----------------+|from|e0|v1|e1|to|+---------------+--------------+--------------+--------------+----------------+|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|+---------------+--------------+--------------+--------------+----------------+
Connected components 连通组件:
必须先设置检查点:
sc.setCheckpointDir("checkpoint")graph.connectedComponents().show()
结果:
+---+-------+---+------------+可以看到仅g点在一个连通区域内,可以调用
| id|name|age|component|
+---+-------+---+------------+
|a|Alice| 34|412316860416|
|b|Bob| 36|412316860416|
|c|Charlie| 30|412316860416|
|d|David| 29|412316860416|
|e| Esther| 32|412316860416|
|f|Fanny| 36|412316860416|
|g|Gabby| 60|146028888064|
+---+-------+---+------------+
dropIsolatedVertices()
方法,删除这种孤立的没有连接的点:graph.dropIsolatedVertices().connectedComponents().show()
结果:
+---+-------+---+------------+Strongly connected components 强连通组件:
| id|name|age|component|
+---+-------+---+------------+
|a|Alice| 34|412316860416|
|b|Bob| 36|412316860416|
|c|Charlie| 30|412316860416|
|d|David| 29|412316860416|
|e| Esther| 32|412316860416|
|f|Fanny| 36|412316860416|
+---+-------+---+------------+
graph.stronglyConnectedComponents(maxIter=10).show()
Shortest paths 最短路径:
每个顶点到a或d的最短路径:
graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+| id|name|age|distances|+---+-------+---+----------------+|g|Gabby| 60|{}||f|Fanny| 36|{}||e| Esther| 32|{a -> 2, d -> 1}||d|David| 29|{a -> 1, d -> 0}||c|Charlie| 30|{}||b|Bob| 36|{}||a|Alice| 34|{a -> 0, d -> 2}|+---+-------+---+----------------+
Triangle count 三角形计数:
graph.triangleCount().show()
+-----+---+-------+---+|count| id|name|age|+-----+---+-------+---+|1|a|Alice| 34||0|b|Bob| 36||0|c|Charlie| 30||1|d|David| 29||1|e| Esther| 32||0|g|Gabby| 60||0|f|Fanny| 36|+-----+---+-------+---+
说明顶点a/e/d构成三角形。
标签传播算法(LPA):
graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+| id|name|age|label|+---+-------+---+-------------+|g|Gabby| 60| 146028888064||f|Fanny| 36|1047972020224||b|Bob| 36|1047972020224||a|Alice| 34|1382979469312||c|Charlie| 30|1382979469312||e| Esther| 32|1460288880640||d|David| 29|1460288880640|+---+-------+---+-------------+
PySpark3.X与pandas融合
Pyspark从3.0版本开始出现了pandas_udf装饰器、applyInPandas和mapInPandas,基于这些方法,我们就可以使用熟悉的pandas的语法处理spark对象的数据。
首先创建几条测试数据,并启动 Apache Arrow:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))df.show()
自定义UDF和UDAF
pyspark暂不支持自定义UDTF。使用pandas_udf装饰器我们可以创建出基于pandas的udf自定义函数,在DSL的语法中可以被直接使用:
from pyspark.sql.functions import pandas_udfimport pandas as pd@pandas_udf("double")def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:return a * bdf.select(multiply_func("id", "v").alias("product")).show()
注册函数和视图后,可以直接在SQL中使用:
df.createOrReplaceTempView("t")spark.udf.register("multiply", multiply_func)spark.sql('select multiply(id, v) product from t').show()
结果均为:
+-------+还支持聚合函数和窗口函数:
|product|
+-------+
|1.0|
|2.0|
|6.0|
|10.0|
|20.0|
+-------+
from pyspark.sql import Window@pandas_udf("double")def mean_udf(v: pd.Series) -> float:return v.mean()# 对字段'v'进行求均值df.select(mean_udf('v').alias("mean_v")).show()# 按照‘id'分组,求'v'的均值df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()# 按照‘id'分组,求'v'的均值,并赋值给新的一列df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()
注册到udf之后同样可以直接使用SQL实现:
spark.udf.register("mean2", mean_udf)spark.sql('select mean2(v) mean_v from t').show()spark.sql('select id,mean2(v) mean_v from t group by id').show()spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()
结果均为:
+--------+
| mean_v |
+--------+
|4.2|
+--------+
+---+--------+
| id| mean_v |
+---+--------+
|1|1.5|
|2|6.0|
+---+--------+
+---+----+------+
| id|v|mean_v|
+---+----+------+
|1| 1.0|1.5|
|1| 2.0|1.5|
|2| 3.0|6.0|
|2| 5.0|6.0|
|2|10.0|6.0|
+---+----+------+
分组聚合与JOIN applyInPandas需要在datafream调用groupby之后才能使用:
def subtract_mean(pdf):v = pdf.vpdf['v1'] = v - v.mean()pdf['v2'] = v + v.mean()return pdft = df.groupby("id")t.applyInPandas(subtract_mean, schema="id long, v double, v1 double, v2 double").show()
结果:
+---+----+----+----+subtract_mean函数接收的是对应id的dataframe数据,schema指定了返回值的名称和类型列表。
| id|v|v1|v2|
+---+----+----+----+
|1| 1.0|-0.5| 2.5|
|1| 2.0| 0.5| 3.5|
|2| 3.0|-3.0| 9.0|
|2| 5.0|-1.0|11.0|
|2|10.0| 4.0|16.0|
+---+----+----+----+
通过以下代码我们可以知道,applyInPandas可以借助cogroup进行表连接:
val a = sc.parallelize(List(1, 2, 1, 3))val b = a.map((_, "b"))val c = a.map((_, "c"))val d = a.map((_, "d"))val e = a.map((_, "e"))scala> b.cogroup(c).foreach(println)(3,(CompactBuffer(b),CompactBuffer(c)))(1,(CompactBuffer(b, b),CompactBuffer(c, c)))(2,(CompactBuffer(b),CompactBuffer(c)))
示例:
df1 = spark.createDataFrame([(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],("time", "id", "v1"))df2 = spark.createDataFrame([(20000101, 1, "x"), (20000101, 2, "y")],("time", "id", "v2"))def asof_join(l, r):# l、r is a pandas.DataFrame# 这里是按照id分组# 那么,l和r分别是对应id的df1和df2数据return pd.merge_asof(l, r, on="time", by="id")df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(asof_join, schema="time int, id int, v1 double, v2 string").show()# +--------+---+---+---+# |time| id| v1| v2|# +--------+---+---+---+# |20000101|1|1.0|x|# |20000102|1|3.0|x|# |20000101|2|2.0|y|# |20000102|2|4.0|y|# +--------+---+---+---+
map迭代 执行以下代码:
def filter_func(iterator):for i, pdf in enumerate(iterator):print(i, pdf.values.tolist())yield pdfdf.mapInPandas(filter_func, schema=df.schema).show()
后台看到执行结果为:
0 [[2.0, 5.0]]前台结果几乎保持原样。可以知道iterator是一个分区迭代器,迭代出当前分区的每一行数据都被封装成一个pandas对象。
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
Pyspark与Pandas的交互 将spark的Datafream对象转换为原生的pandas对象只需调用toPandas()方法即可:
sdf.toPandas()
将原生的pandas对象转换为spark对象可以使用spark的顶级方法:
spark.createDataFrame(pdf)
习惯使用pandas的童鞋,还可以直接使用pandas-on-Spark,在spark3.2.0版本时已经匹配到pandas 1.3版本的API。通过pandas-on-Spark,我们可以完全用pandas的api操作数据,而底层执行却是spark的并行化。
使用pandas-on-Spark最好设置一下环境变量:
import osos.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
将spark对象转换为pandas-on-Spark对象:
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))pdf = df.to_pandas_on_spark()print(type(pdf))pdf
文章图片
pandas-on-Spark对象也可以还原成spark对象:
pdf.to_spark()
另外spark提供直接将文件读取成pandas-on-Spark对象的api,例如:
import pyspark.pandas as pspdf = ps.read_csv("example_csv.csv")
ps对象与原生pandas对象的API几乎完全一致。
ps对象相对于原生pandas对象的API几乎一致,同时还支持一些强悍的功能,例如直接以SQL形式访问:
ps.sql("SELECT count(*) as num FROM {pdf}")
{pdf}访问了变量名为pdf的pandas-on-Spark对象。到此这篇关于PySpark与GraphFrames的安装与使用的文章就介绍到这了,更多相关PySpark与GraphFrames使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- 乘风破浪的投资人|对话顶级机构青年合伙人:五大热门领域的变化与机遇
- 新能源与纯电动技术|这么大的事儿,宁德时代就聊了不到4分钟?
- 防火墙基础之流量管理与控制
- 前端面试题|【前端面试题】防抖与节流-js
- 数据资产为王,如何解析企业数字化转型与数据资产管理的关系()
- 投稿|宁德时代CTP3.0,麒麟电池的喜与忧
- 阿里云体验有奖(如何将 PolarDB-X 与大数据等系统互通)
- 面试|MYSQL中的索引与事务———javaweb(8)(面试必考)
- 计算机毕业论文和程序设计|基于 SpringBoot 的个人博客系统设计与实现(含论文与程序代码).rar
- 投稿|中国奶粉一哥背后,飞鹤的内忧与外困