Messaging systems follow one of two delivery models: point-to-point or publish/subscribe. Kafka unifies the two through its consumer-group concept: consumers that share a group id split a topic's partitions among themselves (point-to-point semantics), while consumers in different groups each receive every message (publish/subscribe semantics).
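As a minimal sketch of how the group id selects the model (the class name, broker address, and group id below are illustrative placeholders, not part of the original text), a plain Java consumer only needs its group.id configured; run two copies with the same id to see queue-style load sharing, or with different ids to see broadcast behavior:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupDemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092");   // placeholder broker address
        // Same group.id on every instance -> instances share the partitions (point-to-point).
        // A unique group.id per instance -> every instance receives all messages (publish/subscribe).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ittheme"));          // topic used later in this tutorial
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}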
Developing a Word-Count Application with Kafka Streams
1. Add the dependency
Open the pom.xml file and add the Kafka Streams dependency; make sure the version matches your Kafka installation (2.0.0 is used later in this tutorial):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
2. Write the code
- File LogProcessor.java implements the word-count business logic:
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.HashMap;

// Word-count processor: counts the words in every incoming message.
public class LogProcessor implements Processor<byte[], byte[]> {
    // Processor context, used to forward records to the downstream node
    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        // Initialization: keep a reference to the context
        this.processorContext = processorContext;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // Process one message: split it into words and count each word
        String input = new String(value);
        HashMap<String, Integer> counts = new HashMap<>();
        for (String word : input.split(" ")) {
            Integer times = counts.get(word);
            counts.put(word, times == null ? 1 : times + 1);
        }
        // Forward the result, e.g. "{hello=1, kafka=2}", to the downstream (sink) node
        processorContext.forward(key, counts.toString().getBytes());
    }

    @Override
    public void close() {
        // Close the processor; release resources here if needed
    }
}

- The main class App builds the topology and tests the processor above:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;

public class App {
    public static void main(String[] args) {
        // 1. Declare the source topic and the target topic
        String fromTopic = "ittheme";
        String toTopic = "ittheme2";

        // 2. Set the configuration parameters of the Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "LogProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");

        // 3. Build the topology: addSource(), addProcessor() and addSink() wire up the processing graph
        Topology topology = new Topology();
        // 3.1 Add the source node, giving it a name and the topic it subscribes to
        topology.addSource("SOURCE", fromTopic)
                // 3.2 Add the custom processor node: its name, a processor supplier, and the name of its parent node
                .addProcessor("PROCESSOR", new ProcessorSupplier<byte[], byte[]>() {
                    // get() supplies the Processor instance that handles each record
                    @Override
                    public Processor<byte[], byte[]> get() {
                        return new LogProcessor();
                    }
                }, "SOURCE")
                // 3.3 Add the sink node: its name, the output topic, and the name of its parent node
                .addSink("SINK", toTopic, "PROCESSOR");

        // 4. Instantiate KafkaStreams and start the application
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
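The example above starts the application and leaves it running. If you also want the Streams instance to shut down cleanly when the JVM exits, a common pattern (a sketch, not part of the original example) is to register a shutdown hook at the end of main():

// Optional: close the Streams instance cleanly when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));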
3. Run the test
- After the code is written, create the source topic and the target topic on the hadoop01 node (a Java alternative using AdminClient is sketched after this list):

kafka-topics.sh --create --topic ittheme --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
kafka-topics.sh --create --topic ittheme2 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
- Start a console producer on hadoop01 (writing to the source topic ittheme) and a console consumer on hadoop02 (reading from the target topic ittheme2):

kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic ittheme
kafka-console-consumer.sh --from-beginning --topic ittheme2 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
- Run the App main class; the Kafka Streams test setup is now complete.
- Type the line hello kafka kafka in the producer console on hadoop01; the consumer console on hadoop02 should print {hello=1, kafka=2}.
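If you prefer to create the two topics from Java rather than from the command line, the AdminClient that ships with the Kafka client libraries can do the same; this is only a sketch using the same broker addresses and topic settings as above, not part of the original tutorial:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Same settings as the CLI commands: 3 partitions, replication factor 2
            NewTopic source = new NewTopic("ittheme", 3, (short) 2);
            NewTopic target = new NewTopic("ittheme2", 3, (short) 2);
            admin.createTopics(Arrays.asList(source, target)).all().get();  // block until both topics exist
        }
    }
}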
A common pitfall: if only one broker is running, creating a topic with --replication-factor 2 fails; lowering the replication factor to 1 (or starting the remaining brokers first) resolves it, as the following session shows:

[root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2023-04-01 02:12:01,157] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)
[root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 1 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
Created topic "testStreams1".
[root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 1 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
Created topic "testStreams2".
[root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testStreams1