Spark Streaming是实时计算框架,主要为了对数据进行实时处理。
首先,简单了解了Spark Streaming的基础知识。
其次,Spark Streaming的核心是DStream,因此该小节最终以实例——实现网站热词排序来体现。
最后,将Spark Streaming整合Kafka,来实现词频统计。
Kafka Streaming开发单词计数应用
1. 添加依赖
打开 pom.xml 文件,添加 Kafka Streamimh 整合 Kafka的 依赖,注意匹配版本号。
添加依赖代码
1 | <!--引入sparkstreaming整合kafka的依赖--> |
2. 编写代码
- 文件 LogProcessor.java,单词计数的业务功能开发
在spark_ chapter07项目的/src/main/scala/cn.itcast.dstream目录下, 创建一个名为”SparkStreaming Kafka_createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。运行上述代码后,依次在hadoop01、hadoop02、hadoop03服务器执行命令Spark Streaming应用程序实现词频统计的Java代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreaming_Kafka_createDirectStream {
def main(args: Array[String]): Unit = {
//1.创建sparkConf对象,用于配置Spark环境
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreaming_Kafka_createDirectStream").setMaster("local[2]")
//2.创建sparkContext对象,用于操作Spark集群
val sc = new SparkContext(sparkConf)
//3.设置日志输出级别
sc.setLogLevel("WARN")
//4.创建StreamingContext对象,用于创建DStream对象
val ssc = new StreamingContext(sc,Seconds(5))
//5.通过ssc对象设置chectPoint(检查点)
ssc.checkpoint("./Kafka_Direct")
//6.配置kafka相关参数 (metadata.broker.list为老版本的集群地址)
val kafkaParams=Map("metadata.broker.list"->"hadoop01:9092,hadoop02:9092,hadoop03:9092","group.id"->"spark_direct")
//7.定义topic
val topics=Set("kafka_direct0")
//8.通过低级api方式将kafka与sparkStreaming进行整合
val dstream: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//9.获取kafka中topic中的数据
val topicData: DStream[String] = dstream.map(_._2)
//10.通过flatMap()和map()方法转换操作按空格进行切分每一行,并将切分的单词出现次数记录为1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//11.通过reduceByKey()方法转换操作统计单词在全局中出现的次数
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//12.打印输出结果
result.print()
//13.开启流式计算
ssc.start()
ssc.awaitTermination()
}
}zkServer.sh start
启动ZooKeeper集群;然后再依次在Kafka的根目录下执行命令bin/kafka-server-start.sh config/server.properties
启动Kafka集群。
- 创建Topic, 指定消息的类别
创建来源主题和目标主题代码
1
2
3
4
5kafka-topics.sh --create \
--topic kafka_direct0 \
--partitions 3 \
--replication-factor 1 \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 - 在hadoop01启动生产者服务,生产数据
启动生产者服务和消费者服务代码
1
kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafka_direct0
- 在生产者服务节点(hadoop01)输入hello kafka kafka语句,可在IDEA工具中查看控制台,可以看到控制台输出了{hello=1, kafka=2}信息
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~