首页 > 数据库 >kafka数据入paimon(flink-sql)

kafka数据入paimon(flink-sql)

时间:2023-12-07 10:26:25浏览次数:46  
标签:info arr bus flink kafka user sql paimon

1.创建CATALOG

CREATE CATALOG paimon_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoopm111:9083',
'warehouse' = 'hdfs:///apps/hive/paimon'
);

2.创建表接kafka数据(数据格式是text)

CREATE TABLE if not exists kafka_bus_user_info_source (
log_new String
) WITH (
'connector' = 'kafka',
'topic' = 'top-bus-user-info',
'properties.bootstrap.servers' = '192.168.11.106:6667,192.168.11.108:6667,
'properties.group.id' = 'top-bus-user-info-paimon',
'scan.startup.mode' = 'group-offsets',
'format' = 'raw',
'sink.parallelism' = '2'
);

创建udf函数(函数功能为抛掉少于指定字段数的日志)

CREATE TEMPORARY SYSTEM FUNCTION rawUdf AS 'cn.leadeon.dbgrp.udf.SplitRawUDTF';

3.使用CATALOG

USE CATALOG paimon_hive;

4.创建paimon表(会同步到hive)

create table if not exists paimon_ods.bus_user_info(
column1 string,
column2 string,
column3 string,
dt string)
PARTITIONED BY (dt)
with(
'write-mode' = 'append-only',
'bucket' = '5',
'bucket-key' = 'column1,column2',
'full-compaction.delta-commits'='20',
'sink.parallelism'='3'
);

5.设置checkpointing

SET execution.checkpointing.interval = '300s';

6.kafka数据写入paimon表

insert into paimon_ods.bus_user_info
SELECT
arr[1] as column1,
arr[2] as column2,
arr[3] as column3,
replace(substr(arr[2],1,10),'-','') as dt
FROM default_catalog.default_database.kafka_bus_user_info_source AS S
left join lateral table(rawUdf(log_new,3)) AS T(arr) on true
where arr[3] is not null;

标签:info,arr,bus,flink,kafka,user,sql,paimon
From: https://www.cnblogs.com/whiteY/p/17881101.html

相关文章

  • 通过PowerShellPlus示例脚本学习PowerShell之-通过SMO获取SQLServer数据库
    ##=====================================================================##Title:Get-MSSQL-DB-UsingSMO##Description:ShowalldatabasesusingSMOforagivenserverinstance##Author:Idera##Date:1/28/2008##Input:-s......
  • oracle优化器SQL
    注解必须紧跟在select、update、merge、insert或delete关键字后面。selectempid,    ename/*+index(eemp_pk)*/ fromempewhereempidin(1001,1002);访问路径提示:/*+FULL(表名)*/           全表扫描/*+INDEX(表名)*/     ......
  • SQL ALTER TABLE 语句- 灵活修改表结构和数据类型
    SQLALTERTABLE语句SQLALTERTABLE语句用于在现有表中添加、删除或修改列,也可用于添加和删除各种约束。ALTERTABLE-添加列要在表中添加列,请使用以下语法:ALTERTABLE表名ADD列名数据类型;以下SQL向"Customers"表添加了一个"Email"列:ALTERTABLECustomersA......
  • MySQL基础命令笔记
    MySQL基础命令笔记上学期间学习记录的笔记,放这里方便查阅。--创建数据库--CREATE创建;DATABASE数据库;CREATEDATABASEstuinfo--默认字符集DEFAULTCHARACTERSETutf8mb4--排序规则DEFAULTCOLLATEutf8mb4_general_ci;--显示当前服务器下,所有数据库SHOWDA......
  • 【数据库概论】第三章 SQL简述、数据定义和索引
    3.1SQL概述3.1.1产生与发展最早在IBM的关系数据库管理系统原型SystemR上实现,后来美国国家标准局(ANSI)批准SQL作为关系数据库语言的美国标准,同年公布了SQL标准文本。近些年来SQL标准的内容越来越丰富和复杂。目前没有任何一个数据库系统能够支持SQL标准的所有概念和特性,同时不少......
  • Java 操作 MySQL 数据库
    Java入门-获取MySQL数据java代码/***@Author编程无忧*@Date2022/1/1313:17*@Desc*/publicclassmysqlTest{@TestpublicvoidgetMysqlCon(){//声明Connection对象Connectioncon;//驱动程序名Stringdriver=......
  • SQL ALTER TABLE 语句- 灵活修改表结构和数据类型
    SQLALTERTABLE语句SQLALTERTABLE语句用于在现有表中添加、删除或修改列,也可用于添加和删除各种约束。ALTERTABLE-添加列要在表中添加列,请使用以下语法:ALTERTABLE表名ADD列名数据类型;以下SQL向"Customers"表添加了一个"Email"列:ALTERTABLECustomers......
  • python连接mysql、oracle数据库
    python版本:3.10.5mysql版本:8.0.27oracle版本:oracle12c一、python连接mysql数据库安装第三方依赖PyMySQL,终端执行如下命令:pipinstallPyMySQLPyMySQL使用importpymysqlconfig={'host':'127.0.0.1','port':3306,'u......
  • 大白话说Python+Flask入门(六)Flask SQLAlchemy操作mysql数据库
    写在前面这篇文章被搁置真的太久了,不知不觉拖到了周三了,当然,也算跟falsk系列说再见的时候,真没什么好神秘的,就是个数据库操作,就大家都知道的CRUD吧。FlaskSQLAlchemy的使用1、FlaskSQLAlchemy简介FlaskSQLAlchemy是基于Flaskweb框架和SQLAlchemyORM(对象关系映射)的工具......
  • MySQL数据库的CURD
    一、数据库的CURD对数据库进行增(Create)、改(Update)、查(Retrieve)、删(Delete)等操作。CREATE{DATABASE|SCHEMA}[IFNOTEXISTS]db_name[create_specification[,create_specification]...]IFNOTEXISTS表示只有数据库不存在的时候才创建,如果存在同名就不再执......