以下是搜索内容: 关闭

  • 首页
  • 日志
  • 友情链接
  • 关于我

KoiNL.

愿世间美好 温柔以待

“锦鲤握运,未离我韵”

“愿好运常在”

18 分类
0 标签
16 归档
  • 小站首页
  • 个人日志
  • 友情链接
  • 关于自己
  • 我的工具
站点信息

文章数目: 84 篇

最近动态: 2天前

上线时间: 531天

当前版本: v3.0.0

第4章 Spark SQL结构化数据文件处理

分类: Spark
标签:

创建日期:2023-03-31 11:48:40

本章目的在于如何使用Spark SQL模块来处理结构化数据,结构化数据即以关系型数据库表形式管理的数据。

首先,本章讲解在Linux系统下对结构化数据的处理。Spark SQL模块让用户可以通过SQL、DataFrame API 和 **DataSet API **三种方式来实现对结构化数据的处理。
在Spark-Shell操作下,RDD很容易就能转换成为DataFrame。那么在Windows系统下呢?又该如何使RDD转换成为DataFrame?

启动Spark-Shell的命令如下:$ spark-shell --master local[2];如不能正常运行,可跳转到spark安装目录,使用命令 $ bin/spark-shell 启动Spark-shell。

文件:no4-createDataFrame.txt,该文件用于本章例子
1
2
3
4
5
6
7
8
# 将要用到的文件no4-createDataFrame.txt
[root@hadoop01 ~]# hdfs dfs -cat /sparktest/data/no4-createDataFrame.txt
zhangsan 20
lisi 29
wangwu 25
zhaoliu 30
tianqi 35
jerry 40

DataFrame

DataFrame的创建

创建DataFrame的方式是从一个已经存在的RDD调用toDF()方法进行转换,得到DataFrame;或者通过Spark读取数据源直接创建。
这里提供两种成员函数printSchema()和show(),作用分别为打印当前对象的Schema元数据信息和结果数据。

创建DataFrame的两种方式及部分成员函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 通过文件直接创建DataFrame。除了读取text(.txt)文件,还可以读取csv、json、parquet等 | 创建方式1
scala> val personDF=spark.read.text("/sparktest/data/no4-createDataFrame.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]

# 打印当前对象的Schema元数据信息:String数据类型,且可为空。
scala> personDF.printSchema()
root
|-- value: string (nullable = true)

# 打印当前DataFrame的结果数据
scala> personDF.show()
+-----------+
| value |
+-----------+
|zhangsan 20|
| lisi 29|
| wangwu 25|
| zhaoliu 30|
| tianqi 35|
| jerry 40|
+-----------+


# 从已经存在的RDD进行转换得到DataFrame,首先获取数据
# 第一步
scala> case class Person(name:String,age:Int)
defined class Person
# 以上三步可直接写成:
scala> val pDF=sc.textFile("/sparktest/data/no4-createDataFrame.txt").map(_.split(" ")).map(x => Person(x(0),x(1).toInt)).toDF()
scala> pDF.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 20|
| lisi| 29|
| wangwu| 25|
| zhaoliu| 30|
| tianqi| 35|
| jerry| 40|
+--------+---+

DataFrame的常用操作

DataFrame提供了两种语法风格,分别为DSL风格操作和SQL风格操作。对应的就是DataFrame API 、SQL两种方式。
DSL风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#查看name 字段的数据,PersonDF会随变量名的变化而变化。
#以下五种方式可同义替代。但第四种方法不推荐!原因是因为某些代码可能不支持!
pDF.select(pDF.col("name"),pDF.col("age")).show
pDF.select(pDF("name"),pDF("age")).show
pDF.select(col("name"),col("age")).show
pDF.select("name","age").show
# pDF.select("name","age"+1).show 不可运行!
# 但其他四个可运行!例如“$"age"+1”、“col("age")+1”
# 请注意可以通过as来重命名列名
pDF.select($"name",($"age"+1).as("new_age")).show

#条件过滤(对应Where语句)
pDF.filter($"age" >30).show

#分组(对应Group By语句)
pDF.groupBy("age").count().show

#排序(对应Order By语句)
pDF.sort($"age".desc).show

SQL风格操作

将DataFrame注册成一个临时表就可以进行SQL风格操作。

1
2
scala> pDF.registerTempTable("t_koinl")
scala> spark.sql("select name, age + 1 from t_koinl").show

Dataset

RDD转换为DataFrame

在上文中,有说明在Spark-shell中如何将RDD转换为DataFrame,在本小节中,来说明如何在Windows系统下开发Scala代码。

一般情况下,可以使用两种方法来实现。第一种方法是利用反射机制来推断包含特定类型对象的Schema。当case类不能提前定义,即未知数据结构时,应通过编程接口构造一个Schema,并将其应用在已知的RDD数据中。

Spark SQL操作数据源

操作MySQL

读取MySQL种数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}

object sparkSqlMysql {
def main(args: Array[String]): Unit = {
//创建sparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("sparkSqlMysql")
.master("local[2]")
.getOrCreate()
//创建Properties对象,配置连接mysql的用户名和密码
val prop: prop =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","123456")
//从数据库里读取数据
val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person", prop)
//显示Mysql中表数据
mysqlDF.show()
spark.stop()
}
}

写入数据到MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
case class Person(id:Int,name:String,age:Int)

object sparkSqlMysql {
def main(args: Array[String]): Unit = {
//创建sparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("sparkSqlMysql")
.master("local[2]")
.getOrCreate()
val sc = spark.sparkContext.parallelize(Array("3,wangwu,22","4,zhaoliu,26"))
//切分读取数据
val data: RDD[Array[String]] = sc.map(_.split(","))
//RDD关联Person
val personRdd: RDD[Person] = data.map(x => Person(x(0)。toInt,x(1), x(2).toLong))
//导入隐式转换
import spark.implicits._
//将RDD转换成DataFrame
val personDF: DataFrame = personRdd.toDF()
//创建Properties对象,配置连接mysql的用户名和密码
val prop =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","123456")
//将personDF写入MySQL

personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
personDF.show()
spark.stop()
}
}

操作Hive

浏览量

评论区

欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~

目录

  1. 1. DataFrame
    1. 1.1. DataFrame的创建
    2. 1.2. DataFrame的常用操作
      1. 1.2.1. SQL风格操作
    3. 1.3. Dataset
  2. 2. RDD转换为DataFrame
  3. 3. Spark SQL操作数据源
    1. 3.1. 操作MySQL
      1. 3.1.1. 读取MySQL种数据
      2. 3.1.2. 写入数据到MySQL
    2. 3.2. 操作Hive

上一篇: 第5章 HBase 分布式数据库

下一篇 第1章 Scala语言基础.

公告栏

《 

 》

Hello~近期剽窃本站内容频发,本站唯一指定网站:https://koinl.github.io。请认准。点击点击此处选择进入。
回到顶部
查看评论

Power By Hexo.

Theme:koinl.

信息来源于锦鲤未离