RDD即弹性分布式数据集,是一个数据结构。其是分布式内存的抽象概念,容错且并行。
请在spark目录使用bin/spark-shell --master local[2]
命令来编写程序。
首先,本章介绍了RDD的三种创建方式;
其次,介绍了RDD的两种操作,并以Shell RDD实现词频统计的案例来演示操作;
最后,简单介绍了RDD的相关知识,包括RDD的分区方式、RDD的依赖关系、RDD的容错方式以及Spark的任务调度。
open 本章所用到的文件RddTest.txt
1 2 3 4 5 6 7
| # 在Linux本地创建一个需要进行词频统计的文件 [root@hadoop01 ~]# vim /export/data/RddTest.txt hadoop spark itcast rdd scala spark spark itcast itcast hadoop
|
RDD的创建方式
Spark有三种创建RDD的方式,分别为于文件系统(本地或HDFS)和并行集合创建RDD。具体操作方式如下:
创建RDD的两种方式
1 2 3 4 5 6
| #在linux本地读取文件创建RDD scala> val lines=sc.textFile("file:///export/data/RddTest.txt") # 假设在HDFS的/data目录下有同样的一个文件 # scala> val testRDD=sc.textFile("/data/test.txt") # 从并行集合上面创建RDD # scala> val arrRDD=sc.parallelize(Array(1,2,3,4,5))
|
RDD的处理过程
RDD采用惰性调用,即真正的计算只在行动算子中。
1. RDD的转换算子
RDD的转换算子有5个:filter(func) | map(func) | flatMap(func) | groupByKey() | reduceByKey(func)。其作用分别为筛选、拆分行元素、拆分全部元素、合并、合并且聚合。
- 可以将reduceByKey()理解为groupByKey(),而当reduceByKey(func)时,func的作用则千变万化。
RDD的转换算子举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #筛选出满足条件的元素,该行含有hadoop的元素:"hadoop spark" "itcast hadoop" scala> val linesWithSpark=lines.filter(line=>line.contains("hadoop"))
#文件的每一行内容都拆分成一行行的单词元素:Array("hadoop","spark") Array("itcast","rdd") ... scala> val words=lines.map(line=>line.split(" "))
#文件的每一行内容都拆分成一个个的单词元素:"hadoop" "spark" "itcast" "rdd" ... scala> val words=lines.flatMap(line=>line.split(" "))
# 进行flatMap(func)操作,再进行map()操作,最终呈现例:("hadoop,1") ("spark,1") ("itcast",1) ... scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
# 先执行groupByKey()操作,将所有Key相同的Value值合并到一起,有("spark",(1,1,1)) scala> val groupWords=words.groupByKey()
# 先生成键值对如("spark",(1,1,1));然后执行函数func操作,即执行(a,b)=>a+b,该函数作用是聚合求和,得到最终结果("spark",3) scala> val reduceWords=words.reduceByKey((a,b)=>a+b)
|
2. RDD的行动算子
RDD的行动算子有6个:count() | first() | take(n) | reduce(func) | collect() | foreach(func),其作用分别为求个数、第一元素、前几元素、聚合元素、转为数组、遍历输出
RDD的行动算子举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
#count() 返回数据集中元素个数 scala> arrRdd.count() res0: Long = 5
#first() 返回数组的第一个元素 scala> arrRdd.first() res1: Int = 1
#take(n) 以数组形式返回数组集中前n个元素 scala> arrRdd.take(3) res2: Array[Int] = Array(1, 2, 3)
#reduce(func) 聚合数据集中的元素 scala> arrRdd.reduce((a,b)=>a+b) res3: Int = 15
#collect() 以数组的形式返回数据集中的所有元素 scala> arrRdd.collect() res4: Array[Int] = Array(1, 2, 3, 4, 5)
#foreach(func) 将数据集中的每个元素传递 scala> arrRdd.foreach(x => println(x)) 1 2 3 4 5
|
RDD实现词频统计
准备好数据后编写RDD进行词频统计的代码
1 2 3 4 5 6 7 8 9 10 11
| #在linux本地读取文件RddTest.txt #将文件的每一行内容都拆分成一个个的单词元素::"hadoop" "spark" "itcast" "rdd" ... # 将word变成(word,1),例如:("hadoop,1") ("spark,1") ("itcast",1) ... #通过reduceByKey操作把(Key,Value)键值对类型的RDD,按单词Key将单词出现的次数Value进行聚合 #foreach()打印输出结果 scala> sc.textFile("file:///export/data/RddTest.txt").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).foreach(println) (spark,3) (scala,1) (hadoop,2) (itcast,3) (rdd,1)
|
RDD的相关知识
RDD的分区方式
哈希分区和范围分区
RDD的依赖关系
宽依赖和窄依赖
RDD的容错机制
容错机制则故障恢复的方式,一般分为血统方式和设置检查点方式。
Spark的任务调度
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~