抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >


Spark RDD 编程

RDD 编程基础

RDD 创建

从文件系统中加载数据创建 RDD

Spark 采用 textFile() 方法来从文件系统中加载数据创建 RDD
该方法把文件的 URI 作为参数,这个 URI 可以是

  • 本地文件系统的地址
  • 分布式文件系统 HDFS 的地址
  • AmazonS3 的地址
  • 等等
从本地文件系统中加载数据创建 RDD
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better

从本地文件系统中加载数据创建 RDD

从分布式文件系统 HDFS 中加载数据
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")

通过并行集合(列表)创建 RDD

可以调用 SparkContextparallelize 方法,在 Driver 中一个已经存在的集合(列表)上创建。

>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
1
2
3
4
5

通过并行集合(列表)创建 RDD

RDD 操作

转换操作

对于 RDD 而言,每一次转换操作都会产生不同的 RDD,供给下一个 “转换” 使用.
转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.

操作含义
filter(func) 筛选出满足函数 func 的元素,并返回一个新的数据集
map(func) 将每个元素传递到函数 func 中,并将结果返回为一个新的数据集
flatMap(func) 与 map () 相似,但每个输入元素都可以映射到 0 或多个输出结果
groupByKey() 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,Iterable) 形式的数据集
reduceByKey(func) 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,V) 形式的数据集,其中每个值是将每个 key 传递到函数 func 中进行聚合后的结果
filter(func)

筛选出满足函数 func 的元素,并返回一个新的数据集

>>>  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is better
Spark is fast

filter

map(func)

map(func) 操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集

>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.foreach(print)
11
13
12
14
15

map

flatMap(func)

>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.flatMap(lambda line:line.split(" "))

flatMap

groupByKey()

应用于 (K,V) 键值对的数据集时,返回一个新的 (K, Iterable) 形式的数据集

>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)

groupByKey

reduceByKey(func)

应用于 (K,V) 键值对的数据集时,返回一个新的 (K, V) 形式的数据集,其中的每个值是将每个 key 传递到函数 func 中进行聚合后得到的结果

>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)
('better', 1)
('Spark', 2)
('fast', 1)
('is', 3)

reduceByKey

行动操作

行动操作是真正触发计算的地方。Spark 程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

操作含义
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前 n 个元素
reduce(func) 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数 func 中运行

惰性机制

所谓的 “惰性机制” 是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发 “从头到尾” 的真正的计算
这里给出一段简单的语句来解释 Spark 的惰性机制

>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lineLengths = lines.map(lambda s:len(s))
>>> totalLength = lineLengths.reduce(lambda a,b:a+b)
>>> print(totalLength)

持久化

在 Spark 中,RDD 采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
  • 可以通过持久化(缓存)机制避免这种重复计算的开销
  • 可以使用 persist() 方法对一个 RDD 标记为持久化
  • 之所以说 “标记为持久化”,是因为出现 persist() 语句的地方,并不会马上计算生成 RDD 并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化
  • 持久化后的 RDD 将会被保留在计算节点的内存中被后面的行动操作重复使用
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache() #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
>>> print(rdd.count()) #第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
3
>>> print(','.join(rdd.collect())) #第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive

分区

RDD 是弹性分布式数据集,通常 RDD 很大,会被分成很多个分区,分别保存在不同的节点上
RDD 分区的一个原则是使得分区的个数尽量等于集群中的 CPU 核心(core)数目

键值对 RDD

键值对 RDD 的创建

从文件中加载

可以采用多种方式创建键值对 RDD,其中一种主要方式是使用 map() 函数来实现

>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
>>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('I', 1)
('love', 1)
('Hadoop', 1)
……

通过并行集合(列表)创建 RDD

>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.foreach(print)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

常用的键值对 RDD 转换操作

reduceByKey(func)

使用 func 函数合并具有相同键的值

>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)])
>>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)

groupByKey()

对具有相同键的值进行分组

>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[27] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)

sortByKey()

返回一个根据键排序的 RDD

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)

mapValues(func)

对键值对 RDD 中的每个 value 都应用一个函数,但是,key 不会发生变化

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x:x+1)
>>> pairRDD1.foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Spark', 2)

join

join 就表示内连接。对于内连接,对于给定的两个输入数据集 (K,V1) 和 (K,V2),只有在两个数据集中都存在的 key 才会被输出,最终得到一个 (K,(V1,V2)) 类型的数据集。

>>> pairRDD1 = sc. \
... parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>> pairRDD2 = sc.parallelize([("spark","fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))
推荐阅读
SparkStreaming SparkStreaming Spark环境部署(Ubuntu20.04) Spark环境部署(Ubuntu20.04) Spark Spark Hive基本概念 Hive基本概念 大数据技术概述 大数据技术概述 Hive使用方式 Hive使用方式

留言区

Are You A Robot?