首先,本章讲解在Linux系统下对结构化数据的处理。Spark SQL模块让用户可以通过SQL、DataFrame API 和 **DataSet API **三种方式来实现对结构化数据的处理。 在Spark-Shell操作下,RDD很容易就能转换成为DataFrame。那么在Windows系统下呢?又该如何使RDD转换成为DataFrame?
import java.util.Properties import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession} case class Person(id:Int,name:String,age:Int)
object sparkSqlMysql { def main(args: Array[String]): Unit = { //创建sparkSession对象 val spark: SparkSession = SparkSession.builder() .appName("sparkSqlMysql") .master("local[2]") .getOrCreate() val sc = spark.sparkContext.parallelize(Array("3,wangwu,22","4,zhaoliu,26")) //切分读取数据 val data: RDD[Array[String]] = sc.map(_.split(",")) //RDD关联Person val personRdd: RDD[Person] = data.map(x => Person(x(0)。toInt,x(1), x(2).toLong)) //导入隐式转换 import spark.implicits._ //将RDD转换成DataFrame val personDF: DataFrame = personRdd.toDF() //创建Properties对象,配置连接mysql的用户名和密码 val prop =new Properties() prop.setProperty("user","root") prop.setProperty("password","123456") //将personDF写入MySQL personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop) personDF.show() spark.stop() } }
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~