Flink
基础
实时计算与离线计算的区别
1、根据处理时间
实时计算数据实时处理,结果实时存储
离线计算数据延迟处理,N+1
2.根据处理方式
实时计算流式处理:一次处理一条或少量,状态小
离线计算批量处理:处理大量数据,处理完返回结果
实时计算是一种持续、低时延、事件触发的计算任务
离线计算是一种批量、高时延、主动发起的计算任务
特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持具有反压功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理,避免了出现oom
支持迭代计算
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
Source:数据源
Flink 在流处理和批处理上的 source 大概有 4 类:
基于本地集合的 source、
基于文件的 source、
基于网络套接字的 source、
自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
Sink: 接收器
Flink 将转换计算后的数据发送的地点 。
Flink 常见的 Sink 大概有如下几类:
写入文件、
打印出来、
写入 socket 、
自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。
Flink架构
Client(作业客户端)
JobManager(作业管理器)
TaskManager(任务管理器)
Flink集群搭建
在YAERN上运行
Application mode
1、相当于spark的 cluster
2、在本地没有详细的错误日志
3、一般用于生产
4、直接提交任务,每一个任务单独启动一个JobManager和多个taskManager
3、Per-Job Cluster Mode
1、相当于spark 的client模式
2、在本地可以看到错误日志
3、一般用于测试
4、直接提交任务,每一个任务单独启动一个JobManager和多个taskManager
4、Session Mode
1、会话模式是先在yarn启动启动一个JobManager,再提交任务,提交任务时动态申请taskmanager
2、任务共享同一个JobManager
core
Flink程序处理的三种方式
/*
* BATCH:批处理,只能处理有界流,底层是MR模型,可以进行预聚合
* STREAMING:流处理,可以处理无界流,也可以处理有界流,底层是持续流模型,数据一条一条处理
* AUTOMATIC:自动判断,当所有的Source都是有界流则使用BATCH模式,当Source中有一个是无界流则会使用STREAMING模式
*/
Flink处理逻辑传入的方式
// new XXXFunction 使用匿名内部类
// DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
// /**
// *
// * @param line DS中的一条数据
// * @param out 通过collect方法将数据发送到下游
// * @throws Exception
// */
// @Override
// public void flatMap(String line, Collector<String> out) throws Exception {
// for (String word : line.split(",")) {
// // 将每个单词发送到下游
// out.collect(word);
// }
// }
// });
// 使用Lambda表达式
/*
* ()->{}
* 通过 -> 分隔,左边是函数的参数,右边是函数实现的具体逻辑
*/
// 使用自定类实现接口中抽象的方法
// wordsFileDS.flatMap(new MyFunction()).print();
如何设置并行度
/*
* 如何设置并行度?
* 1、考虑吞吐量
* 有聚合操作的任务:1w条/s 一个并行度
* 无聚合操作的任务:10w条/s 一个并行度
* 2、考虑集群本身的资源
*
* Task的数量由并行度以及有无Shuffle一起决定
*
* Task Slot数量 是由任务中最大的并行度决定
*
* TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定
*
*/
// 1、通过env设置,不推荐,如果需要台调整并行度得修改代码重新打包提交任务
// env.setParallelism(3);
// 2、每个算子可以单独设置并行度,视实际情况决定,一般不常用
SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds
.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
.setParallelism(4);
// 3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式
// 命令行:flink run 可以通过 -p 参数设置全局并行度
// web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
事件时间
// 事件时间:数据本身自带的时间
// 水位线:某个线程中所接收到的数据中最大的时间戳
Source
Sink
Function
process
/*
* 每进来一条数据就会执行一次
* value :一条数据
* ctx:可以获取任务执行时的信息
* out:用于输出数据
*/
Window
TimeWindow
/*
* 时间窗口:滚动、滑动
* 时间类型:处理时间、事件时间
*/
Session
// 会话窗口:当一段时间没有数据,那么就认定此次会话结束并触发窗口的执行
基于处理时间
基于时间时间
CountWindow
// 计数窗口:滚动、滑动
.countWindow(5) // 每同一个key的5条数据会统计一次
.countWindow(10,5) // 每隔同一个key的5条数据统计最近10条数据
Kafka
kafka是一个高吞吐的分布式消息系统
特点
消息系统的特点:生存者消费者模型,FIFO
高性能:单节点支持上千个客户端,百MB/s吞吐
持久性:消息直接持久化在普通磁盘上且性能好
分布式:数据副本冗余、流量负载均衡、可扩展
很灵活:消息长时间持久化+Client维护消费状态
架构
producer:消息生存者
consumer:消息消费者
broker:kafka集群的server,
负责处理消息读、写请求,存储消息
topic:消息队列/分类
broker就是代理,在kafka cluster这一层这里,其实里面是有很多个broker
topic就相当于queue
图里没有画其实还有zookeeper,这个架构里面有些元信息是存在zookeeper上面的,整个集群的管理也和zookeeper有很大的关系
设置偏移量的位置
/*
* 设置当前消费的偏移量位置:
* 1、earliest从头开始消费
* 2、latest 从最后开始消费
* 3、timestamp 设置从某个时间戳开始
* 4、offset 设置从哪个偏移量开始
* ......
*/
设置写入时的语义
/*
设置写入时的语义:
1、AT_LEAST_ONCE:保证数据至少被写入了一次,性能会更好,但是又可能会写入重复的数据
2、EXACTLY_ONCE:保证数据只会写入一次,不多不少,性能会有损耗
*/
Checkpoint
checkpoint可以定时将flink计算过程中的状态持久化到hdfs中,保存状态不丢失
标签:处理,Flink,并行度,任务,设置,数据 From: https://www.cnblogs.com/justice-pro/p/18339670