首页 > 其他分享 >【flink】重启kafka消费者

【flink】重启kafka消费者

时间:2023-04-23 10:57:34浏览次数:35  
标签:消费者 kafkaConsumer 重启 flink Kafka env props new kafka

public class KafkaConsumerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-group");

      //创建了一个Flink Kafka消费者,并在StreamExecutionEnvironment中添加它作为数据源
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("myTopic", new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(kafkaConsumer);

        stream.print();
        env.execute("Kafka Consumer Job");
    }

//在Flink集群上运行该应用程序,则需要考虑应用程序在多个TaskManager上运行的情况,并确保在所有TaskManager上都执行相同的重启逻辑
    public void restart() {
        // 停止现有的Kafka消费者或生产者,关闭与Kafka的连接。
        // 销毁现有的Kafka消费者或生产者实例。
        kafkaConsumer.close();
        // 创建一个新的Kafka消费者或生产者实例,并重新连接到Kafka集群。
        kafkaConsumer = new FlinkKafkaConsumer<>("myTopic", new SimpleStringSchema(), props);
        stream = env.addSource(kafkaConsumer);
    }
}

标签:消费者,kafkaConsumer,重启,flink,Kafka,env,props,new,kafka
From: https://www.cnblogs.com/xiaoyu-jane/p/17345824.html

相关文章

  • 【spring boot】 重启kafka客户端连接
    背景kafka服务端重建时,kafka客户端会连不上kafka服务端,此时需要重启客户端重连代码实现@ServicepublicclassKafkaConsumerService{privateKafkaConsumer<String,String>consumer;@AutowiredprivateKafkaPropertieskafkaProperties;//在应用......
  • kafka实践(十五): 滴滴开源Kafka管控平台 Logi-KafkaManager研究
    目录调试环境搭建前端调试环境后端调试环境功能架构工具理解应用开发人员kafka/管控开发人员kafka/管控运维人员部署验证windows环境下的部署/调试环境linux环境下生产使用后续 调试环境搭建前端调试环境github克隆比较慢gitee很快,采取前后端分离架构(springboot+reactJS+Typescrip......
  • flink学习路线
    1传统架构2大数据架构和流式架构的演变工程3flink优势和不足4flink应用场景5flink基本架构6环境准备,运行环境和开发环境配置,建议使用java,兼容性好7flink编程模型:flink的数据集类型,编程接口,程序结构和数据类型4个维度进行分析。流式处理和批量计算。8flinktableap......
  • RabbitMQ、RocketMQ、Kafka性能对比分析
    MQ的作用MQ的作用是解耦、异步、削峰填谷。未使用MQ的情况MySql并发写大部分情况下维持在600-800之间,并发读1200-1500之间,所以消费端在消费消息的时候需控制在并发小于1000,从而达到限流的效果。使用MQ的情况MQ做个缓冲,消息放到磁盘,几个G或上T都可以存储,消息丢失......
  • Flink启动报错:/bin/config.sh: line 32: syntax error near unexpected token
    flink启动报错xxx@ssss:/xxx/flink-1.15.2/bin>shstart-cluster.sh/xxx/flink-1.15.2/bin/config.sh:line32:syntaxerrornearunexpectedtoken`<'/xxx/flink-1.15.2/bin/config.sh:line32:`done<<(find"$FLINK_LIB_DIR"!-ty......
  • 单机单节点Flink的部署
    一、Flink的下载和安装1、Flink的下载官方下载网址:https://archive.apache.org/dist/flink/这里选择1.15.2这个版本 2、把Flink上传到主机上把下载好的文件上传到/opt/software上3、解压Flink安装包把Flink解压到/opt/module中,要提前创建module文件夹tar-zxvfflink-......
  • 银河麒麟高级服务器操作系统V10 SP3安装kafka_2.12-2.3.1
    银河麒麟高级服务器操作系统V10SP3安装kafka_2.12-2.3.1 1.安装环境设置1关闭Selinux12345678910111213141516171819[root@localhost~]#vim/etc/selinux/config #Thisfilecontrolsthestate of SELinux on thesystem.#SELI......
  • kafka业务数据到ODS层处理小记
    kafka业务数据到ODS层处理小记1:kafka消息partition分区,应以表主键为key2:kafka消息落地后,同一批次数据中取主键+offset最大的一条,再删除基础数据中此批次数据,最后将此批次数据按数据处理类型(delete、insert、update),先insert、update,再delete。......
  • kafka
    首先配置好zookeeper修改kafka配置文件config/server.properties文件中broker.id=0port=9092host.name=192.168.112.91log.dirs=/usr/local/src/kafka/kafka-logszookeeper.connect=master:2181,slave1:2181,slave2:2181启动kafka./bin/kafka-server-start.sh-daemonc......
  • GitLab 服务的启动、停止和重启命令
    GitLab服务的启动、停止和重启命令 一、问题现象在使用GitLab管理项目代码时,有时候可能因为服务出现异常,导致无法正常访问GitLab上的代码。报错如下: 二、解决方案重启GitLab服务,命令如下:gitlab-ctlrestart命令执行效果如下: 三、补充说明 1、GitLab......