Flink 提供了 Apache Kafka 连接器,用于从 Kafka topic 中读取或者向其中写入数据,可提供精确一次的处理语义。
一:简单使用
1.pom
<!--Flink Connector KAFKA--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency>
2.创建properties
具体的配置,写在配置文件中
private static Properties kp; private static Properties kc; private static Properties kcc; public static Properties getProducer() { if (kp == null) { kp = new Properties(); Config load = ConfigFactory.load("kafka.properties"); kp.put("bootstrap.servers",load.getString("kafka.config.bootstrap.servers")); kp.put("acks", load.getString("kafka.config.acks")); kp.put("retries", load.getInt("kafka.config.retries")); kp.put("batch.size", load.getInt("kafka.config.batch.size")); } return kp; } public static Properties getConsumer(String groupId) { if (kc == null) { kc = new Properties(); Config load = ConfigFactory.load("kafka.properties"); kc.put("bootstrap.servers", load.getString("kafka.config.bootstrap.servers")); kc.put("group.id", StringUtils.isNotEmpty(groupId) ? groupId : "exceed-group"); kc.put("enable.auto.commit", load.getString("kafka.config.enable.auto.commit")); kc.put("auto.commit.interval.ms",load.getString("kafka.config.auto.commit.interval.ms")); kc.put("session.timeout.ms",load.getString("kafka.config.session.timeout.ms")); kc.put("key.deserializer",load.getString("kafka.config.key.deserializer")); kc.put("value.deserializer",load.getString("kafka.config.value.deserializer")); kc.put("auto.offset.reset",load.getString("kafka.config.auto.offset.reset")); } return kc; }
3.代码使用
消费者:
FlinkKafkaConsumer<String> stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(DataVerifyConst.Topics.LOG_TOPIC, new SimpleStringSchema(), KafkaConfigUtil.getConsumer(ConsumerGroup.getNowOneHourOfflineGroup()));
其中,注意点是groupId
重启之后,之前的数据不要了。
public class ConsumerGroup { public static String getMin(){ Calendar cal = Calendar.getInstance(); StringBuffer sb=new StringBuffer(); sb.append(cal.get(Calendar.YEAR)); sb.append(cal.get(Calendar.MONTH+1)); sb.append(cal.get(Calendar.DATE)); sb.append(cal.get(Calendar.HOUR)); sb.append(cal.get(Calendar.MINUTE)); return sb.toString(); } public static String getNowOneHourOfflineGroup(){ StringBuffer sb=new StringBuffer(); sb.append("verify-offline-"); sb.append(getMin()); return sb.toString(); }
}
生产者:
Properties props = KafkaConfigUtil.getProducer(); FlinkKafkaProducer<MonitoringIndex> producer = new FlinkKafkaProducer<>("verify-alarm", new MonitorIndexSchema(), props);
配置对应的schema序列化
public class MonitorIndexSchema implements DeserializationSchema<MonitoringIndex>, SerializationSchema<MonitoringIndex> { private static final long serialVersionUID = 1L; private transient Charset charset; public MonitorIndexSchema() { this(StandardCharsets.UTF_8); } public MonitorIndexSchema(Charset charset) { this.charset = checkNotNull(charset); } public Charset getCharset() { return charset; } @Override public MonitoringIndex deserialize(byte[] message) throws IOException { String json = new String(message, this.charset == null ? StandardCharsets.UTF_8 : this.charset); return JacksonJsonUtil.json2Obj(json, MonitoringIndex.class); } @Override public boolean isEndOfStream(MonitoringIndex monitoringIndex) { return false; } @Override public byte[] serialize(MonitoringIndex monitoringIndex) { String json = JacksonJsonUtil.obj2Json(monitoringIndex); return json.getBytes(StandardCharsets.UTF_8); } @Override public TypeInformation<MonitoringIndex> getProducedType() { return TypeInformation.of(new TypeHint<MonitoringIndex>() { }); } }
二:Kafka Consumer
1.构造函数
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer
。它提供对一个或多个 Kafka topics 的访问。
构造函数接受以下参数:
- Topic 名称或者名称列表
- 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
- Kafka 消费者的属性。需要以下属性:
- “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
- “group.id” 消费组 ID
2.java中使用
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); DataStream<String> stream = env .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
3.反序列化
Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。
4.配置 Kafka Consumer 开始消费的位置
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...); myConsumer.setStartFromEarliest(); // 尽可能从最早的记录开始 myConsumer.setStartFromLatest(); // 从最新的记录开始 myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒) myConsumer.setStartFromGroupOffsets(); // 默认的方法 DataStream<String> stream = env.addSource(myConsumer);
5.Kafka Consumer 提交 Offset 的行为配置
Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。
配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。
-
禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将
enable.auto.commit
或者auto.commit.interval.ms
的Key 值设置为提供的Properties
配置中的适当值。 -
启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的
setCommitOffsetsOnCheckpoints(boolean)
方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties
中的自动定期 offset 提交设置会被完全忽略。
三:Kafka Producter
1.构造函数
Flink Kafka Producer 被称为 FlinkKafkaProducer
。它允许将消息流写入一个或多个 Kafka topic。
构造器接收下列参数:
- 事件被写入的默认输出 topic
- 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka client 的 Properties。下列 property 是必须的:
- “bootstrap.servers” (逗号分隔 Kafka broker 列表)
- 容错语义
roperties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>( "my-topic", // 目标 topic new SimpleStringSchema() // 序列化 schema properties, // producer 配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错 stream.addSink(myProducer);
四:解释
public TypeInformation<MonitoringIndex> getProducedType()
实现这个接口的getProducedType
方法就是获取此函数或输入格式产生的数据类型
标签:load,配置,flink,Kafka,Properties,new,kafka,public From: https://www.cnblogs.com/juncaoit/p/17278962.html