Reading and Writing Kafka with Flink (Flink 1.8.0)


Flink provides Kafka connectors for reading data from and writing data to Kafka.

This article summarizes common questions that come up when integrating Flink with Kafka and clarifies a few points that are easy to get wrong.

Question 1: Ways to Read from Kafka

## Read a single topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
    FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
    DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## Read multiple topics
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
    FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
    DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## Read topics matching a regex pattern
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
    FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
    DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## Start consuming from a specified timestamp
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)
## Start consuming from specified offsets; an offset can be set per partition
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
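A minimal sketch of the two startup-mode calls above, using hypothetical timestamp, partition, and offset values:

// KafkaTopicPartition is org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
// Start each partition from the first record whose timestamp is >= the given epoch-millis value (hypothetical)
kafkaConsumer.setStartFromTimestamp(1558000000000L);

// Or: start each partition from an explicitly chosen offset (hypothetical partitions and offsets)
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("userActionLog1", 0), 23L);
specificOffsets.put(new KafkaTopicPartition("userActionLog1", 1), 31L);
kafkaConsumer.setStartFromSpecificOffsets(specificOffsets);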

Question 2: Reading from Kafka and Deserializers

The binary byte streams read from Kafka can be deserialized into Java/Scala objects supported by Flink via org.apache.flink.api.common.serialization.DeserializationSchema or org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.

Flink ships with the following two commonly used deserializers:

  1. org.apache.flink.api.common.serialization.SimpleStringSchema: deserializes into a String.
  2. org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema: deserializes into a Jackson ObjectNode.

To convert complex JSON from Kafka directly into the object you want, you can write a custom schema modeled on org.apache.flink.api.common.serialization.SimpleStringSchema; the key part is implementing the deserialize method.
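As an illustration, here is a minimal sketch of such a custom schema. It assumes a hypothetical UserAction POJO with the userID/eventTime/eventType/productID fields used later in this article and fastjson for parsing; it is only a template, not a schema shipped with Flink.

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class UserActionDeserializationSchema implements DeserializationSchema<UserAction> {
    @Override
    public UserAction deserialize(byte[] message) throws IOException {
        // Parse the raw Kafka value (UTF-8 JSON bytes) directly into the target object
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), UserAction.class);
    }
    @Override
    public boolean isEndOfStream(UserAction nextElement) {
        // Kafka is an unbounded source, so the stream never ends
        return false;
    }
    @Override
    public TypeInformation<UserAction> getProducedType() {
        return TypeInformation.of(UserAction.class);
    }
}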

Question 3: Dynamic Discovery of Topics and Partitions When Reading from Kafka

Previously with Spark Streaming, Spark 2.2.2 did not support dynamic discovery of topics (subscribed via regex) or partitions newly added in Kafka 0.10.1; when a topic or partition was added, the Spark Streaming job had to be restarted.

In Flink, newly added Kafka topics and partitions can be discovered dynamically, but discovery is disabled by default and must be enabled explicitly:

kafkaProperties.put("flink.partition-discovery.interval-millis","10000");
flink.partition-discovery.interval-millis: the discovery interval, in milliseconds.
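A minimal sketch of enabling discovery together with a regex subscription (same kafkaProperties and imports as in the snippets above; the 10-second interval is only an example):

// Discovery is off by default; setting the interval (in milliseconds) turns it on
kafkaProperties.put("flink.partition-discovery.interval-millis", "10000");
// Combined with a Pattern subscription, topics created later that match the regex are consumed automatically
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(
        Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");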

Question 4: Reading from Kafka and Exactly-Once Semantics

If checkpointing is not enabled, offsets are by default committed automatically to external storage (the Kafka brokers, or ZooKeeper for Kafka 0.8) at a 5-second interval. In this mode, the fault tolerance of the Flink Kafka Consumer relies on the automatically committed offsets.

If checkpointing is enabled, by default the offsets stored in a checkpoint are committed to external storage (the Kafka brokers, or ZooKeeper for 0.8) once the checkpoint completes. While the job is running, the Flink Kafka Consumer's fault tolerance relies on the offsets in the checkpoint. On recovery, the job may resume either from the offsets in a checkpoint or from the offsets in external storage such as the Kafka brokers, depending on how it is restored. Note: in this mode, the offsets stored on the Kafka brokers (or in ZooKeeper for 0.8) are used only to monitor consumption progress.

In summary, by combining Kafka's ability to re-read data with Flink's checkpoint mechanism, the Flink Kafka Consumer can provide exactly-once semantics.
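A minimal sketch of the checkpoint-based mode described above (the one-minute interval is only an example; the complete job later in this article uses the same calls):

// Offsets are stored in each checkpoint and restored from it on recovery
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
// Also write the checkpointed offsets back to Kafka, where they are used only for monitoring progress
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);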

Question 5: Writing to Kafka and Exactly-Once Semantics

  • Kafka 0.8: Flink provides neither exactly-once nor at-least-once guarantees.
  • Kafka 0.9 / 0.10: with checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide at-least-once semantics. In addition, the following settings are needed (see the sketch after this list):
    setLogFailuresOnly(false): if true, the producer only logs failures when it hits an exception and the streaming job keeps running. Set it to false so that an exception fails the job, the exception is surfaced, and the task is recovered and the send retried.
    setFlushOnCheckpoint(true): makes the data buffered in the Kafka producer part of the checkpoint; with true, a checkpoint cannot complete until all buffered records have been written to Kafka.
    retries: number of retries, 0 by default; a larger value is recommended.
  • Kafka 0.11 and 1.0.0+: with checkpointing enabled, FlinkKafkaProducer011 and FlinkKafkaProducer (for Kafka >= 1.0.0) can provide exactly-once semantics based on a two-phase commit.
    The semantic can be chosen explicitly: Semantic.NONE (may lose or duplicate data), Semantic.AT_LEAST_ONCE (the default; may duplicate), or Semantic.EXACTLY_ONCE.
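A minimal sketch of the at-least-once configuration for the 0.10 producer described above, assuming checkpointing is enabled and using a hypothetical resultStream of Strings, broker address, and topic name:

// At-least-once sink for Kafka 0.9/0.10
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092");
producerProps.setProperty("retries", "3"); // retry transient send failures instead of dropping records

FlinkKafkaProducer010<String> producer010 =
        new FlinkKafkaProducer010<>("outputTopic", new SimpleStringSchema(), producerProps);
producer010.setLogFailuresOnly(false);  // fail the job on send errors instead of only logging them
producer010.setFlushOnCheckpoint(true); // flush buffered records before a checkpoint can complete
resultStream.addSink(producer010);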

Example: Read Data from Kafka 0.10.1, Compute PV Statistics, and Write the Results to Kafka 0.11.0.3

Dependencies (excerpt)

<!-- Kafka 0.10 connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<!-- Kafka 0.11 connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

Code

package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
 * Author: Wang Pei
 * Summary: Read from and write to Kafka
 */
public class ReadWriteKafka {
    public static void main(String[] args) throws Exception{
        /** Parse command-line arguments */
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
        // Checkpoint parameters
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
        // Source Kafka (consumer) parameters
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaAutoOffsetReset = parameterTool.getRequired("fromKafka.auto.offset.reset");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
        // Sink Kafka (producer) parameters
        String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
        String toKafkaTopic = parameterTool.getRequired("toKafka.topic");
        // Window parameters
        long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
        long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");
        /** Configure the execution environment */
        // Start a local web server (Flink Web UI)
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT,8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the state backend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        /** Configure the source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
        kafkaProperties.put("group.id",fromKafkaGroupID);
        kafkaProperties.put("auto.offset.reset",fromKafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
        /** Extract and transform */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
                    Tuple4<String, String, String, Integer> output = new Tuple4<>();
                    try {
                        JSONObject obj = JSON.parseObject(value);
                        output.f0 = obj.getString("userID");
                        output.f1 = obj.getString("eventTime");
                        output.f2 = obj.getString("eventType");
                        output.f3 = obj.getInteger("productID");
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Return null on parse failure so the filter below can drop the record
                        return null;
                    }
                    return output;
                }).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){})
                .name("Map: ExtractTransform").uid("map-id");
        /** Filter out malformed records */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value != null)
                .name("Filter: FilterExceptionData").uid("filter-id");
        /** Extract timestamps and emit watermarks */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            public long extractTimestamp(Tuple4<String, String, String, Integer> element) {
                long timestamp = 0L;
                try {
                    Date date = format.parse(element.f1);
                    timestamp = date.getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return timestamp;
            }
        }).uid("watermark-id");
        /** Windowed aggregation */
        SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
                // keyBy uses hash partitioning by default
                .keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
                .window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
                // Apply the WindowFunction (CustomWindowFunction) to each window
                // CustomAggFunction performs the incremental aggregation
                // For each window, incremental aggregation (CustomAggFunction) runs first; its result is then
                // passed to the WindowFunction (CustomWindowFunction), which computes and emits the final output
                // For details, see the underlying AggregateApplyWindowFunction implementation
                .aggregate(new CustomAggFunction(), new CustomWindowFunction());
        //aggregate.print();
        /** Emit results */
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty("bootstrap.servers",toKafkaBootstrapServers);
        kafkaProducerProperties.setProperty("transaction.timeout.ms", "60000"); // must not exceed the broker's transaction.max.timeout.ms
        FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
                toKafkaTopic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProducerProperties,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        );
        aggregate.addSink(kafkaProducer011).name("outputToKafka");
        env.execute();
    }
    /**
     * Custom AggregateFunction
     * Incremental aggregation; here it simply counts records
     */
    static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>,Long,Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long accumulator1, Long accumulator2) {
            return accumulator1 + accumulator2;
        }
    }
    /**
     * Custom WindowFunction
     * Further processes the incrementally aggregated result and emits it
     */
    static class CustomWindowFunction implements WindowFunction<Long, String,String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
            Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
            Long windowPV = input.iterator().next();
            String output=format.format(new Date(windowStart))+","+format.format(new Date(windowEnd))+","+key+","+windowPV;
            out.collect(output);
        }
    }
}
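The job reads its settings from the properties file passed via --applicationProperties. Below is a minimal example of such a file, with hypothetical hosts, topics, and values matching the keys read in main(); each input record is expected to be a JSON string with userID, eventTime ("yyyy-MM-dd HH:mm:ss"), eventType, and productID fields.

# Hypothetical configuration for ReadWriteKafka
checkpointDirectory=hdfs:///flink/checkpoints/ReadWriteKafka
checkpointSecondInterval=60

fromKafka.bootstrap.servers=kafka010-host:9092
fromKafka.group.id=flink-readwrite-kafka
fromKafka.auto.offset.reset=latest
fromKafka.topic=userActionLog1

toKafka.bootstrap.servers=kafka011-host:9092
toKafka.topic=userActionPV

tumblingWindowLength=60
outOfOrdernessSeconds=5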


Original article: https://blog.51cto.com/u_14987/6694326

