首页 > 其他分享 >Flink DataStream API-数据源、数据转换、数据输出

Flink DataStream API-数据源、数据转换、数据输出

时间:2024-02-11 11:02:37浏览次数:20  
标签:DataStream name 数据源 Flink 算子 public

本文继续介绍Flink DataStream API先关内容,重点:数据源、数据转换、数据输出。

1、Source数据源

1.1、Flink基本数据源

  • 文件数据源
// 2. 读取数据源
DataStream<String> fileDataStreamSource =
        env.readTextFile("/Users/yclxiao/Project/bigdata/flink-blog/doc/words.txt");
  • Socket数据源
// 2. 读取数据源
DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");
  • 集合数据源
DataStreamSource<String> textStream = env.fromCollection(Arrays.asList(
                "java,c++,php,java,spring",
                "hadoop,scala",
                "c++,jvm,html,php"
        ));

1.2、高级数据源

Flink可以从Kafka、Mysql-CDC等数据源读取数据,使用时需要引入第三方依赖库。

对接Kafka数据源

在Maven中引入Flink针对Kafka的API依赖库,pom代码如下:


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

Java代码如下:

// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取数据源
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("10.20.1.26:9092")
        .setGroupId("group-flinkdemo")
        .setTopics("topic-flinkdemo")
        // 从最末尾位点开始消费
        .setStartingOffsets(OffsetsInitializer.latest())
        // 从上次消费者提交的地方开始消费,应该采用这种方式,防止服务重启的期间丢失数据
//                .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();

DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");


// 3. 数据转换
DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split("\\,")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
        .keyBy(value -> value.f0)
        .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));

dataStream.print("BlogDemoStream=======")
        .setParallelism(1);

// 5. 启动任务
env.execute(KafkaDataStreamSourceDemo.class.getSimpleName());

对接Myql-CDC数据源

详细的过程可以查看我的这边文章:一次打通FlinkCDC同步Mysql数据

在Maven中引入Flink针对Mysql-CDC的API依赖库,pom代码如下:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-shaded-guava</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
    </exclusions>
</dependency>

java代码如下:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB) // set captured database
                .tableList(String.join(",", SYNC_TABLES)) // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(5000);

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + LeagueOcSettleProfit2DwsHdjProfitRecordAPI.class.getName());

1.3、自定义的数据源方式

Flink中可以很方便的使用自定义数据源,只需要实现SourceFunction接口即可。

比如实现一个随机产生某10个学生的N此考试分数的自定义数据源,对每个学生的份数相加,代码如下:


    private static class RandomStudentSource implements SourceFunction<Student> {

        private Random rnd = new Random();
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (isRunning) {
                Student student = new Student();
                student.setName("name-" + rnd.nextInt(5));
                student.setScore(rnd.nextInt(20));
                ctx.collect(student);
                Thread.sleep(100L);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    private static class Student {
        private String name;
        private Integer score;

        public Student() {
        }

        public Student(String name, Integer score) {
            this.name = name;
            this.score = score;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getScore() {
            return score;
        }

        public void setScore(Integer score) {
            this.score = score;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "name='" + name + '\'' +
                    ", score=" + score +
                    '}';
        }
    }

2、Transformation数据转换

常用的数据转换函数如下:

map:map()算子接收一个函数作为参数,并把这个函数应用于DataStream的每个元素,最后将函数的返回结果作为结果DataStream中对应元素的值,即将DataStream的每个元素转换成新的元素。

flatMap:与map()算子类似,但是每个传入该函数func的DataStream元素会返回0到多个元素,最终会将返回的所有元素合并到一个DataStream。

filter:通过函数filter对源DataStream的每个元素进行过滤,并返回一个新的DataStream。

keyBy:keyBy()算子主要作用于元素类型是元组或数组的DataStream上。使用该算子可以将DataStream中的元素按照指定的key(指定的字段)进行分组,具有相同key的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。

reduce:reduce()算子主要作用于KeyedStream上,对KeyedStream数据流进行滚动聚合,即将当前元素与上一个聚合值进行合并,并且发射出新值。该算子的原理与MapReduce中的Reduce类似,聚合前后的元素类型保持一致。

aggregation:aggregation是聚合算子,类似的还有:reduce、sum、max、min。Aggregation算子作用于KeyedStream上,并且进行滚动聚合。与keyBy()算子类似,可以使用数字或字段名称指定需要聚合的字段。keyBy()算子会将DataStream转换为KeyedStream,而Aggregation算子会将KeyedStream转换为DataStream,类似下图:

union:union()算子用于将两个或多个数据流进行合并,创建一个包含所有数据流所有元素的新流(不会去除重复元素)。

connect:connect()算子可以连接两个数据流,并保持各自元素的数据类型不变,允许在两个流之间共享状态数据。connect与union有几点区别:

  1. union()要求多个数据流的数据类型必须相同,connect()允许多个数据流中的元素类型可以不同。

  2. union()可以合并多个数据流,但connect()只能连接两个数据流

  3. union()的执行结果是DataStream,而connect()的执行结果是ConnectedStreams;ConnectedStreams表示两个(可能)不同数据类型的连接流,可以对两个流的数据应用不同的处理方法,当一个流上的操作直接影响另一个流上的操作时,连接流非常有用。可以通过流之间的共享状态对两个流进行操作。

  4. 与流的转换:

3、Sink数据输出

3.1、Sink简介

Sink这个词很形象,中文意思“水槽”,寓意:数据流像水一样,源源不断的,经过水槽流向各种目的地。

Flink可以使用DataStream API将数据流输出到文件、Socket、外部系统等。Flink自带了各种内置的输出格式,比如writeAsText()、writeAsCsv()等,但是已经过时,如下:

官方鼓励使用addSink()方法,调用自定义接收函数,如下:

3.2、自定义Sink

Flink也可以与其他系统(如Apache Kafka、doris等)的Sink集成在一起,这些系统已经实现了自定义Sink函数。也可以完全自己自定义Sink。并且,通过addSink()方法可以参与到Flink的检查点(Checkpoint)中,以实现“精确的一次”语义。

自定义Sink只需要实现SinkFunction,例如上面的例子中(随机生成学生分数),现在是直接print出来信息,改造成自定义Sink的方式之后,代码如下:

public class AlertSink implements SinkFunction<SourceSourceDemo.Student> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);

    @Override
    public void invoke(SourceSourceDemo.Student value, Context context) {
        LOG.info("自定义sink" + value.toString());
    }
}

//        transformedStream.print("result =======").setParallelism(1);
        transformedStream.addSink(new AlertSink());

截图如下:

4、代码地址

https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo002/KafkaDataStreamSourceDemo.java

本篇完结!感谢你的阅读,欢迎点赞 关注 收藏 私信!!!

原文链接:http://www.mangod.top/articles/2023/08/03/1691059784552.htmlhttps://mp.weixin.qq.com/s/1pP3PEt5ozILFGK0-tsNmA

标签:DataStream,name,数据源,Flink,算子,public
From: https://www.cnblogs.com/mangod/p/18013229

相关文章

  • 10分钟了解Flink Watermark水印
    在上一篇中,介绍了Flink里时间的概念和窗口计算,在实际生产过程中,由于网络等原因,许多数据会延迟到达窗口,这种情况Flink如何处理?Watermark登场,本文从这几点进行介绍:水印的概念、水印如何计算、允许延迟和侧道输出、水印生成策略、案例及代码。1、一个小例子讲解概念前,我先举个例......
  • 5分钟了解Flink状态管理
    什么叫做Flink的有状态计算呢?说白了就是将之前的中间结果暂时存储起来,等待后续的事件数据过来后,可以使用之前的中间结果继续计算。本文主要介绍Flink状态计算和管理、代码示例。1、有状态的计算什么是Flink的有状态的计算。在流式计算过程中将算子的中间结果保存在内存或者文......
  • 一次打通FlinkCDC同步Mysql数据
    业务痛点离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。业务中常见的需要数据同步的场景1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历......
  • 10分钟入门Flink--架构和原理
    相信你读完上一节的《10分钟入门Flink--了解Flink》对Flink已经有初步了解了。这是继第一节之后的Flink入门系列的第二篇,本篇主要内容是是:了解Flink运行模式、Flink调度原理、Flink分区、Flink安装。1、运行模式Flink有多种运行模式,可以运行在一台机器上,称为本地(单机)模式;也可以......
  • 10分钟入门Flink--了解Flink
    Flink入门系列文章主要是为了给想学习Flink的你建立一个大体上的框架,助力快速上手Flink。学习Flink最有效的方式是先入门了解框架和概念,然后边写代码边实践,然后再把官网看一遍。Flink入门分为四篇,第一篇是《了解Flink》,第二篇《架构和原理》,第三篇是《DataStream》,第四篇是《Tabl......
  • 10分钟入门Flink--安装
    本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、FlinkStandalone搭建、FlinkStandalongHA搭建。演示使用的Flink版本是1.15.4,官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/......
  • 在 SpringBoot 项目中多数据源切换
    使用dynamic-datasource-spring-boot-starter库添加依赖<dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.4.1</version></dependency>添加配置......
  • 【Flink入门修炼】1-3 Flink WordCount 入门实现
    本篇文章将带大家运行Flink最简单的程序WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对Flink的各种概念和架构进行介绍。下面将从创建项目开始,介绍如何创建出一个Flink项目;然后从DataStream流处理和FlinkSQL执行两种方式来带大家学习Word......
  • Flink CDC实时同步PG数据库到Kafka
    一、安装规划操作系统服务器IP主机名硬件配置CentOS7.6192.168.80.131hadoop01内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.132hadoop02内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.133hadoop03内存:2GB,CPU:2核,硬盘:100GB......
  • 【Flink】使用CoProcessFunction完成实时对账、基于时间的双流join
    【Flink】使用CoProcessFunction完成实时对账、基于时间的双流join文章目录零处理函数回顾一CoProcessFunction的使用1CoProcessFunction使用2实时对账(1)使用离线数据源(批处理)(2)使用高自定义数据源(流处理)二基于时间的双流Join1基于间隔的Join(1)正向join(2)反向join2......