以下是搜索内容: 关闭

  • 首页
  • 日志
  • 友情链接
  • 关于我

KoiNL.

愿世间美好 温柔以待

“锦鲤握运,未离我韵”

“愿好运常在”

18 分类
0 标签
16 归档
  • 小站首页
  • 个人日志
  • 友情链接
  • 关于自己
  • 我的工具
站点信息

文章数目: 84 篇

最近动态: 2天前

上线时间: 531天

当前版本: v3.0.0

第6章 Kafka分布式发布订阅消息系统

分类: Spark
标签:

创建日期:2023-03-19 07:55:06

消息传递模式分为点对点和发布订阅两种消息传递模式, Kafka使用消费组的概念统一了上述两种消息传递模式。

Kafka Streams开发单词计数应用

1. 添加依赖

打开 pom.xml 文件,添加 Kafka Streams 依赖,注意匹配版本号。

添加依赖代码
1
2
3
4
5
6
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>

2. 编写代码

  • 文件 LogProcessor.java,单词计数的业务功能开发
    单词计数的业务功能开发Java代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import java.util.HashMap;

    // 初始化上下文对象
    public class LogProcessor implements Processor<byte[],byte[]> {
    //上下文对象
    private ProcessorContext processorContext;
    @ Override
    public void init(ProcessorContext processorContext) {
    //初始化方法
    this.processorContext=processorContext;
    }
    @ Override
    public void process(byte[] key, byte[] value) {
    //处理一条消息,每每接收到消息就处理并更新状态进行存储
    String inputOri = new String(value);
    HashMap <String,Integer> map = new HashMap<String,Integer>();
    int times = 1;
    if(inputOri.contains(" ")){
    //截取字段
    String [] words = inputOri.split(" ");
    for (String word : words){
    if(map.containsKey(word)){
    map.put(word,map.get(word)+1);
    }else{
    map.put(word,times);
    }
    }
    }
    inputOri = map.toString();
    processorContext.forward(key,inputOri.getBytes());
    }
    @ Override
    // 关闭处理器,例如资源清理工作
    public void close() {}
    }
  • 运行主程序的类APP,测试上述业务程序
    测试上述业务程序的Java代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import java.util.Properties;

    public class App {
    public static void main(String[] args) {
    // 1 声明来源主题和目标主题
    String fromTopic = "ittheme";
    String toTopic = "ittheme2";
    // 2.1 设置Kafka流处理应用程序的配置参数信息
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"LogProcessor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092,slave2:9092");
    // 2.2 实例化StreamsConfig并构建拓扑结构对象
    StreamsConfig config = new StreamsConfig(props);
    Topology topology = new Topology();
    // 应用程序创建拓扑结构器后,调用addSource()、addProcessor()、addSink()方法构建出任务的执行拓扑关系
    // 3.1 添加源处理节点,为源处理节点指定名称和它订阅的主题
    topology.addSource("SOURCE",fromTopic)
    //添加自定义处理节点,指定名称,处理器类和上一个节点的名称
    .addProcessor("PROCESSOR", new ProcessorSupplier() {
    //调用这个方法,就知道这条数据用哪个process处理,
    public Processor get() {
    return new LogProcessor();
    }
    },"SOURCE")
    //添加目标处理节点,需要指定目标处理节点的名称,和上一个节点名称。
    .addSink("SINK",toTopic,"PROCESSOR");//最后给SINK
    //实例化KafkaStreams,并启动程序
    KafkaStreams streams = new KafkaStreams(topology,config);
    streams.start();
    }
    }

3. 执行测试

  1. 代码编写完成后,在hadoop01节点创建两个主题(创建来源主题和目标主题)
    创建来源主题和目标主题代码
    1
    kafka-topics.sh --create --topic ittheme1 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    1
    kafka-topics.sh --create --topic ittheme2 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
  2. 分别在hadoop01和hadoop02节点启动生产者服务和消费者服务
    启动生产者服务和消费者服务代码
    1
    kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic ittheme2
    1
    kafka-console-consumer.sh --from-beginning  --topic ittheme2 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
  3. 运行App主程序类,Kafka Streams测试环境达成。
  4. 在生产者服务节点(hadoop01)输入hello kafka kafka语句,可在消费者服务节点(hadoop02)中查看控制台,可以看到控制台输出了{hello=1, kafka=2}信息
    [root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh –create –topic testStreams1 –partitions 3 –replication-factor 2 –zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
    [2023-04-01 02:12:01,157] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
    (kafka.admin.TopicCommand$)
    [root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh –create –topic testStreams1 –partitions 3 –replication-factor 1 –zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    Created topic “testStreams1”.
    [root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-topics.sh –create –topic testStreams2 –partitions 3 –replication-factor 1 –zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    Created topic “testStreams2”.
    [root@hadoop01 kafka_2.11-2.0.0]# bin/kafka-console-producer.sh –broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 –topic testStreams1
浏览量

评论区

欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~

目录

  1. 1. Kafka Streams开发单词计数应用
    1. 1.1. 1. 添加依赖
    2. 1.2. 2. 编写代码
    3. 1.3. 3. 执行测试

上一篇: 第7章 Spark Streaming

下一篇 第1章 深度卷积神经网络分析

公告栏

《 

 》

Hello~近期剽窃本站内容频发,本站唯一指定网站:https://koinl.github.io。请认准。点击点击此处选择进入。
回到顶部
查看评论

Power By Hexo.

Theme:koinl.

信息来源于锦鲤未离