首页 > 数据库 >[AIGC] flink sql 消费kafka消息,然后写到mysql中的demo

[AIGC] flink sql 消费kafka消息,然后写到mysql中的demo

时间:2024-05-26 20:32:34浏览次数:28  
标签:timestamp SQL mysql demo flink Kafka MySQL kafka id

这是一个使用 Flink SQL 从 Kafka 中消费数据并写入 MySQL 的示例。在这个示例中,我们将假设有一个 Kafka 主题 “input_topic”,它产生格式为 (user_id: int, item_id: int, behavior: string, timestamp: long) 的数据,我们需要把这些数据写入名为"output_table"的 MySQL 表中。

首先,我们需要创建表达 Kafka 和 MySQL 的源和目标表条目:

CREATE TABLE kafka_source (
    user_id INT,
    item_id INT,
    behavior STRING,
    timestamp AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR timestamp as timestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

CREATE TABLE mysql_sink (
    user_id INT,
    item_id INT,
    behavior STRING,
    timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'output_table',
    'username' = 'test',
    'password' = '123456'
);

进行消费 Kafka 和写入 MySQL 的操作:

INSERT INTO mysql_sink SELECT * FROM kafka_source;

首先,运行上述 SQL 创建对应 Kafka 和 MySQL 的表。其次,运行 INSERT INTO 语句进行从 Kafka 消费数据并将结果写入 MySQL 表的操作。这是一个基本的操作,你可以根据自己的需求进行相应的修改。

注意,需要根据实际的 Kafka 和 MySQL 的配置,如地址、用户名和密码,来修改上述 SQL 语句中的对应部分。

另外,Flink SQL 对 SQL 语句的语法进行了些许改变以适应流处理的特性,如在 kafka_source 表中的 WATERMARK 和 timestamp 的定义。

这个例子中,你需要确保已经引入了 flink-connector-kafka_2.11、flink-connector-jdbc_2.11 和 mysql-connector-java 等相关的 jar 包依赖。

标签:timestamp,SQL,mysql,demo,flink,Kafka,MySQL,kafka,id
From: https://blog.csdn.net/qq_45704048/article/details/139219951

相关文章

  • MySQL主从复制(五):读写分离
    一主多从架构主要应用场景:读写分离。读写分离的主要目标是分摊主库的压力。读写分离架构读写分离架构一架构一结构图:这种结构模式下,一般会把数据库的连接信息放在客户端的连接层,由客户端主动做负载均衡。也就是说由客户端来选择后端数据库进行查询。读写分离架构二架构......
  • (二) 快速安装Mysql - 腾讯云轻量应用服务器OpenCloud系统(Centos)
    #OpenCloud创建Mysql数据库####介绍腾讯云服务器OpenCloud系统创建Mysql数据库,可以通过MySQLWorkBench管理数据库####安装教程下面是使用"yum"命令来安装MySQL数据库的步骤:1.更新系统软件包在终端中执行以下命令,以更新系统软件包:sudoyumupdate2.安装MySQL数据......
  • 设置MySQL like查询时不区分大小写
    背景介绍MySQLlike查看结果出现了区分大小写的情况,实际需求是查询的时候不去区分大小写!原因MySQL的LIKE查询默认情况下是不区分大小写的。如果LIKE查询是区分大小写的,是因为列使用了区分大小写的字符集或校对规则。解决办法在like查询的时候使用COLLATE指定指定校对规......
  • MySQL---函数与约束
    目录一、函数1.字符串函数 2.数值函数3.日期函数4.流程函数5.总结 二、约束 1.概述2.约束演示3.外键约束3.1添加外键3.2删除外键 3.3外键删除更新行为4.总结 一、函数1.字符串函数 命令如下所示:--concatselectconcat("Hello","......
  • Springboot计算机毕业设计学生报到小程序【附源码】开题+论文+mysql+程序+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着教育信息化的不断推进,学生报到流程也逐渐向数字化、智能化方向转变。传统的报到方式通常涉及大量的纸质材料和现场排队等待,不仅效率低下,而且容易......
  • 【MySQL数据库】认识数据库+环境搭建--------Windows系统
    一、认识数据库数据库(Database)是按照数据结构来组织、存储和管理数据的仓库。二、MySQL数据库MySQL是一个关系型数据库管理系统,由瑞典MySQLAB公司开发,目前属于Oracle公司。MySQL是一种关联数据库管理系统,关联数据库将数据保存在不同的表中,而不是将所有数据放在一个大......
  • Springboot计算机毕业设计行程日记卡小程序【附源码】开题+论文+mysql+程序+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在数字化和移动互联网时代,人们对于旅行方式的需求正发生深刻变化。行程规划、景点打卡、美食推荐等旅行需求日益个性化、多元化。然而,传统的旅行规划......
  • MySQL——2、并发事务所带来的问题是什么?怎样去解决?MySQL的默认隔离级别是什么
    一、并发事务所带来的问题1、脏读:就是一个事务对数据进行查询操作时而另一个事物在修改这条数据但未提交,这时进行查询操作的事务就会读取到未提交的数据也就是脏数据2、不可重复读:事务A查询一次数据接着事务B修改了数据并且已提交事务A继续执行操作查询数据读到了不一样的数......
  • java —— 连接 MySQL 操作
    MySQL是独立于java之外的数据库,二者之间建立连接需要提前引入mysql-connector-java的jar包。一、引入方法:①在项目中新建一个Folder(即文件夹),该文件夹通常命名为lib,意思是存放项目所依赖的第三方库或外部的jar文件。②将 mysql-connector-java的jar包复制进......
  • MYSQL满足条件函数里放查询最大函数的方法
    1.MYSQL满足条件函数里放查询最大函数的方法在MySQL中,如果我们想要在一个条件函数(如CASE)内部使用聚合函数(如MAX)来获取某个字段的最大值,我们通常需要在外部查询或子查询中执行这个聚合操作,并将结果作为参数传递给条件函数。以下是一个具体的代码示例,假设我们有一个名为sales的表,......