首页 > 其他分享 >flink中配置kafka

flink中配置kafka

时间:2023-04-01 17:35:55浏览次数:52  
标签:load 配置 flink Kafka Properties new kafka public

  Flink 提供了 Apache Kafka 连接器,用于从 Kafka topic 中读取或者向其中写入数据,可提供精确一次的处理语义。

一:简单使用

1.pom

        <!--Flink Connector KAFKA-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

 

2.创建properties

  具体的配置,写在配置文件中

    private static Properties kp;
    private static Properties kc;
    private static Properties kcc;

    public static Properties getProducer() {
        if (kp == null) {
            kp = new Properties();
            Config load = ConfigFactory.load("kafka.properties");
            kp.put("bootstrap.servers",load.getString("kafka.config.bootstrap.servers"));
            kp.put("acks", load.getString("kafka.config.acks"));
            kp.put("retries", load.getInt("kafka.config.retries"));
            kp.put("batch.size", load.getInt("kafka.config.batch.size"));
        }
        return kp;
    }


    public static Properties getConsumer(String groupId) {
        if (kc == null) {
            kc = new Properties();
            Config load = ConfigFactory.load("kafka.properties");
            kc.put("bootstrap.servers", load.getString("kafka.config.bootstrap.servers"));
            kc.put("group.id", StringUtils.isNotEmpty(groupId) ? groupId : "exceed-group");
            kc.put("enable.auto.commit", load.getString("kafka.config.enable.auto.commit"));
            kc.put("auto.commit.interval.ms",load.getString("kafka.config.auto.commit.interval.ms"));
            kc.put("session.timeout.ms",load.getString("kafka.config.session.timeout.ms"));
            kc.put("key.deserializer",load.getString("kafka.config.key.deserializer"));
            kc.put("value.deserializer",load.getString("kafka.config.value.deserializer"));
            kc.put("auto.offset.reset",load.getString("kafka.config.auto.offset.reset"));
        }

        return kc;
    }

 

3.代码使用

  消费者:

FlinkKafkaConsumer<String> stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(DataVerifyConst.Topics.LOG_TOPIC, new SimpleStringSchema(), KafkaConfigUtil.getConsumer(ConsumerGroup.getNowOneHourOfflineGroup()));

  其中,注意点是groupId

  重启之后,之前的数据不要了。

public class ConsumerGroup {

    public static String getMin(){
        Calendar cal = Calendar.getInstance();
        StringBuffer sb=new StringBuffer();
        sb.append(cal.get(Calendar.YEAR));
        sb.append(cal.get(Calendar.MONTH+1));
        sb.append(cal.get(Calendar.DATE));
        sb.append(cal.get(Calendar.HOUR));
        sb.append(cal.get(Calendar.MINUTE));
        return sb.toString();
    }

    public static String getNowOneHourOfflineGroup(){
        StringBuffer sb=new StringBuffer();
        sb.append("verify-offline-");
        sb.append(getMin());
        return sb.toString();
    }
}

 

  生产者:

Properties props = KafkaConfigUtil.getProducer();
FlinkKafkaProducer<MonitoringIndex> producer = new FlinkKafkaProducer<>("verify-alarm", new MonitorIndexSchema(), props);

  配置对应的schema序列化

public class MonitorIndexSchema implements DeserializationSchema<MonitoringIndex>, SerializationSchema<MonitoringIndex> {

    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public MonitorIndexSchema() {
        this(StandardCharsets.UTF_8);
    }

    public MonitorIndexSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    @Override
    public MonitoringIndex deserialize(byte[] message) throws IOException {
        String json = new String(message, this.charset == null ? StandardCharsets.UTF_8 : this.charset);
        return JacksonJsonUtil.json2Obj(json, MonitoringIndex.class);
    }

    @Override
    public boolean isEndOfStream(MonitoringIndex monitoringIndex) {
        return false;
    }

    @Override
    public byte[] serialize(MonitoringIndex monitoringIndex) {
        String json = JacksonJsonUtil.obj2Json(monitoringIndex);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<MonitoringIndex> getProducedType() {
        return TypeInformation.of(new TypeHint<MonitoringIndex>() {
        });
    }
}

 

二:Kafka Consumer

1.构造函数

  Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。

  构造函数接受以下参数:

  1. Topic 名称或者名称列表
  2. 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
  3. Kafka 消费者的属性。需要以下属性:
  • “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
  • “group.id” 消费组 ID

 

2.java中使用

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

 

3.反序列化

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。

 

4.配置 Kafka Consumer 开始消费的位置

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // 尽可能从最早的记录开始
myConsumer.setStartFromLatest();       // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法
 
DataStream<String> stream = env.addSource(myConsumer);

 

5.Kafka Consumer 提交 Offset 的行为配置

  Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。

  配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。

  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。

 

 

三:Kafka Producter

1.构造函数

  Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

  构造器接收下列参数:

  1. 事件被写入的默认输出 topic
  2. 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
  3. Kafka client 的 Properties。下列 property 是必须的:
    • “bootstrap.servers” (逗号分隔 Kafka broker 列表)
  4. 容错语义
roperties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
 
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
        "my-topic",                  // 目标 topic
        new SimpleStringSchema()     // 序列化 schema
        properties,                  // producer 配置
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错
 
stream.addSink(myProducer);

 

四:解释

public TypeInformation<MonitoringIndex> getProducedType()

 

  实现这个接口的getProducedType方法就是获取此函数或输入格式产生的数据类型

 

标签:load,配置,flink,Kafka,Properties,new,kafka,public
From: https://www.cnblogs.com/juncaoit/p/17278962.html

相关文章

  • nacos默认配置启动
    1.相关组件组件说明版本地址Nacos配置及注册中心https://github.com/alibaba/nacos/releasesps:SpringBoot、SpringCloud和nacos集成版本对应关系对照(版本若对应不上,应用可能会启动报错):https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明2.组件......
  • maven打包后加载外部配置文件
    pom.xml文件配置在bulid中指定resources路径并指定不加载的配置文件<resources> <resource> <directory>src/main/resources</directory> <excludes> <!--<exclude>*.xml</exclude> <exclude>*.ini</exclude&g......
  • Flink与mysql结合
    在流式计算中,不是有时候需要和mysql进行结合做一些处理。 1.调用其他方法进行  2.更快的处理使用guava本地缓存对msql的操作是new对象过来privatefinalstaticRuleServiceruleService=newRuleService();finalstaticCache<Long,Ma......
  • zabbix配置触发器。。。。即设置监控项报警的分界值!!
              {192.168.8.101:system.users.num.last()}>6            报警声音设置    测试触发器报警:   问题来了,触发器提醒运维人员,需要及时处理一般问题!! ......
  • 在zsh中配置fzf
    fzf简介fzf是一个命令行模糊查询的工具。主要使用go语言实现。它提供一个友好的命令行模糊查询的界面,按照它的格式要求传参后,可以很好实现先搜索再操作,即"searchandaction"模式。这样就可以大大降低记忆负担,只需输入关键词即可完成各种操作。安装fzf#macbrewinstallfzf......
  • homebrew: 配置国内源(Homebrew 4.0.10-119)
    一,查看brew的安装目录:liuhongdi@liuhongdideMacBook-Propoem%cd"$(brew--repo)"liuhongdi@liuhongdideMacBook-ProHomebrew%pwd/usr/local/Homebrew说明:刘宏缔的架构森林是一个专注架构的博客,地址:https://www.cnblogs.com/architectforest     对应的源......
  • Go语言入门(go环境配置,变量和常量)
    go语言特点天然支持高并发语法简单,去掉很多臃肿的东西优化的内存分配和垃圾处理完善的标准库go安装下载:Go下载-Go语言中文网-Golang中文社区(studygolang.com)goversion判断是否安装成功配置环境变量GOROOT,GOPATHgoenv查看环境变量配置goland开发工具安装Dow......
  • HJ66 配置文件恢复_字典_字符串
    思路:1、把命令和执行对录入一字串字典和二字串字典2、取字典的可以与输入对比3、为了保证唯一性,用c常数增加1来判断是否唯一。4、最后根据c值统一打印输出1importsys2a=[]3forlineinsys.stdin:4a.append(line.strip().split())5#print(a)6d1={"rese......
  • flask-介绍、配置文件、路由系统
    1.flask和pythonweb框架介绍1.1框架介绍:django:大而全,内置的app多,第三方app也多Flask:小而精,没有过多的内置组件,只完成web框架最基本的功能,需要借助于第三方,完成更丰富的功能web.py:是一个小巧灵活的Python框架,它简单而且功能强大(国内几乎没有用的)fastapi:python的异步web框架,......
  • hadoop3.3 安装配置sqoop1.4.7
    一:在hadoop3.3中安装配置sqoop1.4.7前言:sqoop功能已经非常完善了,没有什么可以更新的了,官方停止更新维护了。因此官方集成的hadoop包停留在了2.6.0版本,在hadoop3.3.0版本会提示类版本过低错误,但纯净版sqoop有缺少必须的第三方库,所以将这两个包下载下来,提取部分sqoop_hadoop2.6.......