首页 > 其他分享 >solon 集成 kafka-clients

solon 集成 kafka-clients

时间:2024-12-06 11:31:54浏览次数:9  
标签:solon clients kafka common props consumer public

使用 kafka-clients 原本是比较简单的事情。但有些同学习惯了 spring-kafka 后,对原始 java 接口会陌生些。会希望有个集成的示例。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

现在我们使用原始 sdk 的依赖包,做一个 solon 项目的集成分享(其它的框架,也可以参考此例)。

1、添加集成配置

  • 添加 yml 配置(具体的配置属性,参考:ProducerConfig,ConsumerConfig)
solon.app:
  name: "demo-app"
  group: "demo"

solon.logging:
  logger:
    root:
      level: INFO

# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.kafka:
  properties:  #公共配置(配置项,参考:ProducerConfig,ConsumerConfig 的公用部分)
    bootstrap:
      servers: "127.0.0.1:9092"
    key:
      serializer: "org.apache.kafka.common.serialization.StringSerializer"
      deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
    value:
      serializer: "org.apache.kafka.common.serialization.StringSerializer"
      deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
  producer: #生产者专属配置(配置项,参考:ProducerConfig)
    acks: "all"
  consumer: #消费者专属配置(配置项,参考:ConsumerConfig)
    enable:
      auto:
        commit: "false"
    isolation:
      level: "read_committed"
    group:
      id: "${solon.app.group}:${solon.app.name}"
  • 添加 java 配置器
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaProducer<String, String> producer(@Inject("${solon.kafka.properties}") Properties common,
                                             @Inject("${solon.kafka.producer}") Properties producer) {

        Properties props = new Properties();
        props.putAll(common);
        props.putAll(producer);

        return new KafkaProducer<>(props);
    }

    @Bean
    public KafkaConsumer<String, String> consumer(@Inject("${solon.kafka.properties}") Properties common,
                                                  @Inject("${solon.kafka.consumer}") Properties consumer) {
        Properties props = new Properties();
        props.putAll(common);
        props.putAll(consumer);

        return new KafkaConsumer<>(props);
    }
}

完成上面两步,就算是集成了(后面,是应用的事儿)。配置也可以没有,直接写代码设定属性。

2、应用

  • 发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {
    @Inject
    private KafkaProducer<String, String> producer;

    @Mapping("/send")
    public void send(String msg) {
        //发送
        producer.send(new ProducerRecord<>("topic.test", msg));
    }
}
  • 拉取(或消费),这里采用定时拦取方式:(仅供参考)
@Component
public class DemoJob {
    @Inject
    private KafkaConsumer<String, String> consumer;

    @Init
    public void init() {
        //订阅
        consumer.subscribe(Arrays.asList("topic.test"));
    }

    @Scheduled(fixedDelay = 10_000L, initialDelay = 10_000L)
    public void job() throws Exception {
        //拉取
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            //确认
            consumer.commitSync();
        }
    }
}

标签:solon,clients,kafka,common,props,consumer,public
From: https://www.cnblogs.com/noear/p/18590344

相关文章

  • kafka的至少一次和精确一次
    Kafka的“至少一次”(AtLeastOnce)和“精确一次”(ExactlyOnce)是两种不同的消息传递语义,它们在确保消息传递的可靠性和准确性方面有不同的特点和实现方式。一、至少一次(AtLeastOnce)定义:“至少一次”传递语义意味着生产者发送到Kafka的消息会至少被传递一次到消费者。即使出......
  • Kafka的同步发送到broker
    Kafka的同步发送是Kafka消息传递机制中的一种重要方式,它确保了消息在发送过程中的可靠性和一致性。以下是对Kafka同步发送的详细解释:一、同步发送的定义在同步发送模式下,Kafka生产者发送完消息后会阻塞等待Kafka服务器的响应。生产者只有在收到Kafka服务器的响应后,才会进行下一......
  • Kafka的异步发送到broker
    Kafka的异步发送是Kafka消息传递机制中的另一种重要方式,与同步发送相比,它在保证一定消息可靠性的基础上,提供了更高的发送性能。以下是对Kafka异步发送的详细解释:一、异步发送的定义在异步发送模式下,Kafka生产者发送消息后不会立即等待服务器的确认响应,而是继续发送下一条消息或......
  • 请解释一下什么是Kafka的acks策略
    Kafka的acks(acknowledgements)策略是生产者(Producer)在发送消息到Kafka集群时,用于控制消息持久化和确认机制的重要配置。这个策略决定了生产者何时认为一条消息已经被成功发送。Kafka提供了三种acks策略,它们分别对应不同的可靠性和性能权衡:acks=0:在这种模式下,生产者不会等待任......
  • kafka的acks=1策略数据丢失的风险场景
    在Kafka中,当使用acks=1策略时,确实存在数据丢失的风险,尽管这种风险相对较低。以下是对acks=1策略下数据丢失情况的详细解释:一、acks=1策略概述acks=1(或acks=leader)表示生产者会等待Kafka集群中的主副本(Leader)确认消息已经被成功写入日志后,才认为这条消息已经被成功发送。这种策略......
  • 通过flinkSql将kafka和mysql连接
    kafkaToKafka{"user_id":"1","page_id":"1","status":"success"}{"user_id":"1","page_id":"1","status":"success"}{"user_id&q......
  • 消息队列-kafka
    消息队列-kafkakafka常见面试题kafka实践环境准备代码结果截图参考摘要:本文将会对kafka进行介绍,首先介绍消息队列的一些基础知识,然后是kafka的基本概念和底层原理,以及kafka如何保证消息可靠性、消息不丢失,如何解决消息重复以及消息积压等问题,并且分析kafka为什么具......
  • Kafka入门示例
    KafKa基本介绍Kafka是开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。Kafka是由Scala写成的一个分布式消息队列。它包括Topic、Consumer、Producer,Broker(包含多个Partition)。同一个Topic的消息会有多个分区多个副本,一个分区可......
  • 聊聊如何利用kafka实现请求-响应模式
    前言在大多数场景中,我们经常使用kafka来做发布-订阅,在发布-订阅模型中,消息一旦发送就不再追踪后续处理,但在某些业务场景下,我们希望在发送消息后等待一个响应,然后根据这个响应来做我们后续的操作。在这种请求-响应模式,我们就可以利用springkafka的ReplyingKafkaTemplate来实现Re......
  • Kafka单机集群安装
    下载地址https://kafka.apache.org/downloads解压到指定目录tar-zxf/mnt/d/software/kafka_2.12-3.9.0.tgz-C/mnt/d/appsmv/mnt/d/apps/kafka_2.12-3.9.0/mnt/d/apps/kafka自动添加相关配置信息及启动脚本假定当前机器的IP地址:172.29.2.194KAFKA_HOME=/mnt/d/apps/k......