首页 > 其他分享 >Flink kafka source

Flink kafka source

时间:2023-09-05 16:36:05浏览次数:38  
标签:kafkaConsumer Flink kafka source env offset new

kafka source

接收kafka的数据

<!-- Kafka 相关依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
public class kafkaSourceStudent {
    public static void main(String[] args) throws Exception {
        // 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 配置 kafka 连接属性
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","10.50.8.136:9093");

        // 创建kafka数据源
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("vsoc_etl_rule_filter", new SimpleStringSchema(), properties);
        // 添加 Kafka 数据源到 Flink 环境
        env.addSource(kafkaConsumer).print();
        // 执行任务
        env.execute("Flink Kafka Source Example");
    }
}

消费策略

提供了四种消费策略:

// 创建kafka数据源
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("vsoc_etl_rule_filter", new SimpleStringSchema(), properties);
// 默认设置,读取group.id对应保存的offset开始消费的数据,读取不到读取auto.offset.rest参数设置的策略
kafkaConsumer.setStartFromGroupOffsets();
// 从最早的记录开始消费数据,排除已提交的offset信息
kafkaConsumer.setStartFromEarliest();
// 从最新的开始消费 排除已提交的offset信息
kafkaConsumer.setStartFromLatest();
// 从指定时间戳开始消费数据
kafkaConsumer.setStartFromTimestamp(213412412312L);

kafka consumer的容错

当checkPoint机制开启的时候,consumer会定期把kafka的offset信息还有其他算子任务的state信息一块保存起来,当job失败重启的时候,Flink会从最近的一次checkPoint中进行恢复数据,重新消费kafka中的数据。


启动checkpoint:

// 运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 5秒执行一次 checkPoint
env.enableCheckpointing(5000);


Flink kafka source_数据源


state数据存储:

  • MemorySateBackend:基于内存存储
  • FsSateBackend:基于远程文件系统,如hdfs
  • RocksDBStateBackend:先存储在本地文件系统中,会同步到远程文件系统

标签:kafkaConsumer,Flink,kafka,source,env,offset,new
From: https://blog.51cto.com/u_13589027/7373716

相关文章

  • 43、Flink之Hive 读写及详细验证示例
    Flink系列文章[1、Flink部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接][13、Flink的tableapi与sql的基本概念、通用api介绍及入门示例][14、Flink的tableapi与sql之数据类型:内置数据类型以及它们的属性][15、Flink的t......
  • KAFKA
    Kafka消息队列的两种模型:点对点模式:在点对点模式中,有一个消息生产者(Producer)将消息发送到特定的消息队列(Queue),然后消息消费者(Consumer)从该队列中获取消息。每个消息只能被一个消费者接收,即使有多个消费者监听同一个队列,每条消息也只会被其中一个消费者消费。这种模式适用于......
  • IIncrementalGenerator 增量 Source Generator 生成代码入门 从语法到语义 获取类型完
    本文告诉大家如何在使用IIncrementalGenerator进行增量的SourceGenerator生成代码时,如何从语法分析过程,将获取的语法Token转换到语义分析上,比如获取类型完全限定名。一个使用的例子是在拿到一个Token表示某个类型时,本文将演示通过语义分析获取到拿到的Token的Type类......
  • kafka的幂等性
    什么是幂等性:无论发送多少次相同的请求,最终的结果都是一致。 问:那他又是如何保证消息不会被重复发送的?答:Kafka通过ProducerId(生产者标识符)和SequenceNumber(序列号)来保证消息不会被重复发送。以下是Kafka如何实现这一点的工作原理:ProducerId(PID):每个Kafka生产......
  • Springboot+Quartz+Dynamic-datasource
    在使用dynamic-datasource多数据源切换场景下,实现Quartz任务持久化配置和API动态调度1.pom依赖暂未找到版本对应关系,若有版本不一致异常,请自行尝试升降版本。<dependencies><!--动态数据源--><dependency><groupId>com.baomidou</groupI......
  • dotnet 记 TaskCompletionSource 的 SetException 可能将异常记录到 UnobservedTaskEx
    本文将记录dotnet的一个已知问题,且是设计如此的问题。假定有一个TaskCompletionSource对象,此对象的Task没有被任何地方引用等待。在TaskCompletionSource被调用SetException或TrySetException方法时,将会记录一个存在异常且未捕获的Task对象。此Task对象将会在被G......
  • Flink SQL基本语法
    在flinksql中,对表名、字段名、函数名等是严格区分大小写的,为了兼容hive等其他仓库,建议建表时,表名和字段名都采用下划线连接单词的方式,以避免大小写问题。比如hive,是不区分大小写的,所有大写字母最终都会被系统转化为小写字母,此时使用flinksql去读写hive,出现大写字母时......
  • kafka集群安装(CentOS7 + kafka 2.7.1)
    Linux系统-部署-运维系列导航 kafka介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源......
  • 【Kafka系列】(一)Kafka入门
    有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准https://blog.zysicyj.top首发博客地址系列文章地址Kafka是什么?一句话概括:ApacheKafka是一款开源的消息引擎系统什么是消息引擎系统?消息引擎系统(MessageBrokerSystem)是一种中间件软件或服务,用......
  • Go语言实现Kafka消费者的示例代码
    Kafka是一种分布式流处理平台,由Facebook于2011年推出,现在已经成为Apache项目的一部分。Kafka提供了高可用性、可扩展性和低延迟的消息传递服务,适用于处理实时和离线数据。Kafka的主要功能包括生产者-消费者通信、批处理和实时数据流处理。Kafka基于发布/订阅模型,允许消息发布者将数......