当在实际项目过程中 我们需要对数据进行增量更新操作
举个例子一张生活轨迹融合表是有上网信息和入住酒店信息两张表的数据union all 产生,如果一个融合表是多张表融合的,如果用kettle对每一张表 进行增量更新,有多少张表,就得要重复的拉一次所有的kettle组件并且每个组件都配置一下特别繁琐 ,现在我采用偏移量表和kettle任务只要拉一次控件即可
总体思路:
1:先去偏移量里获取同一任务下要跑的几张表的信息
2:把同一任务下的几张表行转列转化为同一组参数
3:带着参数进行union all 融合
4:数据流抽到目标后记录每张表的最大增量字段,通过同一task_id合并为一行
5:把每张表的最大时间戳插入到偏移量表
话不多说 直接开干总体流程
第一步创建偏移量表
create table data_offset_multi_info (
xh number(10),
task_id nvarchar2(50),
table_name nvarchar2(50),
incr_field nvarchar2(50),
field_value nvarchar2(50),
insert_time date
);
insert into data_offset_multi_info values (1,1,'LKNB','DATE1','20221030000000',sysdate) ;
insert into data_offset_multi_info values (2,1,'WB_USEINFO','KSSWSJ','2022-10-30 00:00:00',sysdate) ;
第二步 模拟酒店 网吧数据
CREATE TABLE LKNB (
LKBM VARCHAR2(20),
SFZH VARCHAR2(18),
XM VARCHAR2(18),
DATE1 VARCHAR2(14)
)
INSERT INTO LKNB values ('1','340001199302033516','小明','20221030000022') ;
INSERT INTO LKNB values ('2','340001199302033517','小花','20221030010012') ;
CREATE TABLE WB_USEINFO (
RYBH VARCHAR2(20),
SFZH VARCHAR2(18),
XM VARCHAR2(18),
KSSWSJ DATE
)
INSERT INTO WB_USEINFO VALUES ('WB12', '340223199202033515','爱我家宝', to_date( to_char(SYSDATE, 'yyyy-mm-dd hh24:mi:ss') ,'yyyy-mm-dd hh24:mi:ss') );
目标表
create table trajectory (
primary_key varchar(30),
sfzh varchar(18),
yw_time varchar2(20),
xm varchar2(20)
)
这里也可以直接用SQL
select * from (
select id ,table_name,OFFSET_FIELD from data_offset_info
) pivot (max(OFFSET_FIELD ) for table_name in ( 'LKNB' as LKNB ,'WB_USEINFO' as WB_USEINFO ) )
但是我生产环境是clickhouse 不支持这种写法 直接用组件了
step2解析
step3解析
进一步解析表输入 表输出 排序
select LKBM as primary_key ,sfzh ,DATE1 as yw_time , xm from LKNB
where date1 >'${LKNB}'
union all
select RYBH as primary_key ,sfzh , To_char(KSSWSJ, 'yyyy-mm-dd hh24:mi:ss') as yw_time , xm from WB_USEINFO
where KSSWSJ > to_date('${WB_USEINFO}' ,'yyyy-mm-dd hh24:mi:ss' )
进一步解析表合并记录
执行sql 这一步 也可以使用kettle 的删除组件 ,但是我生产库用的clichouse 不执行删除组件 ,用sql 是通用的 不管是关系型数据库还是非关系型的都可以
这里就是表输入1(多表结果集) 和表输入2(目标表trajectory) 按主键进行合并记录 ,indentical,deleted 不做任何操作,new 直接插入目标表,changged新删除再插入,这里changed不考虑插入更新操作,
是插入更新只适合小批量的数据 ,稍微大一点的数据 同步的特别慢
step4步骤解析
select a.seq ,a.lkbn_incr ,b.wb_incr from (
select 1 as seq , max( DATE1 ) lkbn_incr from LKNB
) a left join (
select 1 as seq , to_char( max( KSSWSJ) ,'yyyy-mm-dd hh24:mi:ss' ) wb_incr from WB_USEINFO
) b on a.seq =b.seq
step5步骤解析
step6解析
delete from data_offset_multi_info where xh=1 ;commit ;
delete from data_offset_multi_info where xh=2 ;commit ;
insert into data_offset_multi_info values (1,1,'LKNB','DATE1', '${LKBN_INCR}', sysdate) ;commit ;
insert into data_offset_multi_info values (2,1,'WB_USEINFO','KSSWSJ', '${WB_INCR}', sysdate) ;commit ;
总结:以上仍然存在不足 ,比如在其中一个步骤出错需要异常处理,这个还得要改进
标签:info,增量,WB,USEINFO,kettle,更新,LKNB,offset,data From: https://www.cnblogs.com/CloudHaiYun/p/16851240.html