Reading and Writing Kafka with Flink
Flink provides a Kafka connector for reading data from and writing data to Kafka.
This article summarizes common issues that come up when integrating Flink with Kafka and clears up a few points of confusion.
Issue 1: Ways to Read from Kafka
## Reading a single topic

```java
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
```

Example:

```java
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
```
## Reading multiple topics

```java
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
```

Example:

```java
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
```
## Reading topics that match a regular expression

```java
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
```

Example:

```java
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
```
## Starting from a specific timestamp

```java
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)
```

## Starting from specific offsets (one offset per partition)

```java
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
```
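A small usage sketch of these two start positions; the topic name, partition numbers, timestamp, and offsets below are made up for illustration, and `kafkaConsumer` is the consumer created above:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Start from the first records whose Kafka timestamp is at or after the given epoch-millisecond value (illustrative)
kafkaConsumer.setStartFromTimestamp(1546300800000L);

// Or: start each partition of an (illustrative) topic from an explicitly chosen offset
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("userActionLog1", 0), 23L);
specificStartupOffsets.put(new KafkaTopicPartition("userActionLog1", 1), 31L);
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
```

Note that these start positions only take effect when the job starts fresh; when the job is restored from a checkpoint or savepoint, the offsets stored in the state take precedence.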
Issue 2: Reading from Kafka and Deserializers

An org.apache.flink.api.common.serialization.DeserializationSchema or org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema turns the binary byte stream read from Kafka into the Java/Scala objects Flink works with.

Flink ships with two commonly used deserializers:

- org.apache.flink.api.common.serialization.SimpleStringSchema: deserializes to String.
- org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema: deserializes to a Jackson ObjectNode.

To turn complex JSON from Kafka directly into the object you want, write a custom schema modeled on org.apache.flink.api.common.serialization.SimpleStringSchema; the main method to implement is deserialize.
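For instance, here is a minimal sketch of such a custom schema. It assumes a hypothetical UserAction target type (userID, eventTime, eventType, productID) and uses Fastjson for the JSON parsing, as the full example further below does:

```java
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical target type for the Kafka JSON payload
class UserAction {
    public String userID;
    public String eventTime;
    public String eventType;
    public Integer productID;
}

public class UserActionDeserializationSchema implements DeserializationSchema<UserAction> {

    @Override
    public UserAction deserialize(byte[] message) throws IOException {
        // The raw Kafka value is assumed to be a UTF-8 encoded JSON string
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), UserAction.class);
    }

    @Override
    public boolean isEndOfStream(UserAction nextElement) {
        // The Kafka stream is unbounded
        return false;
    }

    @Override
    public TypeInformation<UserAction> getProducedType() {
        return TypeInformation.of(UserAction.class);
    }
}
```

It is then used in place of SimpleStringSchema, e.g. `new FlinkKafkaConsumer010<>("userActionLog1", new UserActionDeserializationSchema(), kafkaProperties)`.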
Issue 3: Dynamic Discovery of New Topics and Partitions

With Spark Streaming (Spark 2.2.2), newly added topics (when subscribing by regex) and newly added partitions in Kafka 0.10.1 were not discovered dynamically: whenever a topic or partition was added, the Spark Streaming job had to be restarted.

Flink can dynamically discover newly added Kafka topics and partitions, but the feature is disabled by default and has to be enabled explicitly:

```java
kafkaProperties.put("flink.partition-discovery.interval-millis","10000");
```

flink.partition-discovery.interval-millis: the discovery interval, in milliseconds.
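Combining this property with the pattern-based subscription from Issue 1, a sketch (broker address, group id, regex, and interval are illustrative):

```java
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "localhost:9092"); // placeholder
kafkaProperties.put("group.id", "flink-consumer-group");    // placeholder
// Check every 10 seconds for new partitions and, with pattern subscription, new matching topics
kafkaProperties.put("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(
        Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
```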
Issue 4: Reading from Kafka and Exactly-Once Semantics

Without checkpointing, offsets are auto-committed to external storage (the Kafka broker, or ZooKeeper for Kafka 0.8) every 5 seconds by default, and the Flink Kafka consumer's fault tolerance rests on those auto-committed offsets.

With checkpointing enabled, the offsets stored in a checkpoint are, by default, also committed to external storage (the Kafka broker, or ZooKeeper for 0.8) once the checkpoint completes. While the job runs, fault tolerance rests on the offsets in the checkpoints; when the job is restored, it may resume either from the offsets in a checkpoint or from the offsets in external storage such as the Kafka broker, depending on how it is restored. Note that in this mode the offsets stored in the Kafka broker (or ZooKeeper for 0.8) are only used to monitor consumption progress.

In short, by combining Kafka's ability to re-read data with Flink's checkpoint mechanism, the Flink Kafka consumer can provide exactly-once semantics on the read side.
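A minimal consumer-side sketch of the checkpoint-based mode; the interval is illustrative and `kafkaProperties` is configured as in the snippets above:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot offsets (and all other state) every 30 seconds
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);

FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
// Also commit the checkpointed offsets back to Kafka, purely for monitoring consumption progress
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
```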
Issue 5: Writing to Kafka and Exactly-Once Semantics

- Kafka 0.8: Flink provides neither exactly-once nor at-least-once guarantees.
- Kafka 0.9 / 0.10: with checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 provide at-least-once semantics. In addition, the following settings are needed (see the sketch after this list):
  - setLogFailuresOnly(false): if set to true, the producer only logs a send failure and the streaming job keeps running. It must be false so that on failure the job fails, recovers, and retries the send.
  - setFlushOnCheckpoint(true): the checkpoint covers the data sitting in the Kafka producer's buffer; with true, a checkpoint can only complete once every buffered record has been written to Kafka.
  - retries: number of send retries; the default is 0, and a larger value is recommended.
- Kafka 0.11 / 1.0.0+: with checkpointing enabled, FlinkKafkaProducer011 and FlinkKafkaProducer (Kafka >= 1.0.0) provide exactly-once semantics by default, built on a two-phase commit. Another guarantee can be chosen explicitly if needed: Semantic.NONE (may lose or duplicate), Semantic.AT_LEAST_ONCE (may duplicate), or Semantic.EXACTLY_ONCE (the default).
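A sketch of the at-least-once configuration for the 0.9/0.10 producers; the broker address, topic name, and retry count are illustrative:

```java
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
producerProperties.setProperty("retries", "3"); // retry failed sends instead of the default 0

FlinkKafkaProducer010<String> producer =
        new FlinkKafkaProducer010<>("outputTopic", new SimpleStringSchema(), producerProperties);
// Fail (and recover) the job on send errors instead of only logging them
producer.setLogFailuresOnly(false);
// A checkpoint completes only after all records buffered in the producer are written to Kafka
producer.setFlushOnCheckpoint(true);

stream.addSink(producer); // stream is any DataStream<String>
```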
Example: Reading from Kafka 0.10.1, Computing PV Counts, and Writing the Results to Kafka 0.11.0.3
Relevant dependencies:

```xml
<!-- Kafka 0.10 connector (source) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

<!-- Kafka 0.11 connector (sink) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
```
Code implementation:

```java
package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Author: Wang Pei
 * Summary: Read from and write to Kafka
 */
public class ReadWriteKafka {
    public static void main(String[] args) throws Exception {

        /** Parse command-line arguments */
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        // Checkpoint parameters
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        // Source Kafka parameters
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaAutoOffsetReset = parameterTool.getRequired("fromKafka.auto.offset.reset");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        // Sink Kafka parameters
        String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
        String toKafkaTopic = parameterTool.getRequired("toKafka.topic");

        // Window parameters
        long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
        long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");

        /** Set up the execution environment */
        // Start a local web server (Flink Web UI)
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the StateBackend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));

        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        /** Configure the source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", fromKafkaBootstrapServers);
        kafkaProperties.put("group.id", fromKafkaGroupID);
        kafkaProperties.put("auto.offset.reset", fromKafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

        /** Extract and transform */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
                    Tuple4<String, String, String, Integer> output = new Tuple4<>();
                    try {
                        JSONObject obj = JSON.parseObject(value);
                        output.f0 = obj.getString("userID");
                        output.f1 = obj.getString("eventTime");
                        output.f2 = obj.getString("eventType");
                        output.f3 = obj.getInteger("productID");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return output;
                }).returns(new TypeHint<Tuple4<String, String, String, Integer>>() {})
                .name("Map: ExtractTransform").uid("map-id");

        /** Filter out malformed records (parse failures leave the tuple fields null) */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f1 != null)
                .name("Filter: FilterExceptionData").uid("filter-id");

        /** Extract timestamps and emit watermarks */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            public long extractTimestamp(Tuple4<String, String, String, Integer> element) {
                long timestamp = 0L;
                try {
                    Date date = format.parse(element.f1);
                    timestamp = date.getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return timestamp;
            }
        }).uid("watermark-id");

        /** Windowed aggregation */
        SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
                // keyBy uses hash partitioning by default
                .keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
                .window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
                // For each window, records are first aggregated incrementally (CustomAggFunction);
                // the aggregated result is then passed to the WindowFunction (CustomWindowFunction), which produces the output.
                // For details, see the underlying AggregateApplyWindowFunction implementation.
                .aggregate(new CustomAggFunction(), new CustomWindowFunction());

        //aggregate.print();

        /** Emit results */
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty("bootstrap.servers", toKafkaBootstrapServers);
        kafkaProducerProperties.setProperty("transaction.timeout.ms", 60000 + "");
        FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
                toKafkaTopic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProducerProperties,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        );
        aggregate.addSink(kafkaProducer011).name("outputToKafka");

        env.execute();
    }

    /**
     * Custom AggregateFunction.
     * Incremental aggregation; here it simply counts records.
     */
    static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long accumulator1, Long accumulator2) {
            return accumulator1 + accumulator2;
        }
    }

    /**
     * Custom WindowFunction.
     * Post-processes the incrementally aggregated result and emits the output.
     */
    static class CustomWindowFunction implements WindowFunction<Long, String, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
            Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
            Long windowPV = input.iterator().next();
            String output = format.format(new Date(windowStart)) + "," + format.format(new Date(windowEnd)) + "," + key + "," + windowPV;
            out.collect(output);
        }
    }
}
```
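For reference, the applicationProperties file loaded via ParameterTool could look like the sketch below; all values are placeholders to be adapted to your environment, while the keys are exactly the ones read in the code above:

```properties
# Checkpointing
checkpointDirectory=hdfs:///flink/checkpoints/ReadWriteKafka
checkpointSecondInterval=30

# Source Kafka (0.10.1)
fromKafka.bootstrap.servers=kafka010-host:9092
fromKafka.group.id=flink-pv-job
fromKafka.auto.offset.reset=latest
fromKafka.topic=userActionLog1

# Sink Kafka (0.11.0.3)
toKafka.bootstrap.servers=kafka011-host:9092
toKafka.topic=userActionLogPV

# Window
tumblingWindowLength=60
outOfOrdernessSeconds=5
```

The job is then launched with `--applicationProperties /path/to/application.properties`, since ParameterTool.fromArgs parses `--key value` pairs.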