以下是搜索内容: 关闭

  • 首页
  • 日志
  • 友情链接
  • 关于我

KoiNL.

愿世间美好 温柔以待

“锦鲤握运,未离我韵”

“愿好运常在”

18 分类
0 标签
16 归档
  • 小站首页
  • 个人日志
  • 友情链接
  • 关于自己
  • 我的工具
站点信息

文章数目: 84 篇

最近动态: 2天前

上线时间: 531天

当前版本: v3.0.0

第7章 Spark Streaming

分类: Spark
标签:

创建日期:2023-03-19 08:56:01

Spark Streaming是实时计算框架,主要为了对数据进行实时处理。
首先,简单了解了Spark Streaming的基础知识。
其次,Spark Streaming的核心是DStream,因此该小节最终以实例——实现网站热词排序来体现。
最后,将Spark Streaming整合Kafka,来实现词频统计。

Kafka Streaming开发单词计数应用

1. 添加依赖

打开 pom.xml 文件,添加 Kafka Streamimh 整合 Kafka的 依赖,注意匹配版本号。

添加依赖代码
1
2
3
4
5
6
7
<!--引入sparkstreaming整合kafka的依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
<version>2.3.2</version>
</dependency>

2. 编写代码

  • 文件 LogProcessor.java,单词计数的业务功能开发
    在spark_ chapter07项目的/src/main/scala/cn.itcast.dstream目录下, 创建一个名为”SparkStreaming Kafka_createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。
    Spark Streaming应用程序实现词频统计的Java代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    object SparkStreaming_Kafka_createDirectStream {
    def main(args: Array[String]): Unit = {
    //1.创建sparkConf对象,用于配置Spark环境
    val sparkConf: SparkConf = new SparkConf()
    .setAppName("SparkStreaming_Kafka_createDirectStream").setMaster("local[2]")
    //2.创建sparkContext对象,用于操作Spark集群
    val sc = new SparkContext(sparkConf)
    //3.设置日志输出级别
    sc.setLogLevel("WARN")
    //4.创建StreamingContext对象,用于创建DStream对象
    val ssc = new StreamingContext(sc,Seconds(5))
    //5.通过ssc对象设置chectPoint(检查点)
    ssc.checkpoint("./Kafka_Direct")
    //6.配置kafka相关参数 (metadata.broker.list为老版本的集群地址)
    val kafkaParams=Map("metadata.broker.list"->"hadoop01:9092,hadoop02:9092,hadoop03:9092","group.id"->"spark_direct")
    //7.定义topic
    val topics=Set("kafka_direct0")
    //8.通过低级api方式将kafka与sparkStreaming进行整合
    val dstream: InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    //9.获取kafka中topic中的数据
    val topicData: DStream[String] = dstream.map(_._2)
    //10.通过flatMap()和map()方法转换操作按空格进行切分每一行,并将切分的单词出现次数记录为1
    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
    //11.通过reduceByKey()方法转换操作统计单词在全局中出现的次数
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //12.打印输出结果
    result.print()
    //13.开启流式计算
    ssc.start()
    ssc.awaitTermination()
    }
    }
    运行上述代码后,依次在hadoop01、hadoop02、hadoop03服务器执行命令zkServer.sh start启动ZooKeeper集群;然后再依次在Kafka的根目录下执行命令bin/kafka-server-start.sh config/server.properties启动Kafka集群。
  1. 创建Topic, 指定消息的类别
    创建来源主题和目标主题代码
    1
    2
    3
    4
    5
    kafka-topics.sh --create \
    --topic kafka_direct0 \
    --partitions 3 \
    --replication-factor 1 \
    --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
  2. 在hadoop01启动生产者服务,生产数据
    启动生产者服务和消费者服务代码
    1
    kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafka_direct0
  3. 在生产者服务节点(hadoop01)输入hello kafka kafka语句,可在IDEA工具中查看控制台,可以看到控制台输出了{hello=1, kafka=2}信息
浏览量

评论区

欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~

目录

  1. 1. Kafka Streaming开发单词计数应用
    1. 1.1. 1. 添加依赖
    2. 1.2. 2. 编写代码

上一篇: 第8章 Spark MLlib机器学习算法库

下一篇 第6章 Kafka分布式发布订阅消息系统

公告栏

《 

 》

Hello~近期剽窃本站内容频发,本站唯一指定网站:https://koinl.github.io。请认准。点击点击此处选择进入。
回到顶部
查看评论

Power By Hexo.

Theme:koinl.

信息来源于锦鲤未离