首页 > 其他分享 >flink随笔

flink随笔

时间:2024-02-03 20:00:27浏览次数:24  
标签:flink join -- xxx hive kafka table 随笔

  • 内存设置
    Flink 总内存,建议在独立部署模式下使用
    taskmanager.memory.flink.size
    jobmanager.memory.flink.size
    进程总内存,建议在容器化部署模式(Kubernetes、Yarn 或 Mesos)下使用
    taskmanager.memory.process.size
    jobmanager.memory.process.size
    不建议同时设置进程总内存和 Flink 总内存。
    这可能会造成内存配置冲突,从而导致部署失败。

  • 并行度
    slot 是静态的概念,表示 TaskManager 具有多少并发执行能力。
    parallelism 是动态的概念,表示程序运行时实际使用时的并发能力。
    设置的 parallelism 不能高于 slot 数量,不然将会出现计算资源不够用的情况

  • 使用hiveSQL的语法
    flink 引擎会自动使用 hive 模块来解析 flink 模块解析不了的函数,如果想改变模块解析顺序,
    则可以使用 use modules hive, core; 语句来改变模块解析顺序。
    load module hive;

insert overwrite test.sink_table
select col1,
	collect_list(col2) as col2, 
	collect_set(col2) as col3
from test.source_table
group by col1
;
  • 设置hiveSQL方言
set 'table.sql-dialect' = 'hive';

insert overwrite test.sink_table
select col1,
	collect_list(col2) as col2, 
	collect_set(col2) as col3
from test.source_table
group by col1
;
  • 将流式数据,转换成数据表式输出
    'table'分页表格可视化
    'changelog'由插入(+)和撤销(-)组成的持续查询产生结果流
    'tableau'制表的形式
    SET sql-client.execution.result-mode = tableau;

  • 将flink的DAG图中算子拆分,默认为true,需要手动设置false
    为false时,可以看到算子的数据流向、数据压力等信息
    为true时,多个算子合并为一个,提高执行效率
    set pipeline.operator-chaining = false;

  • 表状态的超时时间
    flink将流数据转换为表数据时,已经被计算过的表的数据会无限堆积,此时需要设置一个数据的生存时间
    set 'table.exec.state.ttl' = '60 s' ;

  • 检查点之间的时间间隔(以毫秒为单位)。
    在此间隔内,系统将生成新的检查点
    SET 'execution.checkpointing.interval' = '3s';

  • 允许的连续失败检查点的最大数量。
    如果连续失败的检查点数量超过此值,作业将失败。
    SET execution.checkpointing.tolerable-failed-checkpoints = 10;

  • 检查点的超时时间(以毫秒为单位)。
    如果在此时间内未完成检查点操作,作业将失败。
    SET execution.checkpointing.timeout =600000;

  • 无数据到来时,水位线的等待的时间,超过这个时间触发窗口结束
    如多表join时,某张表数据迟迟没有到来,此时如果水位线小于关窗时间,窗口没法关闭就没有结果输出。设置该参数,如果遇到这种情况会推进水位线来触发窗口关闭。
    set 'table.exec.source.idle-timeout' = '10 s';

  • 设置检查点

-Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
-Dstate.backend.ttl.seconds=60
  • flink-cdc
1、v3.0版本不支持Doris连接
2、cdc Oracle表时,如果添加checkpoint,需要开启表的增量日志
ALTER TABLE cdc.TEST2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; 
  • 读取hive维表的方式:
    1、需要创建一张hive维表的分区表(小数据量),保留一个分区存储全部数据
    2、flinkSQL中如下使用hive维表,添加SQL的options提示
CREATE TEMPORARY VIEW V_ODS_WMS_ERP_ROW_TYPE
AS 
select * from hive.his.t_his_wms_erp_row_type 
	/*+ OPTIONS(
		'streaming-source.enable' = 'true', 
		'streaming-source.partition-order' = 'partition-time',
		'streaming-source.monitor-interval' = '12 hour'
	)
	*/
;

--是否开启流式source。
--streaming-source.enable

--持续监控分区/文件的时间间隔。时间间隔太短会对hive产生压力
--streaming-source.monitor-interval	

--设置读取分区的选项,默认'all’意味着读取所有分区
--而latest按照’streaming-source.partition.order’设置的分区顺序来读取最新的分区。
--streaming-source.partition.include	

--流式source模式下的分区顺序
--create-time会比较分区/文件(在文件系统)的创建时间
--partition-time比较通过分区名称提取的时间。
--streaming-source.partition-order
  • flink的connector

--将流数据输出打印到 控制台 中
WITH (
 'connector' = 'print'
)

--jdbc连接MySQL
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx.xxx.x.x:3306/db',
    'table-name' = 'tablename',
    'username' = 'xxx',
    'password' = 'xxxx'
)

--jdbc连接Oracle
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:oracle:thin:@xxx.xxx.x.x:1521:db',
  'driver' = 'oracle.jdbc.driver.OracleDriver',
  'table-name' = 'ods.t_ods_source_kafka_print',
  'username' = 'xxx',
  'password' = 'xxx'
)

--连接Doris
with(
  'connector' = 'doris',
  'fenodes' = 'xxx.xxx.x.x:8030',
  'table.identifier' = 'ods.t_ods_table_name',
  'username' = 'etl',
  'password' = 'xxxxx'
)
--连接Doris,写入数据
WITH (
    'connector' = 'doris',
    'fenodes' = 'xxx.xxx.x.x:8030',
    'table.identifier' = 'ods.t_ods_xiaobing_mail',
	'username' = 'etl',
	'password' = 'xxxx',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
	'sink.label-prefix' = 'sink_doris_tablename_20230731' --数据流前缀名
);

--连接kafka
WITH (
 'connector' = 'kafka',
 'topic' = 'topic_test2',
 'scan.startup.mode' = 'latest-offset',  --读取位置,消费最新的数据
 --'scan.startup.mode' = 'earliest-offset',  --读取位置,从最早的数据开始消费
 'properties.bootstrap.servers' = 'xxx.xxx.x.x:9092',
 'properties.group.id' = 'Test',   --消费者组ID
 'format' = 'raw'  --按行读取数据
 --'format' = 'json'   --按json读取表数据
);


--连接kafka,读取OGG到kafka的数据
with (
  'connector' = 'kafka',
	.....
  'ogg-json.ignore-parse-errors' = 'true',
  'format' = 'ogg-json'
);

--写入kafka
WITH (
 'connector' = 'kafka', 
	.....
 'format' = 'json', 
 'sink.partitioner' = 'default'
)

--upsert-kafka读取kafka,可以获取到变化的数据,默认消费从最早开始的数据
--upsert-kafka写入kafka
create table table_name(
	id bigint,
	...
	primary key(id) NOT ENFORCED --必须指定主键
)WITH (
 'connector' = 'upsert-kafka', --支持回撤流,否则只能进行inner join 
	.....
 'key.format' = 'json', 
 'value.format' = 'json'
)


--Linux系统文件路径
WITH (
  'connector' = 'filesystem',
  'path' = '/home/hadoop/Lgr/files/data',
  'format' = 'xlsx'
)



--创建Catalog,读取hive中的表,使用: hive.tmp.table_name
CREATE CATALOG hive WITH (
'type' = 'hive',
'default-database' = 'tmp',
'hive-conf-dir' = 'hdfs://xxx.xxx.x.x:9000/streampark/flink/flink-1.16.1/conf'
);




--导入split字符串分割函数、collectList集合函数
CREATE FUNCTION collectList as 'com.spring.ch.flink.udf.FlinkCollectList';
CREATE FUNCTION split as 'com.spring.ch.flink.udf.FlinkSplit';





--读取kafka的数据,创建流表
create table OGG_ACS_ACS_SLD_TASK(
	`CHILD_TASK_ID` BIGINT,  --主键
	`TASK_ID` BIGINT,
	`OP_TS` TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL
)
with (
	'connector' = 'kafka',
	'topic' = 'flinksql-acs-sld-task',
	'properties.bootstrap.servers' = 'xxx.xxx.x.x:9092',
	'properties.group.id' = 'ZHLS_TEST1024',
	'scan.startup.mode' = 'latest-offset',
	'ogg-json.ignore-parse-errors' = 'true',
	'format' = 'ogg-json'
);

--创建流视图,根据主键和事件时间获取最新的一条数据
CREATE TEMPORARY VIEW V_ACS_ACS_SLD_TASK
AS
SELECT *  
FROM (
	SELECT *,ROW_NUMBER() OVER(PARTITION BY CHILD_TASK_ID ORDER BY OP_TS DESC) RK
	FROM OGG_ACS_ACS_SLD_TASK T1
) Q
WHERE Q.RK = 1;


  • 双流join
    离线join只计算一次,得出有界的结果
    实时join不断的计算,无界的数据,变化的结果

  • A流 join B流
    A流数据到达,此时为A状态更新,join B流状态,反之亦然
    即每次流状态更新,都会进行join操作

  • join类别
    1、普通join(Regular Join)
    常规的流join
    inner join:双流数据到达时,就计算,不会产生回撤流
    left join:左流数据到达就输出结果,右流数据到达时,数据产生回撤流,重新输出结果
    right join
    full join

  • 2、时间区间join(Interval Join)

  • 3、时态/快照join(Temporal Join)

  • 4、维表join(Lookup Join)

  • 5、数组炸裂(Array Expansion)

  • 6、表函数join(Table Function Join)

  • 7、窗口join(Window Join)

标签:flink,join,--,xxx,hive,kafka,table,随笔
From: https://www.cnblogs.com/nthforsth/p/18005100

相关文章

  • 【随笔】我终于理解了退流!!!!!!!!
    我知道我看起来像个若至但这对我来说是历史性的一刻在我看来退流是NOI级基础算法图论中最难理解的,但是我看过的所有讲解都是一笔带过我不能理解虽然你晨星灵也只提了几句,但是我总算是彻底理解退流的原理和合理性了对于强迫症的我,不完全理解就去敲代码是不可接受的晨星灵我......
  • [Flink] Flink源码分析 : BoundedOutOfOrdernessTimestampExtractor
    0序言0.1缘起importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.......
  • [Flink] Flink Job之Web UI
    0序言在本地电脑开发、调试,或集群环境下运行FlinkJob时,需要利用WebUI观测作业内部的运行情况。WEBUI,对我们观测Flink作业的总体运行情况(系统负载)、快速定位和解决问题,至关重要。全文基于如下版本演示:scala.version=2.11/2.12;flink.version=1.13.11FlinkJob......
  • Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
    1概况本文展示如何使用FlinkCDC+Iceberg+Doris构建实时湖仓一体的联邦查询分析,Doris1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。2系统架构我们整理架构图如下,   1.首先我们从Mysq......
  • Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
    1概况本文展示如何使用FlinkCDC+Iceberg+Doris构建实时湖仓一体的联邦查询分析,Doris1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。2系统架构我们整理架构图如下,   1.首先我们从Mysq......
  • Flink CDC引起的Mysql元数据锁
    记一次FlinkCDC引起的Mysql元数据锁事故,总结经验教训。后续在编写FlinkCDC任务时,要处理好异常,避免产生长时间的元数据锁。同时出现生产问题时要及时排查,不能抱有侥幸心理。1、事件经过某天上午,收到系统的告警信息,告警提示:同步Mysql的某张表数据到Elasticsearch异常,提示连不......
  • Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(Reducin
    Flink之状态编程值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态(BroadcastState)Flink之状态编程一、按键分区状态(KeyedState)1.1、值状态(ValueState)1.1.1、定义1.1.2、使用案例1.2、列表状态(ListState)1.2.1......
  • java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式d
    javaflink(二十六)实战之电商黑名单过滤FlinkCEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo什么是CEP:1、复杂事件处理2、Flink中实现复杂事件处理库3、CEP允许在无休止的事件中检测事件模式,让我们有机会掌握数据中的重要部分4、一个或多个由简单事件构成的事......
  • flink定时器使用问题
    flink定时器使用问题        flink定时器的使用,需要涉及flinktime、watermark、keyStream、keyState等概述,尽管关于flinktime和watermark的文章烂大街,但还是有必要先简单介绍一下,有助于解释下面flink定时器使用遇到的问题。时间模型        flink在stream......
  • flink状态编程
    flink状态编程简单记录一下最近工作中常用的flink状态flink中可以创建不同类型的状态,如键控状态(KeyedState)和操作符状态(OperatorState)等。状态管理是在流处理的整个过程中保持状态的一种能力,它让我们能够在复杂的事件处理和流转换中保留重要的状态信息,例如:聚合结果、过滤条件......