以下是搜索内容: 关闭

  • 首页
  • 日志
  • 友情链接
  • 关于我

KoiNL.

愿世间美好 温柔以待

“锦鲤握运,未离我韵”

“愿好运常在”

18 分类
0 标签
16 归档
  • 小站首页
  • 个人日志
  • 友情链接
  • 关于自己
  • 我的工具
站点信息

文章数目: 84 篇

最近动态: 2天前

上线时间: 531天

当前版本: v3.0.0

第3章 Spark RDD弹性分布式数据集

分类: Spark
标签:

创建日期:2023-03-19 10:50:11

RDD即弹性分布式数据集,是一个数据结构。其是分布式内存的抽象概念,容错且并行。

请在spark目录使用bin/spark-shell --master local[2] 命令来编写程序。

首先,本章介绍了RDD的三种创建方式;
其次,介绍了RDD的两种操作,并以Shell RDD实现词频统计的案例来演示操作;
最后,简单介绍了RDD的相关知识,包括RDD的分区方式、RDD的依赖关系、RDD的容错方式以及Spark的任务调度。

open 本章所用到的文件RddTest.txt
1
2
3
4
5
6
7
# 在Linux本地创建一个需要进行词频统计的文件
[root@hadoop01 ~]# vim /export/data/RddTest.txt
hadoop spark
itcast rdd
scala spark
spark itcast
itcast hadoop

RDD的创建方式

Spark有三种创建RDD的方式,分别为于文件系统(本地或HDFS)和并行集合创建RDD。具体操作方式如下:

创建RDD的两种方式
1
2
3
4
5
6
#在linux本地读取文件创建RDD
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
# 假设在HDFS的/data目录下有同样的一个文件
# scala> val testRDD=sc.textFile("/data/test.txt")
# 从并行集合上面创建RDD
# scala> val arrRDD=sc.parallelize(Array(1,2,3,4,5))

RDD的处理过程

RDD采用惰性调用,即真正的计算只在行动算子中。

1. RDD的转换算子

RDD的转换算子有5个:filter(func) | map(func) | flatMap(func) | groupByKey() | reduceByKey(func)。其作用分别为筛选、拆分行元素、拆分全部元素、合并、合并且聚合。

  • 可以将reduceByKey()理解为groupByKey(),而当reduceByKey(func)时,func的作用则千变万化。
RDD的转换算子举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#筛选出满足条件的元素,该行含有hadoop的元素:"hadoop spark" "itcast hadoop"
scala> val linesWithSpark=lines.filter(line=>line.contains("hadoop"))

#文件的每一行内容都拆分成一行行的单词元素:Array("hadoop","spark") Array("itcast","rdd") ...
scala> val words=lines.map(line=>line.split(" "))

#文件的每一行内容都拆分成一个个的单词元素:"hadoop" "spark" "itcast" "rdd" ...
scala> val words=lines.flatMap(line=>line.split(" "))

# 进行flatMap(func)操作,再进行map()操作,最终呈现例:("hadoop,1") ("spark,1") ("itcast",1) ...
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))

# 先执行groupByKey()操作,将所有Key相同的Value值合并到一起,有("spark",(1,1,1))
scala> val groupWords=words.groupByKey()

# 先生成键值对如("spark",(1,1,1));然后执行函数func操作,即执行(a,b)=>a+b,该函数作用是聚合求和,得到最终结果("spark",3)
scala> val reduceWords=words.reduceByKey((a,b)=>a+b)

2. RDD的行动算子

RDD的行动算子有6个:count() | first() | take(n) | reduce(func) | collect() | foreach(func),其作用分别为求个数、第一元素、前几元素、聚合元素、转为数组、遍历输出

RDD的行动算子举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))

#count() 返回数据集中元素个数
scala> arrRdd.count()
res0: Long = 5

#first() 返回数组的第一个元素
scala> arrRdd.first()
res1: Int = 1

#take(n) 以数组形式返回数组集中前n个元素
scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)

#reduce(func) 聚合数据集中的元素
scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15

#collect() 以数组的形式返回数据集中的所有元素
scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)

#foreach(func) 将数据集中的每个元素传递
scala> arrRdd.foreach(x => println(x))
1
2
3
4
5

RDD实现词频统计

准备好数据后编写RDD进行词频统计的代码
1
2
3
4
5
6
7
8
9
10
11
#在linux本地读取文件RddTest.txt
#将文件的每一行内容都拆分成一个个的单词元素::"hadoop" "spark" "itcast" "rdd" ...
# 将word变成(word,1),例如:("hadoop,1") ("spark,1") ("itcast",1) ...
#通过reduceByKey操作把(Key,Value)键值对类型的RDD,按单词Key将单词出现的次数Value进行聚合
#foreach()打印输出结果
scala> sc.textFile("file:///export/data/RddTest.txt").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).foreach(println)
(spark,3)
(scala,1)
(hadoop,2)
(itcast,3)
(rdd,1)

RDD的相关知识

RDD的分区方式

哈希分区和范围分区

RDD的依赖关系

宽依赖和窄依赖

RDD的容错机制

容错机制则故障恢复的方式,一般分为血统方式和设置检查点方式。

Spark的任务调度

浏览量

评论区

欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~

目录

  1. 1. RDD的创建方式
  2. 2. RDD的处理过程
    1. 2.1. 1. RDD的转换算子
    2. 2.2. 2. RDD的行动算子
  3. 3. RDD实现词频统计
  4. 4. RDD的相关知识
    1. 4.1. RDD的分区方式
    2. 4.2. RDD的依赖关系
    3. 4.3. RDD的容错机制
    4. 4.4. Spark的任务调度

上一篇: 第1章 Scala语言基础.

下一篇 第8章 Spark MLlib机器学习算法库

公告栏

《 

 》

Hello~近期剽窃本站内容频发,本站唯一指定网站:https://koinl.github.io。请认准。点击点击此处选择进入。
回到顶部
查看评论

Power By Hexo.

Theme:koinl.

信息来源于锦鲤未离