首页 > 数据库 >flinksql API StreamTableEnvironment StreamStatementSet应用

flinksql API StreamTableEnvironment StreamStatementSet应用

时间:2024-06-21 15:58:08浏览次数:15  
标签:StreamTableEnvironment String sipDataInfo tableEnv flinksql kafka topic API new

1.问题描述

在应用flink实时消费kafka数据多端中,一般会使用flink原生的addsink或flinkSQL利用SqlDialect,比如消费kafka数据实时写入hive和kafka一般用两种方式:
第一种方式是写入hive利用SqlDialect,写入kafka利用flink的旁路输出流+原生addSink
第二种方式是写入hive和kafka都利用SqlDialect的方式,将kafka也当作一个刘表

2.第一种方式核心代码及现状

	DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));
        SingleOutputStreamOperator<SipDataInfo> mainStream = dataStream.map(s -> {
            SipDataInfo sipDataInfo = new SipDataInfo();
            JSONObject jsonObject = SipFullauditMonitor.complex(s);

            sipDataInfo.setRow(createRow(jsonObject, size, typeArray, column));
            sipDataInfo.setJsonObject(jsonObject);
            return sipDataInfo;
        });

        final OutputTag<SipDataInfo> kafkaOutputTag = new OutputTag<SipDataInfo>("kafka_stream") {
        };
        final OutputTag<SipDataInfo> hiveOutputTag = new OutputTag<SipDataInfo>("hive_stream") {
        };

        SingleOutputStreamOperator<SipDataInfo> sideOutputStream = mainStream.process(new ProcessFunction<SipDataInfo, SipDataInfo>() {
            @Override
            public void processElement(SipDataInfo sipDataInfo, Context context, Collector<SipDataInfo> collector) throws Exception {
                context.output(kafkaOutputTag, sipDataInfo);
                context.output(hiveOutputTag, sipDataInfo);
            }
        });

        DataStream<SipDataInfo> kafkaStream = sideOutputStream.getSideOutput(kafkaOutputTag);
        DataStream<SipDataInfo> hiveStream = sideOutputStream.getSideOutput(hiveOutputTag);

        Properties producerProperties = new Properties();
        producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari1:6667");

        kafkaStream.map(sipDataInfo -> sipDataInfo.getJsonObject().toJSONString())
                .filter(s -> JSONObject.parseObject(s, SipFullauditMonitor.class).getReftaskid() != null && JSONObject.parseObject(s, SipFullauditMonitor.class).getReftaskid() == 0)
                .addSink(new FlinkKafkaProducer<String>("dwd_" + topic, new SimpleStringSchema(), props));


        TypeInformation[] tfs = getSqlColumsType(typeArray);
        DataStream<Row> hiveOdsSinkDataStream = hiveStream.map(sipDataInfo -> sipDataInfo.getRow())
                .returns(Types.ROW_NAMED(column, tfs))
                .filter(row -> CommonUtil.filter(row));

        setHiveParam(parameter, tableEnv);

        Table table = tableEnv.fromDataStream(hiveOdsSinkDataStream);
        tableEnv.createTemporaryView("tmp_" + topic, table);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));

        //写hive表
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String insertSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                "') select " + sinkHiveColumnStr + " from tmp_" + topic;

        tableEnv.executeSql(insertSql);
        environment.execute();
    }
}

3.第二种方式实现的核心代码

		DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));

        TypeInformation[] tfs = getSqlColumsType(typeArray);
        DataStream<Row> rowDataStream = dataStream.map(s -> createRow(SipFullauditMonitor.complex(s), size, typeArray, column))
                .returns(Types.ROW_NAMED(column, tfs))
                .filter(row -> CommonUtil.filter(row));

        Table table = tableEnv.fromDataStream(rowDataStream);
        setHiveParam(parameter, tableEnv);
        tableEnv.createTemporaryView("tmp_" + topic, table);

        //创建hive表
        tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        //创建kafka表
        tableEnv.executeSql("drop table dwd_sip_fullaudit_monitor");
        String kafkaTableSql = createKafkaTableSqlByColumn("dwd_sip_fullaudit_monitor", parameter, column, typeArray);
        tableEnv.executeSql(kafkaTableSql);

        //写hive表
        String insertHiveSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                "',insterhour='" + new SimpleDateFormat("yyyyMMddHH").format(new Date()) + "') select " + sinkHiveColumnStr + " from tmp_" + topic;
      
        //写kafka表
        String insertKafkaSql = "insert into dwd_sip_fullaudit_monitor" + " select " + sinkHiveColumnStr + " from " + "tmp_" + topic;

        tableEnv.executeSql(insertKafkaSql);
        tableEnv.executeSql(insertHiveSql);
        

在以上两种实现方式中,发现flink都会在yarn上启动两个应用,这两个应用虽然都能将数据正常写入hive和kafka,但是不太好。

后面通过不断的尝试api发现StreamTableEnvironment StreamStatementSet可以解决该问题

4.应用StreamTableEnvironment StreamStatementSet的核心代码

		DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));

        TypeInformation[] tfs = getSqlColumsType(typeArray);
        DataStream<Row> rowDataStream = dataStream.map(s -> createRow(SipFullauditMonitor.complex(s), size, typeArray, column))
                .returns(Types.ROW_NAMED(column, tfs))
                .filter(row -> CommonUtil.filter(row));

        Table table = tableEnv.fromDataStream(rowDataStream);
        setHiveParam(parameter, tableEnv);
        tableEnv.createTemporaryView("tmp_" + topic, table);

        //创建hive表
        tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        //创建kafka表
        tableEnv.executeSql("drop table dwd_sip_fullaudit_monitor");
        String kafkaTableSql = createKafkaTableSqlByColumn("dwd_sip_fullaudit_monitor", parameter, column, typeArray);
        tableEnv.executeSql(kafkaTableSql);

        StatementSet stmtSet = tableEnv.createStatementSet();
        //写hive表
        String insertHiveSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                "',insterhour='" + new SimpleDateFormat("yyyyMMddHH").format(new Date()) + "') select " + sinkHiveColumnStr + " from tmp_" + topic;
        System.out.println("insertHiveSql:"+insertHiveSql);
        //写kafka表
        String insertKafkaSql = "insert into dwd_sip_fullaudit_monitor" + " select " + sinkHiveColumnStr + " from " + "tmp_" + topic;

        stmtSet.addInsertSql(insertHiveSql);
        stmtSet.addInsertSql(insertKafkaSql);

        stmtSet.execute();

执行查看flink web界面

说明:

StreamStatementSet的这个的应用在初学或者一般场景应用下可能不太容易发现或应用,来看下flink源码的解释,红色部分大概意思是[可以一起优化所有添加的语句,然后将它们作为一个作业提交],重点是作为一个作业提交。但StreamStatementSet并没有解决前面的第一种场景。所以在实际的应用中不太建议流表和原生addsink混用,flink越往后的版本也是更加提倡应用流表方式去完成流批一体的体系

标签:StreamTableEnvironment,String,sipDataInfo,tableEnv,flinksql,kafka,topic,API,new
From: https://www.cnblogs.com/jiashengmei/p/18260665

相关文章

  • Node.js车牌识别、文档识别、OCR API-自动化录入信息
    为什么人工智能如此受关注?因为人工智能技术在图片处理以及在感知与认知等领域的不断突破,带来更高的效率。没错,智能机器人、人工智能技术衍生的产品已经开始替代人的工作。文字录入工作较为显著,OCR技术成为手动录入的杀手锏,图片识别、扫描识别多样化的解决方案层出不穷。......
  • Apifox详细使用教程
    一、Apifox简介Apifox是一款集成了API设计、开发、测试等多功能于一体的工具,它提供了API文档管理、API调试、APIMock、API自动化测试等功能。以下是一些关于Apifox使用的基本步骤和教程:我们在日常编程开发过程中经常实行的是前后端分离架构的模式,一个项目的落地会通过产品、......
  • Shopee虾皮API:获取商家店铺商品列表
    一、平台介绍Shopee,作为东南亚及中国台湾地区领先的电商平台,为卖家提供了一个便捷、高效的销售渠道。作为卖家,能够将自己的商品展示在Shopee平台上,并通过平台的流量和工具,将商品销售给更多的潜在买家。为了帮助卖家更好地管理自己的店铺和商品,Shopee提供了丰富的API接口服务......
  • Shopee API接口:获取搜索栏生成的商品结果列表
    一、平台介绍Shopee,作为东南亚领先的电商平台,一直致力于为卖家和买家提供便捷、高效的在线购物体验。为了满足广大开发者的需求,Shopee提供了丰富的API接口服务,帮助卖家和第三方开发者更好地与平台进行数据交互,实现业务的自动化和扩展。通过Shopee的API接口服务,可以轻松地获取......
  • Docker 部署 YApi 记录
    最近想在linux环境部署YApi对接口文档进行管理,只不过基于官方的部署方式,需要npm的配置,而且中间踩了很多坑比如YApi版本,Nodejs版本...... 想着还是使用Docker部署吧,找到了这位大佬的部署方式: https://www.jianshu.com/p/a97d2efb23c5基于大佬的配置,终于部署成功了......
  • apisix~为自定义插件设计一个configmap脚本
    configMapKubernetes中的ConfigMap是一种用来存储配置数据的API资源,它允许您将配置信息以键值对的形式保存,并在容器中使用这些配置信息。ConfigMap提供了一种将配置数据与应用程序解耦的方式,使得应用程序可以动态地获取配置而无需重新构建镜像。以下是ConfigMap的一些特......
  • RapidLayout:中英文版面分析推理库
    引言继上一篇文章之后,我这里想着将360发布的版面分析模型整合到现有的rapid_layout仓库中,便于大家快速使用。不曾想到,我这整理工作越做越多了,好在整体都是往更好方向走。起初,rapid_layout项目是在RapidStructure仓库下的。RapidStructure仓库包含三大块:文档方向分类、版面分析......
  • 【fastapi】定时任务管理
    在FastApi框架搭建的WBE系统中如何实现定时任务的管理?Python中常见的定时任务框架包括Celery、APScheduler和Huey。以下是每个框架的简单对比和示例代码。1.Celery:分布式任务队列,适合处理长时间运行的任务。#安装celery#pipinstallcelery#celery_task.pyfrom......
  • 掌握Postman WYSIWYG编辑器:提升API开发效率的秘诀
    Postman是一款强大的API开发和测试工具,它提供了多种方式来构建和测试API请求。WYSIWYG(所见即所得)编辑器是Postman中的一个特性,允许用户以一种更直观、更易于操作的方式来编辑和格式化请求的各个部分。本文将详细介绍如何在Postman中使用WYSIWYG编辑器,以及它如何帮助提升API......
  • 使用 TensorRT C++ API 调用GPU加速部署 YOLOv10 实现 500FPS 推理速度——快到飞起!!
    ​NVIDIA®TensorRT™是一款用于高性能深度学习推理的SDK,包含深度学习推理优化器和运行时,可为推理应用程序提供低延迟和高吞吐量。YOLOv10是清华大学研究人员近期提出的一种实时目标检测方法,通过消除NMS、优化模型架构和引入创新模块等策略,在保持高精度的同时显著降低了......