
Flink 1.12.2 Sample


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sample-project</artifactId>
        <groupId>org.myproject.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>JavaFlink-112</artifactId>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.12.2</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-optimizer_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
        </dependency>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
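
The pom pulls in both the legacy planner (flink-table-planner_2.11) and the Blink planner (flink-table-planner-blink_2.11); the _2.11 suffix is the Scala version the artifacts were built against. The sample below calls StreamTableEnvironment.create(env), which uses the default planner (Blink as of 1.12), but the planner can also be selected explicitly. A minimal sketch, not part of the original sample (the class name PlannerSelection is made up for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlannerSelection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Explicitly pick the Blink planner in streaming mode (the 1.12 default).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // ... register sources and define queries as in the sample below.
    }
}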

Sample Code

The sample emits comma-separated records from a custom source, maps each line into a ten-field Row, registers the stream as a temporary table, runs a SQL query that builds JSON through a scalar UDF, and converts the result rows into a Java bean before printing.

package org.myproject.bigdata.sample;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class FlinkTest {
    // Ten STRING columns matching the comma-separated test records.
    private static final TypeInformation<?>[] typeInformations = {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
    };

    private static final String[] columnNames = {
        "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10"
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Parse each comma-separated line into a ten-field Row.
        DataStream<Row> mapStream = env.addSource(new CustomSource())
            .map((MapFunction<String, Row>) line -> {
                String[] split = line.split(",");
                Row row = new Row(split.length);
                for (int i = 0; i < split.length; i++) {
                    row.setField(i, split[i]);
                }
                return row;
            }).returns(new RowTypeInfo(typeInformations, columnNames));

        // Register the UDF and expose the stream as a temporary view for SQL.
        tableEnv.createTemporaryFunction("tojson", ToJsonData.class);
        Table table = tableEnv.fromDataStream(mapStream);
        tableEnv.createTemporaryView("tmp_table", table);
        Table rsTable = tableEnv.sqlQuery("select '' as c1, '' as c2, tojson('k1,k2,k3', n1, n2, n3) as json_data from tmp_table where n1='A'");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(rsTable, Row.class);
        SingleOutputStreamOperator<JavaBean> process = rowDataStream.process(new ConvertJavaBean());
        // rowDataStream.printToErr();
        process.printToErr();
        env.execute();
    }

    /** Parses the json_data column (field index 2) of each result row into a JSONObject-backed bean. */
    private static class ConvertJavaBean extends ProcessFunction<Row, JavaBean> {

        @Override
        public void processElement(Row row, Context ctx, Collector<JavaBean> out) throws Exception {
            JavaBean javaBean = new JavaBean();
            javaBean.setData(JSONObject.parseObject(String.valueOf(row.getField(2))));
            out.collect(javaBean);
        }
    }

    @Data
    private static class JavaBean {
        private JSONObject data;
    }

    /** Emits two sample records every ten seconds until the job is cancelled. */
    private static class CustomSource extends RichSourceFunction<String> {
        // volatile: cancel() is invoked from a different thread than run().
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (flag) {
                ctx.collect("A,B,C,D,E,F,G,H,J,10");
                ctx.collect("X,B,C,D,E,F,G,H,J,10");
                Thread.sleep(10000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    /**
     * Scalar UDF registered above as "tojson": the first argument is a
     * comma-separated list of keys, the remaining arguments are the values.
     * Must be public and static so Flink can instantiate it.
     */
    public static class ToJsonData extends ScalarFunction {
        public String eval(String... a) {
            if (a == null || a.length <= 1) {
                return "";
            }
            JSONObject json = new JSONObject();
            String[] columns = a[0].split(",");
            // Guard against more keys than supplied values.
            for (int i = 0; i < columns.length && i + 1 < a.length; i++) {
                json.put(columns[i], a[i + 1]);
            }
            return json.toString();
        }
    }

    // Alternative bounded source for quick local tests:
    // DataStreamSource<String> source = env.fromElements(
    //     "A,B,C,D,E,F,G,H,J,10",
    //     "A,B,C,D,E,F,G,H,J,10",
    //     "A,B,C,D,E,F,G,H,J,10"
    // );
}
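
Since the pom already declares JUnit, the UDF's eval logic can be exercised off-cluster. A minimal sketch, assuming the test lives in the same package as FlinkTest so the inner ToJsonData class is visible (the test class and method names are hypothetical):

package org.myproject.bigdata.sample;

import com.alibaba.fastjson.JSONObject;
import org.junit.Assert;
import org.junit.Test;

public class ToJsonDataTest {
    @Test
    public void evalBuildsJsonFromKeysAndValues() {
        FlinkTest.ToJsonData udf = new FlinkTest.ToJsonData();
        // First argument lists the keys; the rest are the matching values.
        String json = udf.eval("k1,k2,k3", "A", "B", "C");
        JSONObject parsed = JSONObject.parseObject(json);
        Assert.assertEquals("A", parsed.getString("k1"));
        Assert.assertEquals("B", parsed.getString("k2"));
        Assert.assertEquals("C", parsed.getString("k3"));
    }

    @Test
    public void evalReturnsEmptyStringForMissingArguments() {
        FlinkTest.ToJsonData udf = new FlinkTest.ToJsonData();
        Assert.assertEquals("", udf.eval("k1"));
        Assert.assertEquals("", udf.eval((String[]) null));
    }
}

When the full job runs, each ten-second cycle should print roughly JavaBean(data={"k1":"A","k2":"B","k3":"C"}) for the one record passing the n1='A' filter (Lombok's generated toString; JSON key order may vary).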

From: https://www.cnblogs.com/zi-shuo/p/17660371.html

    1,什么是cdcCDC是(ChangeDataCapture变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。2,flink的cdc项目地址:https://github......