首页 > 数据库 >使用 Flink SQL 读取本地csv文件(Java实现)

使用 Flink SQL 读取本地csv文件(Java实现)

时间:2024-06-11 21:54:26浏览次数:42  
标签:Flink Java java flink version SQL apache org csv

data.csv内容:

1,Tom,15
2,Lily,13
3,Mike,21
4,John,20
5,Emma,18
6,Sophia,19
7,David,22
8,James,16
9,Olivia,17
10,Robert,23
11,Emily,14
12,Daniel,25
13,Amelia,24

代码:

package com.auguigu.demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TextSql {

    public static void main(String[] args) throws Exception {

        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // 注册 CSV 表
        tableEnv.executeSql(
                "CREATE TABLE csv_source (" +
                        "  `uid` INT," +
                        "  `name` STRING," +
                        "  `age` INT" +
                        ") WITH (" +
                        "  'connector' = 'filesystem'," +
                        "  'path' = 'file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv'," +
                        "  'format' = 'csv'," +
                        "  'csv.field-delimiter' = ','," +
                        "  'csv.ignore-parse-errors' = 'true'" +
                        ")"
        );

        // 将注册的表转换为DataStream
        DataStream<Row> csvSourceDataStream = tableEnv.toAppendStream(tableEnv.from("csv_source"), Row.class);
        // 打印输出
        csvSourceDataStream.print();
        // 执行任务
        env.execute();

    }
}

输出结果

1> +I[1, Tom, 15]
1> +I[2, Lily, 13]
1> +I[3, Mike, 21]
1> +I[4, John, 20]
1> +I[5, Emma, 18]
1> +I[6, Sophia, 19]
1> +I[7, David, 22]
1> +I[8, James, 16]
1> +I[9, Olivia, 17]
1> +I[10, Robert, 23]
1> +I[11, Emily, 14]
1> +I[12, Daniel, 25]
1> +I[13, Amelia, 24]

Process finished with exit code 0

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>11</java.version>
        <flink.version>1.17.2</flink.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>


    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

</project>

标签:Flink,Java,java,flink,version,SQL,apache,org,csv
From: https://www.cnblogs.com/FengZeng666/p/18242831

相关文章

  • [20240607]PL/SQL中sql语句的注解.txt
    [20240607]PL/SQL中sql语句的注解.txt--//别人测试遇到的问题,重复测试说明问题.1.环境:SCOTT@test01p>@verBANNER                                                                           ......
  • MySQL 触发器(实验报告)
    一、实验名称:触发器 二、实验日期:2024年 6月8日三、实验目的:掌握MySQL触发器的创建及调用;四、实验用的仪器和材料:硬件:PC电脑一台;配置:内存,2G及以上 硬盘250G及以上软件环境:操作系统windows7以上数据库环境:MySQL5.7或MySQL8.0.20五、实验步骤和方法练习:#......
  • 线程介绍及其Java如何用Thread 类创建线程和操作线程方法
    目录一、进程和线程1.1进程特征2.2线程特征2.3区别二、利用Thread类创建线程2.1通过创建Thread子类,重写run()方法2.2通过实现Runnable接口,重写run()方法2.3.Callable接口+FutureTask创建线程2.3三种方法区别1.通过创建Thread子类,重写run()方法2.通过实......
  • 基本数据类型 String,null 和 undefined,运算符,流程控制,JavaScript之数组,数组常用
    Ⅰ基本数据类型String【一】String类型String类型就是字符串类型【二】定义变量【1】常规变量var变量名="变量值";//一般用这种var变量名='变量值';不支持三引号【2】可以先声明不赋值先用varb;再对变量b赋值varb='6';【三】字符串的格式化输出语法......
  • 神奇的JavaScript弱等价类型转换
    JavaScript语言特性-类型转换JavaScript这门语言的类型系统从来没有它表面看起来的那样和善,虽然比起Java、C#等一众强类型语言,它的弱类型使用起来似乎是如此便利,但正因为它极高的自由度,所以才会衍生出令人摸不着头脑的荒诞行为。举个例子,虽然我们都知道一个包含内容的字符串会......
  • 探索Java的奥秘:网络编程、反射与注解的深度解析
    Java,作为一门功能强大的编程语言,在网络编程、反射和注解方面提供了丰富的支持和灵活的应用。本文将深入探讨这些概念,并通过实际例子来加深理解。一、Java网络编程1.1网络编程基础网络编程是Java语言的一大亮点,它允许我们创建能够通过网络进行通信的程序。Java提供了java......
  • 大学生HTML期末大作业——HTML+CSS+JavaScript美食网站(零食)
    HTML+CSS+JS【美食网站】网页设计期末课程大作业web前端开发技术web课程设计网页规划与设计......
  • 大学生HTML期末大作业——HTML+CSS+JavaScript购物商城(华为手机)
    HTML+CSS+JS【购物商城】网页设计期末课程大作业web前端开发技术web课程设计网页规划与设计......
  • Java中List集合中多个字段如何排序
    开源项目SDK:https://github.com/mingyang66/spring-parent个人文档:https://mingyang66.github.io/raccoon-docs/#/一、首先定义一个三个属性的People类publicclassPeople{privateStringname;privateintage;privateintheight;publicPeople......
  • 【数据库】mybatis生成java代码之AutoGenerator配置
    哈喽,大家好,我是木头左,AI改变生活!本文将详细解释MyBatis生成Java代码的过程,包括全局配置、数据源配置和策略配置。1.全局配置首先,我们需要创建一个GlobalConfig对象,用于配置MyBatis的全局设置。以下是一些常用的全局配置选项:setOutputDir(StringoutputDir):设置生......