首页 > 数据库 >通过flinkSql将kafka和mysql连接

通过flinkSql将kafka和mysql连接

时间:2024-12-03 23:29:58浏览次数:6  
标签:status int flinkSql kafka tabEnv user mysql id page

kafkaToKafka

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
package com.bigdata.day07;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _08_kafka_to_kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
        tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topicA',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'g1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                // 这个是需要flink-json的
                "  'format' = 'json'\n" +
                ")");
        tabEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topicB',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'g1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");

    }
}
// 非常简单的代码

使用executeSql后,就可以不使用execute了
但是若有一个print ,那么还需要execute

kafkaToMysql

需要先在mysql中建表
create table t_success
(
    id      int auto_increment,
    user_id int         null,
    page_id int         null,
    status  varchar(20) null,
    constraint t_success_pk
        primary key (id)
);
package com.bigdata.day07;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _09_kafka_to_mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
        tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topicA',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'g1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");
        tabEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
                "    'table-name' = 't_success', \n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root'\n" +
                ")");

        tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");


    }
}

readMysql

package com.bigdata.day07;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class _10_read_mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
        tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
                "    'table-name' = 't_success', \n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root'\n" +
                ")");

        Table table = tabEnv.sqlQuery("select * from table1");

        DataStream<Row> appendStream = tabEnv.toAppendStream(table, Row.class);

        appendStream.print();
        env.execute();
    }
}

标签:status,int,flinkSql,kafka,tabEnv,user,mysql,id,page
From: https://blog.csdn.net/weixin_52642840/article/details/144226654

相关文章

  • MySQL底层概述—10.InnoDB锁机制
    大纲1.锁概述2.锁分类3.锁实战之全局锁4.锁实战之表级锁(偏读)5.锁实战之行级锁(偏写)—行级锁升级表级锁6.锁实战之行级锁(偏写)—间隙锁7.锁实战之行级锁(偏写)—临键锁8.锁实战之行级锁(偏写)—幻读演示和解决9.锁实战之行级锁(偏写)—优化建议10.锁实战之乐观锁11.......
  • A109 PHP+MYSQL+LW+网上论坛网站 军事BBS系统的设计与实现 源码+文档 全套 教程
    网上军事论坛网站系统1.项目摘要2.研究背景3.项目功能4.界面展示5.源码获取1.项目摘要随着计算机网络的普及,如今越来越多的论坛类网站也是数不胜数,各种类型的论坛交流网站,深受当前网友们的喜欢。网上军事论坛网站的成立,是基于对于想要了解更多军事方面的资讯信息的......
  • MySQL--索引
    索引一、索引定义索引是一种独立存在的,对数据库表中一列或多列的值进行排序(目的:加快查询速度,提高查询效率)。二、案例演示索引是如何提高查询速度的?示例:--未建立索引select*fromtest_indexwherename='我的年龄12999';--......
  • Mysql简介及相关知识
    一、Mysql简介1、介绍1.1什么是数据库?数据库:database,数据的仓库(用来存放数据库对象)按照一定的数据结构来组织、存储和管理的数据的仓库,简单来说就是存储数据的仓库。数据库系统组成:DBS是由DB和DBMS两部分组成。计算机硬件、DBMS、......
  • MySQL 常用的日期函数
    两个日期相减:TIMESTAMPDIFF函数语法:TIMESTAMPDIFF(unit,begin,end)说明:TIMESTAMPDIFF函数返回end-begin的结果,其中begin和end是DATE或DATETIME表达式。unit参数是确定end-begin的结果的单位,表示为整数。以下是有效单位:取值含义MICROSECOND毫秒SECOND......
  • 245 微信+PHP+MYSQL+LW+基于微信的校园健身小程序的设计与实现 源码 配置 文档
    基于微信的校园健身小程序的设计与实现1.摘要2.开发目的和意义3.系统功能设计4.系统界面截图5.源码获取1.摘要近几年来,随着我国居民生活水平的提高,人们对于健身养生保健方面也是尤为关注,而电子商务在基于互联网信息技术的发展而普及,近几年,随着微信小程序的应用成熟,也......
  • 免费送源码:Java+B/S+My eclipse+MySQL Springboot 连锁超市零售管理系统 计算机毕业设
         摘 要在网络信息的时代,众多的软件被开发出来,给用户带来了很大的选择余地,而且人们越来越追求更个性的需求。在这种时代背景下,超市零售管理只能以用户为导向,按品种小批量组织生产,以产品的持续创新作为超市零售管理最重要的竞争手段。系统采用了B/S结构,将所有业务......
  • 计算机毕业设计原创定制(免费送源码):Java+ssm+JSP+Ajax+MySQL SSM汽车租赁管理系统
    摘 要信息化社会内需要与之针对性的信息获取途径,但是途径的扩展基本上为人们所努力的方向,由于站在的角度存在偏差,人们经常能够获得不同类型信息,这也是技术最为难以攻克的课题。针对汽车租赁信息管理等问题,对其进行研究分析,然后开发设计出汽车租赁管理系统以解决问题。汽车......
  • 消息队列-kafka
    消息队列-kafkakafka常见面试题kafka实践环境准备代码结果截图参考摘要:本文将会对kafka进行介绍,首先介绍消息队列的一些基础知识,然后是kafka的基本概念和底层原理,以及kafka如何保证消息可靠性、消息不丢失,如何解决消息重复以及消息积压等问题,并且分析kafka为什么具......
  • MySQL备份与恢复(mysqldump与xtrabckup8,超详细)
    MySQL备份与恢复mysqldump和xtrabackup是MySQL数据库中常用的两种备份工具,它们各自有不同的特点和适用场景:备份方式:mysqldump:逻辑备份工具,通过生成SQL语句来备份数据库的数据和结构。xtrabackup:物理备份工具,直接复制数据文件,特别适合InnoDB和XtraDB数据库的热备份。......