首页 > 数据库 >flink开发环境执行sql及生产环境提交sql文件

flink开发环境执行sql及生产环境提交sql文件

时间:2022-10-30 15:15:45浏览次数:67  
标签:flink 环境 kafka topic source sink sql sensor

  flink提供了sql-client.sh工具可直接操作sql,该工具一般在开发环境用于调试,在生产环境还是要打成jar文件。为了避免在java文件中写大量sql,我们可以将sql提取出来放到一个后缀是.sql的文件中,自己编辑java代码读取该sql文件。然后将java代码与sql文件一块打成jar,部署到flink环境中。

一、安装flink及部署jar,参考下面两篇文章

flink本地开发环境安装与部署(单机)

flink集群安装-Standalone模式

二、开发环境使用sql-client.sh

1、启动flink

start-cluster.sh

2、启动sqlclient

sql-client.sh

3、测试sqlclient

1)启动kafka,并创建生产者与消费者

kafka-console-producer.sh --broker-list 192.168.23.190:9092 --topic topic_source
kafka-console-consumer.sh --bootstrap-server 192.168.23.190:9092 --topic topic_sink

2)在sqlclient中创建表source_sensor,获取topic_source的数据

create table source_sensor (id string,ts bigint,vc int)
with('connector' = 'kafka',
     'topic' = 'topic_source',
     'properties.bootstrap.servers' = '192.168.23.190:9092',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json');

3)在sqlclient中创建表sink_sensor,将数据传送给topic_sink

create table sink_sensor (id string,ts bigint,vc int)
with('connector' = 'kafka',
    'topic' = 'topic_sink',
    'properties.bootstrap.servers' = '192.168.23.190:9092',
    'format' = 'json');

4)将source_sensor的数据存储到sink_sensor

insert into sink_sensor select  * from source_sensor;

 5)kafka生产端输入消息,消费端可以获取到消息

{"id":"23","ts":23,"vc":29}
{"id":"23","ts":23,"vc":29}
{"id":"231","ts":23,"vc":23}
{"id":"23","ts":23,"vc":29}

四、生产环境将sql打成jar提交

1、java本地执行sql示例

1)java代码,在idea中运行

public class Main {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        tabEnv.executeSql("create table source_sensor (id string,ts bigint,vc int)\n" +
                "with('connector' = 'kafka',\n" +
                "     'topic' = 'topic_source',\n" +
                "     'properties.bootstrap.servers' = '192.168.23.190:9092',\n" +
                "     'scan.startup.mode' = 'latest-offset',\n" +
                "     'format' = 'json')");

        tabEnv.executeSql("create table sink_sensor (id string,ts bigint,vc int)\n" +
                "with('connector' = 'kafka',\n" +
                "    'topic' = 'topic_sink',\n" +
                "    'properties.bootstrap.servers' = '192.168.23.190:9092',\n" +
                "    'format' = 'json')");

        tabEnv.executeSql("insert into sink_sensor select  * from source_sensor");
    }
}

2)同样开启kafka生产端、消费端进行测试

2、生产环境提交sql文件

1)编写java通用代码,读取.sql文件

public class Main2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        String path = "D:\\workset\\javaset\\tablesql\\test\\test.sql";
        List<String> list = FileUtils.readLines(new File(path),"UTF-8");
        StringBuilder stringBuilder = new StringBuilder("");
        String sql = "";
        for(String var : list){
            if(StringUtils.isNotBlank(var)){
                stringBuilder.append(var);
                if(var.contains(";")){
                    sql = stringBuilder.toString().replace(";","");
                    System.out.println(sql);
                    tabEnv.executeSql(sql);
                    stringBuilder = new StringBuilder("");
                }else{
                    stringBuilder.append("\n");
                }
            }
        }
    }
}

注意:部署时修改test.sql的路径为:/usr/local/myroom/a_project/test.sql

2)编写test.sql文件

create table source_sensor (id string,ts bigint,vc int)
with('connector' = 'kafka',
     'topic' = 'topic_source',
     'properties.bootstrap.servers' = '192.168.23.190:9092',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json');

create table sink_sensor (id string,ts bigint,vc int)
with('connector' = 'kafka',
    'topic' = 'topic_sink',
    'properties.bootstrap.servers' = '192.168.23.190:9092',
    'format' = 'json');

insert into sink_sensor select  * from source_sensor;

将test.sql上传到路径/usr/local/myroom/a_project/

3)将1)中的通用类打成jar,并提交

<build>
  <finalName>testSql</finalName>
   <plugins>
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
         <version>2.4</version>
         <configuration>
         <includes>
         <include>com/leiyuke/flink/tablesql/test/Main2.class</include>
         </includes>
         </configuration>
     </plugin>
  </plugins>
</build>

4)测试如下:

 

标签:flink,环境,kafka,topic,source,sink,sql,sensor
From: https://www.cnblogs.com/javasl/p/16841084.html

相关文章

  • mySQL语句
    Mysql语句:1、插入sql  主键ID也需要填,mysql不会自动生成,--并且需要满足自增INSERTintotb_roomhclog(ID,RID,RoomID,RoomName,UCount,Raw_URL,Analyse_URL,Analys......
  • kali自带sqlmap使用报错[CRITICAL] unable to connect to the target URL. sqlmap is
    kali自带的sqlmap使用报错root@kali:~#sqlmap-u"http://192.168.204.133/mutillidae/index.php?page=user-info.php&username=admin&password=admin&user-info-php-sub......
  • MySQL线上环境单表1000w数据增加字段怎么做
    向一个1000w数据的线上业务表里新加字段,怎么操作?本地测试及正确解决方案:1.准备测试环境MySQL测试环境系统:Linuxcentos6.8内存:2G内存CPU:2核CPU硬盘:200G硬......
  • sqlmap自动注入
    SQL注入比较好用的工具,首推开源工具SQLmap。SQLmap是一个国内外著名的安全稳定性测试工具,可以用来进行自动化检测,利用SQL注入漏洞,获取数据库服务器的权限。它具有功能强大......
  • mysql 通城公司灯具部数据库
    创建数据库: createdatabasetongchenggongsi;  1、           查询车间名称和电话;2、           查询属于台灯厂的车间;   3、      ......
  • 「MySQL高级篇」MySQL日志、事务原理 -- undolog、redolog、binlog、两阶段提交
    引言日志日志,在我们平时开发中主要的用途在于监控、备份,但在MySQL中,日志的功能远远不止这些,分别有用于记录的慢查询日志,回滚版本的undolog,宕机恢复的redolog、全量备份的bin......
  • SQL模糊查询语句
    一般模糊语句如下:SELECT字段FROM表WHERE某字段Like条件其中关于条件,SQL提供了四种匹配模式:1、%:表示任意0个或多个字符。可匹配任意类型和长度的字符,有些情况下......
  • SQL中基础并重要的命令(3)
    ORDERBY用于对结果集按照一个列或者多个列进行排序。默认按照升序对记录进行排序。如果需要按照降序对记录进行排序,可以使用DESC关键字语法如下:select列名称from表......
  • 多语言在线客服系统源码-自动识别中英环境-私有化部署完美支持跨境电商网站
    如果您的客户遍布全球,客户沟通就必须跨越语言障碍。用客户当地的语言跟他们交谈,可以帮助您在客户生命周期的所有阶段建立信任,当然也包括服务支持。 具体做法,看看这四点......
  • 探花交友项目环境搭建
    探花交友项目环境搭建一、开发工具探花交友项目的开发统一使用提供的Centos7环境,该环境中部署安装了项目所需要的各种服务,如:RabbitMQ,MongoDB、Redis等。虚拟机的root......