首页 > 其他分享 >kafka动态生产者

kafka动态生产者

时间:2023-06-02 11:12:50浏览次数:27  
标签:configEntity configKafka 生产者 kafka props put import 动态

package com.sunclouder.das.data.kafka.forward;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.sunclouder.das.data.kafka.entity.ConfigEntity;
import com.sunclouder.das.data.kafka.entity.DasConfigKafka;
import com.sunclouder.das.data.kafka.service.DasConfigKafkaService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.Properties;

@Slf4j
@Service
public class KafkaForward {

@Autowired
private DasConfigKafkaService dasConfigKafkaService;

private KafkaProducer<String, String> producer;

private String produceTopic;


private ConfigEntity configEntity = new ConfigEntity();


private String ip = "{}:{}";


public void forward(JSONObject dataEvent) {
  try {
    JSONObject entries = JSONUtil.parseObj(dataEvent.get("payload"));
    DasConfigKafka configKafka = dasConfigKafkaService.lambdaQuery()
      .eq(DasConfigKafka::getTenantId, entries.get("tenant").toString())
      .eq(DasConfigKafka::getDeviceId, entries.get("asset").toString())
      .eq(DasConfigKafka::getGetInfoType, "mq")
      .eq(DasConfigKafka::getIsDeleted, 1)
      .one();
      if (Objects.nonNull(configKafka)) {
      String configPort = StrUtil.format(ip, configKafka.getKafkaIp(), configKafka.getKafkaPort());
      configEntity.setTopic(configKafka.getTopics());
      configEntity.setBootstrapServers(configPort);
      if (configKafka.getUserName() != null && configKafka.getPassWord() != null) {
        configEntity.setUserName(configKafka.getUserName());
        configEntity.setPassword(configKafka.getPassWord());
          }
      sendToSgl(configEntity, "", dataEvent, configKafka);
      }
    } catch (Exception e) {
        log.error("转发异常,", e);
      }
    }


/**
* 实例化kafkaTemplate
*
* @param configEntity
* @param key
* @param value
*/
public void sendToSgl(ConfigEntity configEntity, String key, JSONObject value, DasConfigKafka configKafka) {
      Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configEntity.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("group.id", configEntity.getGroupId() == null ? "" : configEntity.getGroupId());
      if (configEntity.getUserName() != null && configEntity.getPassword() != null) {
         props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         props.put(SaslConfigs.SASL_JAAS_CONFIG, configEntity.getSaslJaasConfig().replace("YH", configEntity.getUserName()).replace("MM", configEntity.getPassword()));
        }
         props.put("enable.auto.commit", "true");
           props.put("auto.commit.interval.ms", "1000");
           props.put("session.timeout.ms", "30000");
           props.put("auto.offset.reset", "earliest");
           this.producer = new KafkaProducer<String, String>(props);
           this.produceTopic = configEntity.getTopic();
            startSend(key, value, configKafka);
    }

/**
* 下发消息
*
* @param vin
* @param value
*/
public void startSend(String vin, JSONObject value, DasConfigKafka configKafka) {
    try {
        producer.send(new ProducerRecord<String, String>(produceTopic, vin, JSONUtil.toJsonStr(value)), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
          // 消息发送成功
            if (configKafka.getOnlineOrNot() != 1) {
              configKafka.setOnlineOrNot(1);
              dasConfigKafkaService.updateById(configKafka);
              }
              log.info(configKafka.getKafkaIp() + "-->消息转发成功.");
              } else {
              // 执行错误逻辑处理
            if (configKafka.getOnlineOrNot() != 0) {
              configKafka.setOnlineOrNot(0);
              dasConfigKafkaService.updateById(configKafka);
              }
               log.info(configKafka.getKafkaIp() + "-->消息转发失败.");
              }
              }
              });
              // 异步处理 资源变更通知,避免阻塞线程
               new Thread(() -> log.info("===============信息内容====================" + value)).start();
              } catch (Exception e) {
                e.printStackTrace();
              } finally {
                  producer.close();
                }
               }
              }

标签:configEntity,configKafka,生产者,kafka,props,put,import,动态
From: https://www.cnblogs.com/message-hrp/p/17451201.html

相关文章

  • 算法学习day39动态规划part02-62、63
    packageLeetCode.DPpart02;/***62.不同路径*一个机器人位于一个mxn网格的左上角(起始点在下图中标记为“Start”)。*机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为“Finish”)。*问总共有多少条不同的路径?*示例:*输入......
  • 算法学习day41动态规划part03-343、96
    packageLeetCode.DPpart03;/***343.整数拆分*给定一个正整数n,将其拆分为k个正整数的和(k>=2),并使这些整数的乘积最大化。*返回你可以获得的最大乘积。*示例:*输入:n=2*输出:1*解释:2=1+1,1×1=1。**/publicclassIntegerBre......
  • 经济学:动态模型平均(DMA)、动态模型选择(DMS)、ARIMA、TVP预测原油时间序列价格|附代
    全文链接:http://tecdat.cn/?p=22458最近我们被客户要求撰写关于动态模型平均的研究报告,包括一些图形和统计输出。本文提供了一个经济案例。着重于原油市场的例子。简要地提供了在经济学中使用模型平均和贝叶斯方法的论据,使用了动态模型平均法(DMA),并与ARIMA、TVP等方法进行比较 (......
  • C温故补缺(十七):动态链接(ELF,PIC,GOT,PLT)
    动态链接(PIC,GOT,PLT,ELF)参考:51CTO通过静态链接,可以生成一个可执行文件,这个可执行文件既可以是完全链接的也可以是部分链接的,对于部分链接的可执行文件,有些符号引用需要等到可执行文件加载时甚至是运行时才会进行符号解析和重定位。动态链接与静态链接一样包括符号解析和重......
  • Doris(七) -- 修改表、动态和临时分区、join的优化
    修改表修改表名--1.将名为table1的表修改为table2ALTERTABLEtable1RENAMEtable2;--示例ALTERTABLEaggregate_testRENAMEaggregate_test1;--2.将表example_table中名为rollup1的rollupindex修改为rollup2ALTERTABLEbase_table_nameRENAMEROLLUP......
  • Kafka环境的配置
    大家对于消息队列,想必不会很陌生,特别是ActiveMQ和RabbitMQ,今天我将会为大家介绍下Kafka在centOs系统中的安装。第一步:准备好包。对于kafka,你需要zookeeper,所以你需要下载zookeeper。点击zookeeper下载下载zookeeper后放入到centos中.放入文件夹software中。接着准备kafka.点击下......
  • 动态规划(五)背包问题
    基本思想:动态规划算法通常用于求解具有某种最优性质的问题。在这类问题中,可能会有许多可行解。每一个解都对应于一个值,我们希望找到具有最优值的解。动态规划算法与分治法类似,其基本思想也是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到原问题的解。与分......
  • 动态规划(一)硬币找零,机器人路径
    动态规划(DynamicProgramming,简称DP),虽然抽象后进行求解的思路并不复杂,但具体的形式千差万别,找出问题的子结构以及通过子结构重新构造最优解的过程很难统一,并不像回溯法具有解决绝大多数问题的银弹。动态规划求解的一般思路1.硬币找零扩展1:单路取苹果扩展2:机器人路径2.字符......
  • 代理模式(动态)
    1,动态代理分为2类①基于JDK(1.5以后的版本)接口类:点击查看代码publicinterfaceIDAO{publicintsave();publicintremove();publicintmodify();publicintfindAll();}实现类点击查看代码publicclassDeptDAOimplementsIDAO{@Over......
  • 关于动态维护树中点集直径的研究
    例题:P2056.这是括号序动态维护的方法,这里不讲。注意到一个结论:设\(S,T(S\capT=\varnothing)\)为两个树种的点集。记\(f(S)\)为一个大小为\(2\)集合,其中两个点分别为\(S\)集合中直径的两个端点(多个取任意)。则有结论:\(\exists\x,y\inf(S)\cupf(T),x\neqy\),满足\(x......