首页 > 其他分享 >springboot整合kafka

springboot整合kafka

时间:2022-11-14 16:56:53浏览次数:42  
标签:springboot 提交 springframework kafka 整合 org import 监听器

maven配置:

        
<properties>
<java.version>1.8</java.version>
<oracle.version>11.2.0.4</oracle.version>
</properties>

<!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.4.RELEASE</version> </dependency>

<!--oracle-->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc6</artifactId>
<version>${oracle.version}</version>
</dependency>

  

yml:

#省内集中调度平台资源查询
server:
  port: 9080

spring:
  datasource:
    driver-class-name: oracle.jdbc.OracleDriver
#    url: jdbc:oracle:thin:@10.216.86.211:1521/irmsdb
#    username: hbrmw6
#    password: ZIYhbrmw6.184
    url: ${rmsdb_url:jdbc:oracle:thin:@10.110.74.166:1521/rmsdb}
    username: ${rmsdb_username:uirms_yn}
    password: ${rmsdb_password:uirms_yn}
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka集群信息
    producer: # 生产者配置
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: zhTestGroup # 消费者组
      enable-auto-commit: false # 关闭自动提交
      auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生产者:

package com.inspur.resource.module.kafka;

import com.alibaba.fastjson.JSON;
import com.inspur.resource.util.DataShareUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @author :fengwenzhe
 * @date :Created in 2022/11/14 10:43
 * 文件说明: </p>
 */
@Component
@ConditionalOnProperty(value = "spring.profiles.active",havingValue = "kafka")
@Slf4j
public class KafkaTimer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Scheduled(cron = "*/20 * * * * ?") // 每20秒执行一次
    //@Scheduled(cron = "0 */2 * * * ?") //每2分钟执行一次
    @PostConstruct
    public void loopSendKafka(){
        //查询数据库 如果有指定数据  则发送对应kafka消息
        String sql = "select * from RMS_KAFKA_ACCESS_ORDER where stateflag ='0' ";
        List<Map<String,Object>> list = this.jdbcTemplate.queryForList(sql);
        if(null != list){
            for(Map map:list){
                //需要发送kafka消息的入网工单数据
                log.info("需要发送kafka消息的入网工单数据"+ map.toString());
                System.out.println("sendkafka");
                kafkaTemplate.send("test", JSON.toJSONString(map));
            }
        }
    }
}

消费者:

package com.inspur.resource.module.kafka;


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Optional;


@Component
@ConditionalOnProperty(value = "spring.profiles.active",havingValue = "kafka")
@Slf4j
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默认取yml里配置的
    @KafkaListener(topics = {"test"},groupId = "${spring.kafka.consumer.group-id}")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        //消费者必须手动调用ack.acknowledge();不然会重复消费 因为在yml中配置了
        //ack-mode: manual_immediate
        ack.acknowledge();
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

 

标签:springboot,提交,springframework,kafka,整合,org,import,监听器
From: https://www.cnblogs.com/fengwenzhee/p/16889507.html

相关文章

  • SpringBoot 05: 接口架构风格 + RESTful接口风格
    接口定义API(ApplicationProgrammingInterface,应用程序接口)是一些预先定义的接口(如函数、HTTP接口,或指软件系统不同组成部分衔接的约定)是用来提供应用程序与开......
  • rocketMq springboot2 发送广播消息
    广播消息:一个点发送,所有有监听订阅的程序都能收到消息。应用场景:一个配置更新了,其他点都需要知道配置更新需加载。 mq创建主要是创建组时与队列有点区别mqadminup......
  • rocketMq springboot2 发送有序消息
    有序消息:所有信息往mq中,在broker.conf配置文件中指定产生队列数量。如果是普通队列时,所有消息,会分发到默认队列的各个队列中。是无序的。有序则是,所有消息发送,都指定一个......
  • rocketMq springboot2接入配置
    rocketmq的接入配置。 引入jar包<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependen......
  • 5.feign整合sentinel
    feign整合sentinelSpringCloud中,微服务调用都是通过Feign来实现的,因此做客户端保护必须整合Feign和Sentinel。一、修改配置,开启sentinel功能修改OrderService的applicat......
  • feig整合sentinel出现循环依赖问题
    feig整合sentinel出现循环依赖问题1.场景重现,回顾feign整合sentinel步骤1.1修改配置,开启sentinel功能修改OrderService的application.yml文件,开启Feign的Sentinel功能:f......
  • 外网访问内网SpringBoot【免费内网穿透】
    在本地搭建的SpringBoot项目,在内网能够正常访问,想要在外网环境下也能够访问,可以做内网穿透来实现,不需要公网IP,也不需要进入到路由器配置。这里主要介绍通过cpolar内网穿透......
  • Spring Boot 整合 HBase
    HBase介绍HBase是一个分布式的、面向列的开源数据库,Hadoop数据库。搭建基于Hadoop和ZK。历史是基于Google的Bigtable、Google文件系统等论文。HBase在Hadoop......
  • 【博学谷学习记录】超强总结,用心分享|狂野架构Kafka消费安全问题
    Kafka消费安全问题消费者线程安全问题首先,kafka的Javaconsumer是单线程的设计,准确来说是双线程,从kafka0.10.1.0版本开始kafkaConsumer变成了用户主线程和心跳线程的......
  • idea+maven+springboot如何配置Mybatis-plus并测试简单用例
    用例情况如上图,假设我们需要读取ywj数据库中的dept表中的记录 大致步骤1、首先,需要在对应maven工程的.pom文件中引入相应的依赖,包括mybatis-plus-boot-starter、mysq......