第1章 Flink简介
1.1 初识Flink
1) Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
2) Apache Flink是一个框架和分布式处理引擎,用于对无界(nc lk 9999)和有界数据(一个文档)流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
1.2 Flink的重要特点
1.2.1 事件驱动型(Event-driven)
1) 事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以kafka为代表的消息队列几乎都是事件驱动型应用。(Flink的计算也是事件驱动型)
2) sparkstreaming是时间驱动:批次的时间
1.2.2 流(flink)与批(spark)的世界观
1) 批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。 流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
2) 在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
3) 而在flink的世界观中,==一切都是由流组成的,离线数据是有界限的流==,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
4) 无界数据流:
无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性
5) 有界数据流:
有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。
1.2.3 分层API
Croe APIs--> spark core Table API --> sql --> spark sql
1) 最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
2) 实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
3) Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。
4) 尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
5) Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
6) 目前Flink作为批处理还不是主流,不如Spark成熟,所以DataSet使用的并不是很多。Flink Table API和Flink SQL也并不完善,大多都由各大厂商自己定制。==所以我们主要学习DataStream API的使用==。实际上Flink作为最接近Google DataFlow模型的实现,是流批统一的观点,所以基本上使用DataStream就可以了。
7) 2020年12月8日发布的1.12.0版本, 已经完成实现了真正的流批一体, 写好的一套代码, 即可以处理流式数据, 也可以处理离线数据. 这个与前面版本的处理有界流的方式是不一样的, Flink专门对批处理数据做了优化处理.
1.3 Spark or Flink
Spark 基于微批处理的方式需要同步会有额外开销,因此无法在延迟上做到极致。在大数据处理的低延迟场景,Flink 已经有非常大的优势。
1.4 Flink的应用
1.4.1 应用Flink的场景
1) 事件驱动型应用
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。
相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。
2) 数据分析应用
Apache Flink 同时支持流式及批量分析应用。
Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。
3) 数据管道应用
提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。
数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。
1.4.2 应用Flink的行业
电商和市场营销
数据报表、广告投放
物联网(IOT)
传感器实时数据采集和显示、实时报警,交通运输业
物流配送和服务业
订单状态实时更新、通知信息推送、电信业基站流量调配
银行和金融业
实时结算和通知推送,实时检测异常行为
第2章 Flink快速上手
2.1创建maven项目
1) idea.POM文件中添加需要的依赖: 课件上面有
2) src/main/resources添加文件:log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
3) idea的配置 创建目录和基本设置
<!-- 1. 编码 file encoding
2. 换行符 code style
3. maven
更新快照 选中
不要托管给maven 不选
4. 创建项目的时候, 万万不要把项目存入到中文路径
5. maven仓库也不要存到中文路径-->
2.2批处理WordCount 也是有界流
public class WcBounded {
public static void main(String[] args) {
// 1. 创建流StreamExecutionEnvironment的执行环境 自动根据当前运行的环境获取合适的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置通过此环境执行的操作的并行度。设置平行度
env.setParallelism(1);
// 2. 通过环境从数据源(source)获取一个流
DataStreamSource<String> source = env.readTextFile("input/words.txt");
// 3. 对流做各种转换 ,单词都是String的类型
SingleOutputStreamOperator<String> wordStream= source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word); // 把单词放入到后序的流中
}
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> map = wordStream.map(new MapFunction<String, Tuple2<String, Long>>()
{
@Override
public Tuple2<String, Long> map(String word) throws Exception {
return Tuple2.of(word, 1L);
}
}
);
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
// 返回每个元素的key
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> sumStream = tuple2StringKeyedStream.sum(1);
// 4. 把流输出(sink)
sumStream.print();
// 5. 执行 执行环境
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.3流处理WordCount 也是无界流
就是读取的方式不一样,这个是获取无限的数据过来
public class WcUnBounded {
public static void main(String[] args) {
// 1. 创建流StreamExecutionEnvironment的执行环境 自动根据当前运行的环境获取合适的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置通过此环境执行的操作的并行度。设置平行度
env.setParallelism(1);
// 2. 通过环境从数据源(source)获取一个流
DataStreamSource<String> source = env.socketTextStream("hadoop162", 9999);
// 3. 对流做各种转换 ,单词都是String的类型
SingleOutputStreamOperator<String> wordStream= source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word); // 把单词放入到后序的流中
}
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> map = wordStream.map(new MapFunction<String, Tuple2<String, Long>>()
{
@Override
public Tuple2<String, Long> map(String word) throws Exception {
return Tuple2.of(word, 1L);
}
}
);
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
// 返回每个元素的key
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> sumStream = tuple2StringKeyedStream.sum(1);
// 4. 把流输出(sink)
sumStream.print();
// 5. 执行 执行环境
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
==2.4 拉姆达表达式和泛型擦除==
1) 例子 1 :new的接口A 实现类是匿名内部类
public class Lambda {
public static void main(String[] args) {
//实现类是匿名内部类
A a = n -> System.out.println("匿名内部类的方法" + n);
A a2 = new A() {
@Override
public void fun(int n) {
System.out.println("匿名内部类的方法" + n);
}
};
a2.fun(20);
a.fun(10);
}
}
2) 例子 2 传的参数是接口
public class Lambda2 {
/*
java中的函数: lambda表达式(拉姆达表达式)
接口:
interface
常量
抽象方法
默认方法(default)
函数式接口:
如果一个接口中只有一个抽象方法, 这样的接口就是函数式接口,
而且要继承的父类也只有一个相同的抽象方法
如果一个类型是函数式接口类型, 则创建对象的时候可以使用lambda表示
*/
public static void main(String[] args) {
test2(new Bb() {
@Override
public void fun() {
System.out.println("匿名实现内部类");
}
});
test2(() -> System.out.println("匿名实现内部类2"));
test3(new c() {
@Override
public String fun(User2 user) {//形参传了一个类进来
return user.getName();
}
});
test3(user -> user.getName());
// 方法引用: 类名::方法名 , 是对特殊lambda表达式的替换,就只有一种方法的时候
test3(User2::getName);
}
public static void test3(c F){
String name = F.fun(new User2("lisi"));
System.out.println(name);
}
public static void test2(Bb f){
f.fun();
}
}
interface c{
String fun(User2 user);
}
class User2{
private String name;
public User2(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
interface Bb {
void fun();
}
3) keyby可以写拉姆达表达式,其他类型写不方便
public class WcUnBoundLambda {
public static void main(String[] args) {
// 1. 创建流StreamExecutionEnvironment的执行环境 自动根据当前运行的环境获取合适的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置通过此环境执行的操作的并行度。设置平行度
env.setParallelism(1);
// 2. 通过环境从数据源(source)获取一个流
DataStreamSource<String> source = env.socketTextStream("hadoop162", 9999);
// 3. 对流做各种转换 ,单词都是String的类型
SingleOutputStreamOperator<String> wordStream= source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word); // 把单词放入到后序的流中
}
}
});
// 泛型擦除
// 泛型只存在于源码中, 编译成字节码之后泛型是不存在, 这就叫泛型擦除. 在java中正常!!!!
// 在flink中如果由于泛型擦除的存在导致flink无法知道流中的数据的类型, 则不允许
SingleOutputStreamOperator<Tuple2<String, Long>> map = wordStream
.map((MapFunction<String, Tuple2<String, Long>>) word -> Tuple2.of(word, 1L))
// 其他需要用到就要-->明确指定类型
.returns(Types.TUPLE(Types.STRING,Types.LONG))
;
// 返回每个元素的key
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream =
// 只在这个方法内使用lambda表达式,因为泛型擦除的在flink中,但是在keyby不会数据转换成object
map.keyBy((KeySelector<Tuple2<String, Long>, String>) value -> value.f0);
SingleOutputStreamOperator<Tuple2<String, Long>> sumStream = tuple2StringKeyedStream.sum(1);
// 4. 把流输出(sink)
sumStream.print();
// 5. 执行 执行环境
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
第3章 Flink部署
master 在flink中叫 JobManager
worker 在flink中叫 TaskManager
3.1 开发模式
咱们前面在idea中运行Flink程序的方式就是开发模式.
3.2 local-cluster模式
Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习.
3.2.1 local-cluster模式配置
local-cluster模式基本属于零配置.
1) 上传Flink的安装包flink-1.13.1-bin-scala_2.12.tgz到hadoop162
2) 解压
3) 进入目录/opt/module, 复制flink-local
cp -r flink-1.13.1 flink-local
3.2.2 在local-cluster模式下运行无界的WordCount
1) 打包idea中的应用
2) 把不带依赖的jar包上传到目录/opt/module/flink-local下
3) 启动本地集群
bin/start-cluster.sh
4) 在hadoop162中启动netcat,相当于一个服务端,无界流的数据过来
nc -lk 9999
注意: 如果没有安装netcat需要先安装:
sudo yum install -y nc
5) 命令行提交Flink应用
bin/flink run -m hadoop162:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream
./flink-prepare-1.0-SNAPSHOT.jar
6) 在浏览器中查看应用执行情况
http://hadoop162:8081
7) 也可以在WEB UI提交应用
3.3 Standalone模式
Standalone模式又叫独立集群模式.
# 3.3.1 Standalone模式配置
1.复制flink-standalone
cp -r flink-1.13.1 flink-standalone
2.修改配置文件:flink-conf.yaml
jobmanager.rpc.address: hadoop162
3.修改配置文件:workers
hadoop163
hadoop164
4.分发flink-standalone到其他节点
# 3.3.2 Standalone模式运行无界流WorkCount
1.启动standalone集群 先起集群, 多个job共享这个集群
bin/start-cluster.sh
2.命令行提交Flink应用 bin/flink run -d -m hadoop162:8081 -c 主类 ...jar
bin/flink run -m hadoop162:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream
./flink-prepare-1.0-SNAPSHOT.jar
3.查看执行情况与本地集群一致.
4.也支持Web UI界面提交Flink应用
# 3.3.3Standalone高可用(HA)
任何时候都有一个 主 JobManager 和多个备用 JobManagers,以便在主节点失败时有备用 JobManagers 来接管集群。这保证了没有单点故障,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。
1:修改配置文件: flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_atguigu
2:修改配置文件: masters
hadoop162:8081
hadoop163:8081
3:分发修改的后配置文件到其他节点
4:在/etc/profile.d/my.sh中配置环境变量
Flink没有依赖这个环境,所以需要配置一个路径才可以找到hadoop命令,但是spark有
export HADOOP_CLASSPATH=`hadoop classpath`
分发到其他节点
5:首先启动dfs集群和zookeeper集群
6:启动standalone HA集群
bin/start-cluster.sh
7:可以分别访问
http://hadoop162:8081
http://hadoop163:8081
8:可以在zookeeper上面看
rest_server_lock这上面就可以看到
9:杀死hadoop162上的Jobmanager, 再看leader
3.4 Yarn模式
1) 独立部署(Standalone)模式由Flink自身提供计算资源,无需其他框架提供资源,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Flink主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成更靠谱,所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。(其实是因为在国内工作中,Yarn使用的非常多)
2) 把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会申请容器从Yarn的NodeManager上面. Flink会创建JobManager和TaskManager在这些容器上.Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源
3.4.1 Yarn模式配置
1 复制flink-yarn
cp -r flink-1.13.6 flink-yarn
2 配置环境变量HADOOP_CLASSPATH, 如果前面已经配置可以忽略.
在/etc/profile.d/my.sh中配置
export HADOOP_CLASSPATH=`hadoop classpath`
Flink没有依赖这个环境,所以需要配置一个路径才可以找到hadoop命令,但是spark有
3.4.2 Flink on Yarn的3种部署模式
1: Session-Cluster模式执行无界流WordCount
1: Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。
2: 在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.
3: 缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.
4: 所有的job都提交到这个集群上 main函数在客户端执行
1) 启动一个Flink-Session 这是这种模式的玩法
bin/yarn-session.sh -d
2) 在Session上运行Job,会自动去找你启动的程序 -d 看不到日志,进程在后台
bin/flink run -d -c 主类 jar包
bin/flink run -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
会自动找到你的yarn-session启动的Flink集群.也可以手动指定你的yarn-session集群:
我不想手动指定
2: Per-Job-Cluster模式执行无界流WordCount
1: 一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
2: 每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。在客户端执行
3: 每启动一个job启动一个集群, main函数在客户端执行,会话随着job的创建和消失
4:启动下面的命令会直接创建一个yarn会话 bin/flink run -d -t yarn-per-job -c 主类 jar包
bin/flink run -d -t yarn-per-job -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream
./flink-prepare-1.0-SNAPSHOT.jar
3: Application Mode模式执行无界流WordCount
1: Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.
2: 与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数是在集群中(job manager)执行的
3: 官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!
4: 每启动一个job启动一个集群 main函数在JobManager执行
5: bin/flink run-application -d -t yarn-application -c 主类 jar包
6: 开始操作
bin/flink run-application -t yarn-application -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream
./flink-prepare-1.0-SNAPSHOT.jar
3.4.3 Yarn模式高可用
1) Yarn模式的高可用和Standalone模式的高可用原理不一样.
2) Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby, 当leader挂了, 其他的才会有一个成为leader.
3) yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用.
4) 在yarn-site.xml中配置 注意: 配置完不要忘记分发, 和重启yarn
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
5) 在flink-conf.yaml中配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-yarn
6) 启动yarn-session
7) 杀死Jobmanager, 查看他的复活情况
注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值.
注意: 在10秒之内重启三次要是没有完成,还是杀不死yarn-session,这是flink默认设置这个,怕一个程序执行时间需要3天,中断了,但是因为你设置的这个程序不能重启了,所以设置了10秒内完成你设置的这个
3.5 K8S & Mesos模式
容器化部署时目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。这里我们也不做过多的讲解.
Mesos是Apache下的开源分布式资源管理框架,就和yarn一样的,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用mesos框架的并不多,这里我们就不做过多讲解了。
第4章Flink运行架构
4.1运行架构
1) Flink运行时包含2种进程:1个JobManager和至少1个 TaskManager
2) TaskManager 是一个进程 --> 有多个 slots (线程,封装cpu和内存) --> 1个slots有多个task
4.1.1客户端
严格上说, 客户端不是运行和程序执行的一部分, 而是用于准备和发送dataflow到JobManager. 然后客户端可以断开与JobManager的连接(detached mode), 也可以继续保持与JobManager的连接(attached mode)
客户端作为触发执行的java或者scala代码的一部分运行, 也可以在命令行运行:bin/flink run ...
4.1.2 JobManager
1) 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个的JobManager所控制执行。 JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。 2) JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。 3) 而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。 这个进程包含3个不同的组件
4.1.2.1 ResourceManager
1) 负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager. 注意这个ResourceManager不是Yarn中的ResourceManager, 是Flink中内置的, 只是赶巧重名了而已.
2) 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
3) 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
4.1.2.2 Dispatcher
负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的JobMaster 组件. Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
4.1.2.3 JobMaster
JobMaster负责管理单个JobGraph的执行.多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster.
4.1.3 TaskManager
1) Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
2) 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
3) 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
4.2 核心概念
4.2.1 TaskManager与Slots
1) Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
2) 这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
3) 每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
4.2.2 Parallelism(并行度)
一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
1) One-to-one: 1对1的关系 ,且并行度相同 stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖 2) Redistributing:分区数据发生了改变, stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖
4.2.3 Task与SubTask
一个算子就是一个Task. 一个算子的并行度是几, 这个Task就有几个SubTask
4.2.4 Operator Chains(优化)
1: 操作链的优化是自动实现:
算子与算子之间: one-to-one
算子的并行度一样
-- 1: .startNewChain()
开启一个新链: 当前算子不会和前面的优化在一起
如果后面的算子满足优化条件, 也会与当前算子优化到一起
-- 2: .disableChaining()
当前算子不参与任何链的优化
--3: env.disableOperatorChaining()
当前应用所有算子都不进行优化
实际生产环境下, 尽量不要禁止优化: 优化只有好处没有坏处
2: 在flink中, 有4种办法设置并行度
-- 1. 在flink-conf.yaml 配置文件中设置算子的默认并行度
parallelism.default: 1
-- 2. 提交job的是时候通过参数设置
bin/flink run -d -p 2 ...
-- 3. 在代码中通过环境设置
env.setParallelism(1);
注意: 在flink中socket这个source他的并行度只能是1.
-- 4. 单独给每个算子设置并行度
.setParallelism(3)
演示代码:
public class Flink02_WC_UnBounded {
public static void main(String[] args) {
System.out.println("Flink02_WC_UnBounded.main");
// 1. 创建流的执行环境 自动根据当前运行的环境获取合适的执行环境
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
//env.disableOperatorChaining(); 当前应用所有算子都不进行优化
// 2. 通过环境从数据源(source)获取一个流
DataStreamSource<String> source = env.socketTextStream("hadoop162", 9999);
// 3. 对流做各种转换
SingleOutputStreamOperator<String> wordStream = source
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word); // 把单词放入到后序的流中
}
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> wordOneStream = wordStream
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String word) throws Exception {
return Tuple2.of(word, 1L);
}
})
//.startNewChain()
// .disableChaining()
.filter(t -> true); // 开一起一个新链
KeyedStream<Tuple2<String, Long>, String> keyedStream = wordOneStream.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
// 返回每个元素的key
@Override
public String getKey(Tuple2<String, Long> t) throws Exception {
return t.f0;
}
});
// 对元组中某个位置进行聚合
SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = keyedStream.sum(1);
// 4. 把流输出(sink)
resultStream.print();
// 5. 执行 执行环境
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.2.5 ExecutionGraph(执行图)
由Flink程序直接映射成的数据流图是StreamGraph,也被称为"逻辑流图",因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> Physical Graph。
-- 1: StreamGraph:
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
-- 2: JobGraph:
StreamGraph经过优化后生成了 JobGraph,是提交给 JobManager 的数据结构。主要的优化为: 将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
-- 3: ExecutionGraph:
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
--4: Physical Graph:
JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
2个并发度(Source为1个并发度)的 SocketTextStreamWordCount 四层执行图的演变过程
4.3 提交流程
4.3.1 通用提交流程
4.3.2 yarn-cluster提交流程per-job
1.Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
2.向Yarn ResourceManager提交任务,ResourceManager分配Container资源
3.通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager
4.ApplicationMaster向ResourceManager申请资源启动TaskManager
5.ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
6.NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
7.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
第5章 Flink流处理核心编程
5.1 Environment
Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5.2 Source
5.2.1 准备工作
1.导入注解工具依赖, 方便生产POJO类
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
2.准备一个WaterSensor类方便演示
/**
* id:传感器编号
* ts:时间戳
* vc:水位
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
}
5.2.2 从文件读取数据
// 1:从HDFS idea读取文件
DataStreamSource<String> source=env.readTextFile("hdfs://hadoop162:8020/words.txt");
DataStreamSource<String> source=env.readTextFile("input/out.txt");
5.2.3 从Java的集合中读取数据
List<String> list = Arrays.asList("a", "aa", "b", "c", "d");
DataStreamSource<String> source = env.fromCollection(list);
DataStreamSource<String> source = env.fromElements("a", "b", "c", "d");
5.2.4 从Socket读取数据
DataStreamSource<String> source = env.socketTextStream("hadoop162", 9999);
5.2.5 从Kafka读取数据
1) 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.1</version>
</dependency>
2: hadoop的支持依赖,flink没有自带
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
2) 代码实现
第一种从kafka消费方式
第二种从kafka消费方式
5.2.6 自定义Source
public class Source_Custom {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env.addSource(new MySocket()).print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MySocket implements SourceFunction<WaterSensor> {
// 自定义一个可以从socket读取数据的source
boolean isStop = false;
@Override
public void run(SourceContext<WaterSensor> ctx) throws Exception {
Socket socket = new Socket("hadoop162", 9999);
//字节流转换为字符流InputStreamReader,在用buffer封装BufferedReade
InputStream is = socket.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
String line = bufferedReader.readLine();
while (!isStop && line != null) {
//对读到一行数据做处理
String[] data = line.split(",");
//把数据放入流中
ctx.collect(new WaterSensor(data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
)
);
line = bufferedReader.readLine();
}
}
// 取消: 关闭source
// 这个方法不会自动调用, 如果有需要的时候, 外部可以通过调用这个方法实现关闭source
@Override
public void cancel() {
}
}
}
5.3 Transform
5.3.1 map和 Rich Map
每个算子都有new Rich的 function, 减少和数据库的连接
public class Map_01 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(3);
//1: 转换算子 map: 一对一的关系
DataStreamSource<Integer> source = env.fromElements(1, 6, 9, 20, 10, 2);
source.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value ;
}
});//.print();
//1.2 Map_Rich
/*
如果一个元素建立一个连接, 连接数过多, 效率太低
flink, 应该是一个并行度建立一个到数据库的连接
连接建立的时机: 应该所有的数据道来之前建立
连接关闭: 等到所有的数据都处理完了, 或者程序停止之前关闭
*/
// 需要查询数据库才能确定如何对value做什么样的操作
env.fromElements(1, 6, 9, 20, 10, 2)
.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
//程序启动的时候调用. 这个方法, 是每个并行度执行一次, 一般在这里执行创建连接, 开辟资源
System.out.println("Flink02_Map_Rich.open");
}
@Override
public void close() throws Exception {
// 程序终止的时候的调用. 每个并行度执行一次, 关闭连接, 是否资源
System.out.println("Flink02_Map_Rich.close");
}
@Override
public Integer map(Integer value) throws Exception {
// System.out.println("map" + value);
// 建立到数据库的连接
// 查询
// 得到结果
// 关闭数据库的连接
return value * value;
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.3.2 filter
5.3.3 flatMap
5.3.4 keyBy
5.3.5 对流重新分区的几个算子
keyby --> shuffle --> rebalance --> rescale
5.3.6 split和select
被移除了
5.3.7 connect 和 union
5.3.8 sum简单聚合算子
public class Sum {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
//10:sum: 统计每个传感器的水位和,一定要先keyBy
env.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
WaterSensor waterSensor = new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2])
);
return waterSensor;
}
})
.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
})
.sum("vc");
//.print();
/*
11: max:分组聚合后, 其他字段默认情况是取的是这个组内第一个元素的字段值
min:
maxBy:其他字段是否取第一个? false代表不取第一个,取最大最小值同一行的一个
minBy:
以上4个聚合+sum聚合: 1)只能对其中的一个字段聚合 2)只能聚合数字类型
*/
env.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
WaterSensor waterSensor = new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2])
);
return waterSensor;
}
})
.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
})
//.max("vc")
//.min("vc")
.minBy("vc",false) //false代表不取第一个,取最大最小同一行的一个
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.3.9 reduce
==reduce和状态的reduce是不一样的,一定要区分==
public class Reduce {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
env
.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
WaterSensor waterSensor = new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2])
);
return waterSensor;
}
})
.keyBy(WaterSensor::getId)
.reduce(new ReduceFunction<WaterSensor>() {
/*12: reduce:怎么聚合? reduce还是不如上面几个简单
1) 聚合后的类型和聚合前的类型一致
2) 可以同时聚合对个字段, 要聚合的字段类型无所谓
参数1: 上次聚合的结果 参数2: 这次需要聚合的元素
当某个key第一个数据进来是, 这个方法根本就不执行, 直接输出结果
*/
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
// ts是第一个
//System.out.println(value1.getId() + " ");
//value1.setVc(value1.getVc() + value2.getVc());
//ts是计算到自己的那个
//value2.setVc(value1.getVc() + value2.getVc());
//return value2;
//求最大值最小值也可以
value1.setVc(Math.max(value1.getVc(),value2.getVc()));
return value1;
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.3.10 process
public class Process {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
//13.1: Process(没有key的计算) 统计每个传感器的水位和 ,没有keyBy的功能
env
.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
WaterSensor waterSensor = new WaterSensor(data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2]));
return waterSensor;
}
})
.process(new ProcessFunction<WaterSensor, Integer>() {
int sum = 0;
// 处理流中的数据
// 参数1: 就是要处理的元素
// 参数2: 上下文对象
// 参数3: 输出数据
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<Integer> out) throws Exception {
sum += value.getVc();
out.collect(sum);
}
});
//.print();
//13.2: Process(有key的计算) 统计每个传感器的水位和 ,没有keyBy的功能
env
.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
WaterSensor waterSensor = new WaterSensor(data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2]));
return waterSensor;
}
})
.keyBy(WaterSensor::getId).print();
// .process(new KeyedProcessFunction<String, WaterSensor, String>() {
//
// int sum = 0; // 不分key, 和并行度有关
// @Override
// public void processElement(WaterSensor value,
// Context ctx,
// Collector<String> out) throws Exception {
//
// sum += value.getVc();
// out.collect(ctx.getCurrentKey() + "的水位是" +sum);
//
// }
// });
//.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.4 Sink
5.4.1 KafkaSink
1) 添加Kafka Connector依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
2) sink to kafka 官方和定义两种
public class SingToKafka {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "hadoop162:9092");
env
.readTextFile("input/words.txt")
.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
for (String word : value.split(" ")) {
out.collect(Tuple2.of(word,1L));
}
}
})
//分组好像什么现象不改变
.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
.sum(1)
// .map(new MapFunction<Tuple2<String, Long>, String>() {
// @Override
// public String map(Tuple2<String, Long> value) throws Exception {
// return value.f0 + "_" + value.f1;
// }
// })
/* 1: 利用配置把数据写到kafka
参数1: 写出的topi
参数2: 序列化器
参数2: kafka的配置信息
*/
//.addSink(new FlinkKafkaProducer<String>("s1",new SimpleStringSchema(),props));
// 2:自定义sink to kafka,自己自定义一个生产者
.addSink(new FlinkKafkaProducer<Tuple2<String, Long>>(
"defalut", // 这个topic没用
new KafkaSerializationSchema<Tuple2<String, Long>>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Long> element,
@Nullable Long timestamp) {
String msg = element.f0 + "_" + element.f1;
return new ProducerRecord<>("s1", msg.getBytes(StandardCharsets.UTF_8));
}
},
props,
AT_LEAST_ONCE // 写出的一致性语义: 目前只能选至少一次
));
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.4.2 RedisSink
1) 添加Redis Connector依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
2) 代码实现,看官方文档
/*
redis的数据结构: 你可以多种演示
1) string
2) list
key value
sensor_1 [json格式字符串, ...]
3) set
4) hash
key field value
"s" "sensor_1" json格式字符串
5) zset
*/
public class SinkToRedis {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
//FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder()
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop162")
.setPort(6379)
// .setDatabase(1) // 选择数据库
.setMaxTotal(100)// 连接提供的最大连接数
.setMaxIdle(10) // 最大空闲连接数
.setMinIdle(2) // 最小空闲连接数
.build();//设置参数最后返回这些值了
env
.readTextFile("input/sensor.txt")
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0],
Long.valueOf(split[1]),
Integer.valueOf(split[2]));
}
})
.keyBy(WaterSensor::getId)
.sum("vc")
.addSink(new RedisSink<>(redisConf, new RedisMapper<WaterSensor>() {
// 返回一个命令描述符 字符串: set list: rpush lpush ...
@Override
public RedisCommandDescription getCommandDescription() {
// 参数1: 只能命令(确定要写的数据类型)
// 参数2: 额外的key, 当写出的的类型是 hash和zset的时候有效(他们会包几个)
return new RedisCommandDescription(RedisCommand.SET,null);
}
// 从数据中返回key. 返回的这条数据写入的时候的key
@Override
public String getKeyFromData(WaterSensor data) {
// 用sensor id 作为每条数据的key
return data.getId();
}
// 返回写入的具体数据
@Override
public String getValueFromData(WaterSensor data) {
return JSON.toJSONString(data);
}
}));
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.4.3 ElasticsearchSink
1) 添加Elasticsearch Connector依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.13.1</version>
</dependency>
2) 代码实现 idea有
5.4.4 自定义Sink to mysql
1) 在mysql中创建数据库和表
create database test;
use test;
CREATE TABLE `sensor` (
`id` varchar(20) NOT NULL,
`ts` bigint(20) NOT NULL,
`vc` int(11) NOT NULL,
PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2) 导入Mysql驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
3) 写到Mysql的自定义Sink示例代码
public class Flink05_Sink_Custom {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
}
})
.keyBy(WaterSensor::getId) // 方法引用
.sum("vc")
.addSink(new MySqlSink());
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MySqlSink extends RichSinkFunction<WaterSensor>{
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
// 获取mysql 连接
// 1. 加载驱动
Class.forName("com.mysql.jdbc.Driver");
// 2. 获取连接 提升成员变量快捷键: alt+ctrl+f
connection = DriverManager.getConnection("jdbc:mysql://hadoop162:3306/test?useSSL=false", "root", "aaaaaa");
}
@Override
public void close() throws Exception {
if (connection != null) {
connection.close();
}
}
@Override
public void invoke(WaterSensor ws, Context context) throws Exception {
// 插入数据
// sql语句
// String sql = "insert into sensor(id, ts, vc)values(?,?,?)";
// 如果主键不重复就新增, 重复就更新
// String sql = "insert into sensor(id, ts, vc)values(?,?,?) on duplicate key update vc=?";
String sql = "replace into sensor(id, ts, vc)values(?,?,?)"; // 等价上面这个
PreparedStatement ps = connection.prepareStatement(sql);
// 给sql中的占位符赋值
ps.setString(1, ws.getId());
ps.setLong(2, ws.getTs());
ps.setInt(3, ws.getVc());
// ps.setInt(4, ws.getVc());
ps.execute();
connection.commit(); // 提交执行. 当自动提交是true的时候,不能调用这个方法. mysql的自动提交就是true
ps.close();
}
}
}
5.5 流和批的执行模式
-- 真正流批一体 流模式: 有界流 无界流 1) 有界流自动选择批(我刚刚测试的就自动选择了批) 2) 无界流不能设置批模式 3) 批模式不能设置时间 -- 1:执行模式有3个选择可配: 1) STREAMING(默认) 2) BATCH 3) AUTOMATIC 自动;如果数据离线数据, 则自动选择批处理, 如果是流数据, 则自动选流处理 -- 2: 配置 1) 通过命令行配置 bin/flink run -Dexecution.runtime-mode=BATCH ... 2) 通过代码配置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); 建议: 不要在运行时配置(代码中配置), 而是使用命令行配置, 因为这样会更灵活: 同一个应用即可以用于无界数据也可以用于有界数据 -- 3:模式 1) 流式模式 // 默认流式模式, 可以不用配置 env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 2) 批处理模式 env.setRuntimeMode(RuntimeExecutionMode.BATCH); 3) 自动模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); -- 4: 注意 1.12.0版批模式下有个bug, 如果单词只有1个, 则不会输出. 1.13开始, 这个bug已经解决了
第6章 Flink的Transform算子的运用
6.1 基于埋点日志网络流量统计
6.1.1 pv的计算
衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。 一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。接下来我们就用咱们之前学习的Flink算子来实现PV的统计
1) 封装一个javaBean 这里面会有getset方法 还有构造器 方法
@Data @NoArgsConstructor @AllArgsConstructor public class UserBehavior { private Long userId; private Long itemId; private Integer categoryId; private String behavior; private Long timestamp; }
2) pv实现思路1: WordCount
public class F01_Project_PV { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env .readTextFile("input/UserBehavior.csv") .map(new MapFunction<String, UserBehavior>() { @Override public UserBehavior map(String value) throws Exception { String[] data = value.split(","); return new UserBehavior(Long.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]), data[3], Long.valueOf(data[4])); } }) .filter(value -> "pv".equals(value.getBehavior())) //先过完所有的在封装成元组计算wordcount .map(new MapFunction<UserBehavior, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(UserBehavior value) throws Exception { return Tuple2.of("pv",1L); } }) //使用sum之前一定要聚合 .keyBy(new KeySelector<Tuple2<String, Long>, String>() { @Override public String getKey(Tuple2<String, Long> value) throws Exception { return value.f0; } }) .sum(1) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
3) process实现wordcount
public class F01_Project_PV_2 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env
.readTextFile("input/UserBehavior.csv")
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String line) throws Exception {
String[] data = line.split(",");
return new UserBehavior(
Long.valueOf(data[0]),
Long.valueOf(data[1]),
Integer.valueOf(data[2]),
data[3],
Long.valueOf(data[4])
);
}
})
.keyBy(UserBehavior::getBehavior)
.process(new KeyedProcessFunction<String, UserBehavior, String>() {
int sum = 0 ;
@Override
public void processElement(UserBehavior value,
Context ctx,
Collector<String> out) throws Exception {
if ("pv".equals(value.getBehavior())){
sum ++;
out.collect("pv的值: " + sum);
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
6.1.2 uv的计算
1) 去重客户,对pv的数据做一个去重
public class F02_Project_UV {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env
.readTextFile("input/UserBehavior.csv")
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String line) throws Exception {
String[] data = line.split(",");
return new UserBehavior(
Long.valueOf(data[0]),
Long.valueOf(data[1]),
Integer.valueOf(data[2]),
data[3],
Long.valueOf(data[4])
);
}
})
.filter( value -> "pv".equals(value.getBehavior()))
.keyBy(UserBehavior::getBehavior)
.process(new KeyedProcessFunction<String, UserBehavior, String>() {
//计算uv的性质,就是计算有不同的客户,客户点击很多次,需要去重, 建议一个set集合就可以解决
HashSet<Long> uidSet = new HashSet<>();
@Override
public void processElement(UserBehavior value,
Context ctx,
Collector<String> out) throws Exception {
// 如果set中添加成功就是新用户, 否则就是旧用户
if (uidSet.add(value.getUserId())){
out.collect("uv的值:" + uidSet.size());
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
6.2 市场营销商业指标统计分析
1) 粒度是 行为 和数据来源一起 的分析 ,可以操作不同的粒度
public class F03_Project_App { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); // 统计不同的行为不同渠道的 数量 两个粒度就一起拼接 env .addSource(new MarketSource2()) .map(new MapFunction<MarketingUserBehavior, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(MarketingUserBehavior value) throws Exception { return Tuple2.of(value.getBehavior() + "_" +value.getChannel(),1L ); } }) .keyBy(t -> t.f0) .sum(1) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } public static class MarketSource2 implements SourceFunction <MarketingUserBehavior> { @Override public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception { Random random = new Random(); // app行为: 下载 安装 卸载 更新 String[] behaviors = {"download", "install", "uninstall", "update"}; String[] channels = {"huawei", "xiaomi", "oppo", "vivo", "appstore", "zhongxing"}; while (true) { Long userId = Math.abs(random.nextLong()) % 10000 + 1; // [1, 10000] String behavior = behaviors[random.nextInt(behaviors.length)]; String channel = channels[random.nextInt(channels.length)]; Long timestamp = System.currentTimeMillis(); MarketingUserBehavior bean = new MarketingUserBehavior( userId, behavior, channel, timestamp ); ctx.collect(bean); Thread.sleep(300); } } @Override public void cancel() { } } }
6.3 各省份页面广告点击量实时统计
1) 封装数据的JavaBean
@Data @AllArgsConstructor @NoArgsConstructor public class AdsClickLog { private Long userId; private Long adId; private String province; private String city; private Long timestamp; }
2) 代码实现 和上年报wordcount是一样的
6.4 订单支付和收款的对账
1) JavaBean类的准备
@Data @AllArgsConstructor @NoArgsConstructor public class OrderEvent { private Long orderId; private String eventType; private String txId; private Long eventTime; } @Data @AllArgsConstructor @NoArgsConstructor public class TxEvent { private String txId; private String payChannel; private Long eventTime;
2) 需求: 来自两条流的订单交易匹配
1条流是订单的信息,里面有支付的信息,订单
另1条流是商户收款的具体到账没,商家那边支付了,我们这边收到款有这个记录才算成功了
考虑: 两条流的数据有先来后来的问题,我么可以用hashMap的get方法判断是否一一对应
public class F04_Project_Order { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); //需求: 来自两条流的订单交易匹配 //订单数据 (里面有pay的订单) 两者要对账的 交易数据(支付订单id,微信支付宝,时间) SingleOutputStreamOperator<OrderEvent> orderStream = env .readTextFile("input/OrderLog.csv") .map(new MapFunction<String, OrderEvent>() { @Override public OrderEvent map(String value) throws Exception { String[] data = value.split(","); return new OrderEvent(Long.valueOf(data[0]), data[1], data[2], Long.valueOf(data[3])); } }) .filter(e -> "pay".equals(e.getEventType())); SingleOutputStreamOperator<TxEvent> payStream = env .readTextFile("input/ReceiptLog.csv") .map(new MapFunction<String, TxEvent>() { @Override public TxEvent map(String value) throws Exception { String[] data = value.split(","); return new TxEvent( data[0], data[1], Long.valueOf(data[2]) ); } }); orderStream.connect(payStream) //一定要keyby数据,要不然可能不在一个并行度就不行 .keyBy(OrderEvent::getTxId,TxEvent::getTxId) .process(new CoProcessFunction<OrderEvent, TxEvent, String>() { // 用map集合存储先来的数据 HashMap<String, OrderEvent> orderEventMap = new HashMap<>(); HashMap<String, TxEvent> txEventMap = new HashMap<>(); @Override public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception { // 来了支付数据, 去交易对应的map集合中,查找交易数据是否存在 TxEvent txEvent = txEventMap.get(value.getTxId()); if (txEvent != null) { // 交易数据已经存在 out.collect("订单: " + value.getOrderId() + " 对账成功... "); }else{ // 把支付数据存入到oderEventMap中 orderEventMap.put(value.getTxId(), value); } } @Override public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception { OrderEvent orderEvent = orderEventMap.get(value.getTxId()); if (orderEvent != null){ out.collect("订单: " + orderEvent.getOrderId() + " 对账成功... "); }else{ txEventMap.put(value.getTxId(),value); } } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
第7章 Flink流处理高阶编程
7.1 Flink的window (窗口)
7.1.1 窗口概述
1) 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
2) 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
3) 在Flink中, 窗口(window)是处理无界流的核心. ==窗口把流切割成有限大小的多个"存储桶"(bucket),== 我们在这些桶上进行计算.
7.1.2 窗口的分类
1: 基于时间的窗口(时间驱动)
时间驱动: 事件时间 (重点) 处理时间
2: 基于元素个数的(数据驱动)
7.1.3 窗口代码实现
7.1.3.1 基于处理时间的窗口
-- 基于处理时间的窗口 属性: 开始时间和关闭时间 [ ) 1) 滚动 TumblingProcessingTimeWindows 窗口之间没有间隙也没重叠 2) 滑动 SlidingProcessingTimeWindows 窗口之间有间隙也可能重叠 所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中 例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据. 3)会话 ProcessingTimeSessionWindows gap 超过gap时间没有数据, 窗口关闭 如果gap在3秒的时间一直有数据,这样的话一直不会被关闭, 会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间. 4) 在关闭窗口的时候,不同的key关闭时间是否一样? 滚动和滑动,是同时关闭 key1 和key2 session不会同时关闭,不同的key的窗口和关闭时间一般不一样
例子:
public class Flink01_Window_Tumble { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env .socketTextStream("hadoop162", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] data = value.split(","); return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2])); } }) .keyBy(WaterSensor::getId) // 定义一个长度为5s的窗口 // .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 定义一个长度为5, 滑动步长为2窗口 //.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))) // 窗口处理函数 .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { // 处理窗口中元素的方法: 当窗口关闭的时候执行一次 @Override public void process(String key, // 属于哪个key的窗口 Context ctx, // 上下文对象, 可以获取到窗口信息 Iterable<WaterSensor> elements, // 窗口触发计算的时候, 存储着窗口内所有的元素 Collector<String> out) throws Exception { ArrayList<WaterSensor> list = new ArrayList<>(); // Iterable很多操作不能做, 比如排序, 一般是把他转成一个List集合再操作 for (WaterSensor ws : elements) { list.add(ws); } // 获取窗口的开始时间 String stt = AtguiguUtil.toDatTime(ctx.window().getStart()); // 获取窗口的结束时间 String edt = AtguiguUtil.toDatTime(ctx.window().getEnd()); out.collect(key + " " + stt + " " + edt + " " + list); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
7.1.3.2 基于元素个数的窗口
7.1.3.3 没有keyBy加窗口
7.1.4 窗口处理函数
窗口处理函数: 1) 增量 每来一条元素, 就开始计算, 等到窗口关闭的时候, 直接就可以返回最终的结果 简单 sum max min maxBy minBy 只能聚合数据 rduce (是对某个字段的求和 3+10+10) 输出输出类型一样 Aggregate (是对任意字段的求和 word 1 word 1) 由于可以定义累加器(就是可以有中间值), 所以输出和输入可以不一样 "额外的功能: 补充窗口的信息:增加processwindow处理函数" 2) 全量 process (比较复杂的逻辑都是在这边处理) 来的元素会先缓存到内存中,等待窗口关闭的时候才开始计算 尽量使用增量处理函数, 如果增量处理不了(排序, topN), 再选择全量
1) 增量代码实现 sum 和reduce
2) 增量代码实现 Aggregate: 求水位平均数
public class Flink05_Window_ProcessFunction_Aggregate { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env .socketTextStream("hadoop162", 8888) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] data = value.split(","); return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2])); } }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .aggregate(new AggregateFunction<WaterSensor, Avg, Double>() { // 初始一个累加器 // 这个窗口内的第一条元素来的触发一次 @Override public Avg createAccumulator() { System.out.println("Flink05_Window_ProcessFunction_Aggregate.createAccumulator"); return new Avg(); } // 真正的累加器方法 // 每来一条元素触发一次 @Override public Avg add(WaterSensor value, Avg acc) { System.out.println("Flink05_Window_ProcessFunction_Aggregate.add"); acc.sum += value.getVc(); acc.count++; return acc; } // 返回最终的结果 // 窗口关闭的时候触发一次 @Override public Double getResult(Avg acc) { System.out.println("Flink05_Window_ProcessFunction_Aggregate.getResult"); return acc.sum * 1.0 / acc.count; } // 合并累加器 // 只有窗口是 session窗口的时候才有效,其他窗口不会触发 @Override public Avg merge(Avg a, Avg b) { System.out.println("Flink05_Window_ProcessFunction_Aggregate.merge"); a.sum += b.sum; a.count += b.count; return a; } } ) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } public static class Avg { public Integer sum = 0; public Long count = 0L; } }
3) reduce 和 Aggregate的补充窗口的信息:增加processwindow处理函数
public class Flink06_Window_ProcessFunction_1 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
.keyBy(WaterSensor::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(
new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1,
WaterSensor value2) throws Exception {
value1.setVc(value1.getVc() + value2.getVc());
return value1;
}
},
// 前面聚合函数最终的输出, 就是这里的输入
new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key,
Context ctx,
// 这个集合存储的值, 是聚合函数最终的结果
// 所以说, 这个集合中有且仅有一个元素, 就是前面聚合的最终结果
// 对这个结果最再加工: 补充点窗口信息
Iterable<WaterSensor> elements,
Collector<String> out) throws Exception {
System.out.println("窗口关闭的时候执行....");
WaterSensor ws = elements.iterator().next();
out.collect(key + " " + ctx.window() + " " + ws);
}
}
)
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
==7.2 事件时间和WaterMark(水印)==
7.2.1 处理时间
1) 处理时间是指的执行操作的各个设备的时间 2) 对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据. 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等 处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序 3) 在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器
7.2.2 事件时间(event time)
1) 事件时间是指的这个事件发生的时间.
2) 在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在.
3) 在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关. 事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟).
4) 在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果. 事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
5) 假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
6) 在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器
注意:
在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间
水印是一个时钟,决定事件时间窗口什么时候关
7.2.3 事件时间和水印
// 1: 乱序流中的水印
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 返回值必须是毫秒. 是一个绝对时间 getTs是javaBean的时间,数据的时间水印
.withTimestampAssigner((element, ts) -> element.getTs())
)
// 2: 有序流中的水印
// 3: 事件时间设置
滚动 TumblingEventTimeWindows.of
窗口之间没有间隙也没重叠
滑动
窗口之间有间隙也可能重叠
会话
gap
超过gap时间没有数据, 窗口关闭
// 4: 水印的生产机制
最大时间戳-乱序程度-1ms
水印 随着数据的流动而流动
时钟
表示事件时间的进度
窗口的关闭
定时器的触发
触发的触发...
// 5:时间事件+水印
1) 解决数据迟到问题的一从保证:
2) 假如你是启动一个5秒的窗口,Duration.ofSeconds(3)乱序程度是3秒,
水印决定了关闭窗口,当你输入s1,8000,10 8000Ts-3000-1 和 [0,5000)的窗口的数据最对比,
这样就算是s1,5000,10的数据都可以进去窗口,水印>5000时候,窗口关闭,这样数据才进不来了.
s1,8000,10 他只会金 [5000,10000)的窗口,看水印决定窗口是否关闭
// 6: 水印的类型 水印可以不用快速的更新,这样迟到的数据也可以进去
周期性的水印( 每隔3秒中 )(默认每隔200毫秒发射一次水印)
打点是水印( 来一条数据执行一次 )
1: 水印和事件时间的演示
public class WindowWaterMark {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// 今天测试水印的时候: 并行度必须设置 1
env.setParallelism(1);
// 这里测试的是抽象时间
// 返回值就是这个与元素的事件时间
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
/*分配时间时间和水印,一起运用
<WaterSensor>forBoundedOutOfOrderness是一个乱序的数据,流中的类型指定
withTimestampAssigner 告诉别人这个时间时间的分配器
*/
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner
((element, ts) -> element.getTs())
)
.keyBy(WaterSensor::getId)
//这里一定要事件时间的代码
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
// 处理窗口中元素的方法: 当窗口关闭的时候执行一次
@Override
public void process(String s, // 属于哪个key的窗口
Context context, // 上下文对象, 可以获取到窗口信息
Iterable<WaterSensor> elements, // 窗口触发计算的时候, 存储着窗口内所有的元素
Collector<String> out) throws Exception {
ArrayList<WaterSensor> list = new ArrayList<>();
// Iterable很多操作不能做, 比如排序, 一般是把他转成一个List集合再操作
for (WaterSensor ws : elements) {
list.add(ws);
}
// 获取窗口的开始时间
String start = AtguiguUtil.toDatTime(context.window().getStart());
//获取窗口结束时间
String stop = AtguiguUtil.toDatTime(context.window().getEnd());
out.collect(s + ":" + start + " " + stop + " " + list);
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2: 自定义的水印机制,可以改变的
public class WindowWaterMarkCustom {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// 水印更新周期
env.getConfig().setAutoWatermarkInterval(3000);
// 今天测试水印的时候: 并行度必须设置 1
env.setParallelism(1);
// 这里测试的是抽象时间
// 返回值就是这个与元素的事件时间
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
.assignTimestampsAndWatermarks(
new WatermarkStrategy<WaterSensor>() {
@Override
public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new MyWM();
}
}
.withTimestampAssigner( (element, recordTimestamp) -> element.getTs())
)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
// 处理窗口中元素的方法: 当窗口关闭的时候执行一次
@Override
public void process(String key, // 属于哪个key的窗口
Context ctx, // 上下文对象, 可以获取到窗口信息
Iterable<WaterSensor> elements, // 窗口触发计算的时候, 存储着窗口内所有的元素
Collector<String> out) throws Exception {
ArrayList<WaterSensor> list = new ArrayList<>();
// Iterable很多操作不能做, 比如排序, 一般是把他转成一个List集合再操作
for (WaterSensor ws : elements) {
list.add(ws);
}
// 获取窗口的开始时间
String stt = AtguiguUtil.toDatTime(ctx.window().getStart());
// 获取窗口的结束时间
String edt = AtguiguUtil.toDatTime(ctx.window().getEnd());
out.collect(key + " " + stt + " " + edt + " " + list);
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyWM implements WatermarkGenerator<WaterSensor> {
Long MaxTs = Long.MIN_VALUE + 3000 + 1 ;
// 流中每来一个元素触发一次
// 参数1:就是那个元素
//参数2: 这个元素的事件时间
// 参数3: 水印发射器
@Override
public void onEvent(WaterSensor event, long ts, WatermarkOutput output) {
System.out.println("MyWMG.onEvent");
// 计算最大时间戳,默认的水印调用这个方法
MaxTs = Math.max(MaxTs, ts);
// 间隙性的水印/打点式 (两种水印的方式)
output.emitWatermark(new Watermark(MaxTs - 3000 - 1));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 周期性的执行:默认是200ms
System.out.println("MyWMG.onPeriodicEmit");
// 周期性的水印
output.emitWatermark(new Watermark(MaxTs - 3000 - 1));
//走这里就是没有水印,一直是0
}
}
}
7.2.4 多并行度WaterMark的传递
总结: 多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!
// 1: 多个并行度传递,就有多个水印?
广播传递:找一个最小的时间传递后面
// 2:多个source读取kafka的数据会发生数据倾斜,怎么解决?
1) map之前平局分布数据rebalence(时效降低)
2) 找别的部门解决数据倾斜
3) 1个source(不建议)
4) 把不更新的数据并行度标记空闲,就以其他并行度为标准,水印传递下去
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 返回值必须是毫秒. 是一个绝对时间
.withTimestampAssigner((element, ts) -> element.getTs())
// 如果一个并行度10s每个更新数据(没有更新水印), 水印传递就以其他的为准
.withIdleness(Duration.ofSeconds(10))
7.3 数据迟到的三从保证
// 1:解决数据迟到问题的一重保证:
事件时间 + 水印
// 2:二重保证:允许迟到 类似他们的最大容忍程度
当到了窗口关闭时间, 窗口先不关闭, 只是对窗口内的元素进行计算.
然后允许迟到期间, 如果有属于这个窗口的数据来了, 则来一条就计算一次.
超过了允许迟到时间, 窗口才真正的关闭!!!!
// 允许迟到2s
.allowedLateness(Time.seconds(2))
// 3:三重保证 安排一辆车侧输出流
侧输出流:如果窗口真正关闭之后, 这是时候来的属于关闭窗口的数据, 会直接进入侧输出流
1: 二从保证代码实现
2: 侧输出流三重保证
public class Flink05_WMK_Sideout {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// 今天测试水印的时候: 并行度必须设置 1
env.setParallelism(1);
SingleOutputStreamOperator<String> normal = env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 返回值必须是毫秒. 是一个绝对时间
.withTimestampAssigner((element, ts) -> element.getTs())
)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 把真正迟到的迟到(窗口之后进来的数据)的数据写入到侧输出流中
// 做成匿名内部类的方式,来保存泛型到运行时
.sideOutputLateData(new OutputTag<WaterSensor>("late"){})
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
// 处理窗口中元素的方法: 当窗口关闭的时候执行一次
@Override
public void process(String key, // 属于哪个key的窗口
Context ctx, // 上下文对象, 可以获取到窗口信息
Iterable<WaterSensor> elements, // 窗口触发计算的时候, 存储着窗口内所有的元素
Collector<String> out) throws Exception {
ArrayList<WaterSensor> list = new ArrayList<>();
// Iterable很多操作不能做, 比如排序, 一般是把他转成一个List集合再操作
for (WaterSensor ws : elements) {
list.add(ws);
}
// 获取窗口的开始时间
String stt = AtguiguUtil.toDatTime(ctx.window().getStart());
// 获取窗口的结束时间
String edt = AtguiguUtil.toDatTime(ctx.window().getEnd());
out.collect(key + " " + stt + " " + edt + " " + list);
}
});
normal.print("正常数据");
normal.getSideOutput(new OutputTag<WaterSensor>("late"){}).print("迟到数据");
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4 侧输出流(sideOutput)
// 1: 三重保证,接收窗口关闭之后的迟到数据
// 2: 使用侧输出流把一个流拆成多个流
split算子可以把一个流分成两个流, 从1.12开始已经被移除了. 官方建议我们用侧输出流来替换split算子的功能.
7.4.1 侧输出流拆分多个流
public class Flink01_Sideout {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// 今天测试水印的时候: 并行度必须设置 1
env.setParallelism(1);
// s1 一个流 主流
// s2 一个流 侧输出流
// 其他所有的传感器的数据 进入一个流 侧输出流
SingleOutputStreamOperator<String> main = env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
.process(new ProcessFunction<WaterSensor, String>() {
@Override
public void processElement(WaterSensor ws,
Context ctx, //上下文对象
Collector<String> out) throws Exception {
if ("s1".equals(ws.getId())) {
out.collect(ws.toString());
}else if ("s2".equals(ws.getId())) {
ctx.output(new OutputTag<String>("侧2"){}, JSON.toJSONString(ws));
}else{
ctx.output(new OutputTag<WaterSensor>("other"){}, ws);
}
}
});
main.print("主流");
main.getSideOutput(new OutputTag<String>("侧2") {}).print("侧输出流2");
main.getSideOutput(new OutputTag<WaterSensor>("other") {}).print("其他");
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.5 8个prcoess Function
1) 我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
2) 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
==7.6 定时器==
// 1:定时器只有在keyby的时候使用:语法规定了,要不然报错
// 2:k1 k2 不同的key设置定时器是不一样的
一个定时器只会触发一次,你做需求的时候创建和删除需要做好
// 3:测试都是用的有序流的水印
forMonotonousTimestamps()
7.6.1 处理时间的定时器
public class Flink01_Timer_PT {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
}
})
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private long ts;
// 当定时器触发的时候, 调用这个方法
// 一个定时器只会触发一次
@Override
public void onTimer(long timestamp, // 定时器的时间
OnTimerContext ctx,
Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + ": 水位大于20, 发出红色预警...");
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
// 如果vc大于 20 , 定义一个 5s后触发的基于处理时间的定时器
if (value.getVc() > 20) {
// 时间是绝对时间
// 10000
ts = System.currentTimeMillis() + 5000;
ctx.timerService().registerProcessingTimeTimer(ts);
System.out.println(ctx.getCurrentKey() + " 定义了一个定时器: " + ts);
}else if (value.getVc() < 10){
// 删除定时器
ctx.timerService().deleteProcessingTimeTimer(ts);
System.out.println(ctx.getCurrentKey() + " 取消了一个定时器: " + ts);
}
}
})
.print();
//s1,1000,22 设置一个 水印6000的定时器的时间才会触发 forMonotonousTimestamps() 不用减乱序的时间
env.execute();
}
}
7.6.2 事件时间的定时器
.process(new KeyedProcessFunction<String, WaterSensor, String>() { private long ts; // 当定时器触发的时候, 调用这个方法 // 一个定时器只会触发一次 @Override public void onTimer(long timestamp, // 定时器的时间 OnTimerContext ctx, Collector<String> out) throws Exception { out.collect(ctx.getCurrentKey() + ": 水位大于20, 发出红色预警..."); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 如果vc大于 20 , 定义一个 5s后触发的基于事件时间的定时器 if (value.getVc() > 20) { // 时间是绝对时间 // 10000 ts = value.getTs() + 5000; ctx.timerService().registerEventTimeTimer(ts); System.out.println(ctx.getCurrentKey() + " 定义了一个定时器: " + ts); } else if (value.getVc() < 10) { // 删除定时器 ctx.timerService().deleteEventTimeTimer(ts); System.out.println(ctx.getCurrentKey() + " 取消了一个定时器: " + ts); } } }) .print();
7.6.3 定时器练习
分析: 监控水位传感器的水位值,如果水位值在5s之内(event time)连续上升,则报警。
滚动窗口不行,会跨窗口,滑动窗口有重复的数据
定时器最好的,可以在每个连续的5秒都可以设置定时器,和取消定期器
这里有一个标记,用的true ,false标记,之后我们可以用状态来做标记
env
.socketTextStream("hadoop162", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] data = value.split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forMonotonousTimestamps()
.withTimestampAssigner((ws, ts) -> ws.getTs())
)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
Integer lastVc;
Long time;
boolean isFirst = true;
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + " 连续5s水位上升, 发出预警....");
// 注册新的定时器 ,发送完预警之后,在连续发送修改这个
isFirst = true; // 下一跳数据进来, 有会当成第一条数据使用
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
// 如何判断是否第一条数据进来: 定义一个变量(boolean值), 默认是true, 第一个来了之后置为false
if ( isFirst){ // 表示第一条数据进来
//注册定时器
time = value.getTs() + 5000;
ctx.timerService().registerEventTimeTimer(time);
System.out.println("第一条数据紧进来, 注册一个定时器:" + time);
// 把标记置为false
isFirst = false;
}else {
// 不是第一条数据: 要判断水位是否上升 当前的水位值和上一条水位值做对比
if (value.getVc() < lastVc){
// 水位下降, 取消定时器
ctx.timerService().deleteEventTimeTimer(time);
System.out.println("水位下降, 取消定时器:" + time);
//然后要注册一个新的定时器判断
time = value.getTs() + 5000;
ctx.timerService().registerEventTimeTimer(time);
System.out.println("注册一个新的定时器:" + time);
}else {
System.out.println("水位上升或不变, 不做任何操作....");
}
}
// 把当前的水位存储到变量中, 下一条数据进来的时候判断使用
lastVc = value.getVc();
}
})
.print();
==7.7 Flink状态编程==
1) 有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 2) SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度. 3) Flink的状态管理是它的优势之一.
无状态: 不需要借助外部的力量,自己就可以解决问题
有状态: 操作状态处理,将状态存储在一个地方
4) Managed State的分类
对Managed State继续细分,它又有2种类型
a) Operator State(算子状态)
b) Keyed State(键控状态)
State | Keyed State | |
---|---|---|
适用用算子类型 | 可用于所有算子: 常用于source, sink, 例如 FlinkKafkaConsumer | 只能用于用于KeyedStream上的算子 |
状态分配 | 一个算子的子任务对应一个状态 | 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State |
创建和访问方式 | 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 | 重写RichFunction, 通过里面的RuntimeContext访问w |
横向扩展 | 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 | 并发改变, State随着Key在实例间迁移 |
支持的数据结构 | ListState,UnionListStste和BroadCastState | ValueState, ListState,MapState ReduceState, AggregatingState |
7.7.1 算子列表状态(List state)
1: new 需要两个类,就不要做匿名内部类了
2: 先初始化状态:initializeState 在做快照 snapshotState
public class Operator_List {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
// 每隔3秒中做一次checkpoint
// 这样理解: 每隔3秒,把程序的所有的状态做一次持久化, 程序出了问题重启的时候, 可以从持久化状态中恢复数据
env.enableCheckpointing(3000);
SingleOutputStreamOperator<String> stream = env
.socketTextStream("hadoop162", 9999)
.flatMap(new MyFlatMapFunction2());
stream.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
if (value.contains("x")){
throw new RuntimeException("碰到了x, 手动抛出一个异常....");
}
}
});
stream.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyFlatMapFunction2 implements FlatMapFunction<String,String>, CheckpointedFunction {
List<String> words = new ArrayList<>();
private ListState<String> wordsState;
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
// 每行数据用空格切割, 存入到一个List集合中
String[] data = value.split(",");
for (String datum : data) {
words.add(datum);
}
out.collect(words.toString());
}
// 对状态做快照: 每隔3s中, 把状态中的数据进行一次快照存储(持久化)
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//每隔3s每个并行度执行一次
// System.out.println("MyFlatMapFunction.snapshotState");
// wordsState.clear(); // 清楚地所有的值
// wordsState.addAll(words); //每次都是追加,就是越来越多了
wordsState.update(words);
}
// 初始化状态: 程序启动的每个并行度执行一次(把快照中的数据加载到状态中)
//先初始化获得listState, 才去更新
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("MyFlatMapFunction.initializeState");
// 程序失败了, 再次启动的时候, 把状态中的数据取出, 存入到 ArrayList中
wordsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<String>("wordsState", String.class));
Iterable<String> strings = wordsState.get();
for (String data : strings) {
words.add(data);
}
}
}
}
7.7.2 列表状态 Union list state
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
// 启动的时候, 把状态中的数据取出, 存入到 ArrayList中,就是这里不同
wordsState = ctx.getOperatorStateStore().getUnionListState
(new ListStateDescriptor<String>("wordsState", String.class));
7.7.3 广播状态
1: 广播数据到每一个并行度去,这样数据流每条都会出来
public class Operator_BroadCastState {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
// 每隔3秒中做一次checkpoint
// 这样理解: 每隔3秒,把程序的所有的状态做一次持久化, 程序出了问题重启的时候, 可以从持久化状态中恢复数据
//env.enableCheckpointing(3000);
// 1. 读取一个数据流
DataStreamSource<String> dataStream = env.socketTextStream("hadoop162", 9999);
// 2. 读取一个配置流
DataStreamSource<String> confStream = env.socketTextStream("hadoop162", 8888);
// 3. 把配置流做成一个广播流
MapStateDescriptor<String, String> bcStateDesc = new MapStateDescriptor<String, String>("bcState", String.class, String.class);
BroadcastStream<String> bcStream = confStream.broadcast(bcStateDesc);
// 4. 数据流去 connect 广播流
BroadcastConnectedStream<String, String> dataConfStream = dataStream.connect(bcStream);
// 5. 分布处理广播流中的数据 和数据流中的数据
dataConfStream.process(new BroadcastProcessFunction<String, String, String>() {
// 5.2 处理数据流, 从广播状态读取配置信息, 来根据配置信息决定代码的处理方式
// 数据流中的数据, 每来一条执行一次
@Override
public void processElement(String value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
System.out.println("Flink03_Operator_BroadCastState.processElement");
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
String s = broadcastState.get("switch");
if ("1".equals(s)) {
out.collect("使用 1 号逻辑");
}else if ("2".equals(s)){
out.collect("使用 2 号逻辑");
}else {
out.collect("使用 other 号逻辑");
}
}
// 5.1 广播流中的数据存入到广播状态, 状态会自动广播到每个并行度中
// 配置流中的数据每来一条, 每个并行度执行一次
@Override
public void processBroadcastElement(String value,
Context ctx,
Collector<String> out) throws Exception {
System.out.println("Flink03_Operator_BroadCastState.processBroadcastElement");
// 获取广播状态 也是一个key value键值对类型
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
broadcastState.put("switch",value);
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.7.4 键控状态 ValueState
检测传感器的水位值,如果连续的两个水位值超过10,就输出报警。
就是保持状态了,就不用设置hashmap去判断了
public class F01_KeyValue {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
// 每隔3秒中做一次checkpoint
// 这样理解: 每隔3秒,把程序的所有的状态做一次持久化, 程序出了问题重启的时候, 可以从持久化状态中恢复数据
env.enableCheckpointing(3);
env
.socketTextStream("hadoop162",9999)
.map((MapFunction<String, WaterSensor>) value -> {
String[] data = value.split(",");
return new WaterSensor(data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2]));
})
.keyBy(WaterSensor::getId)
//检测传感器的水位值,如果连续的两个水位值超过10,就输出报警。
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private ValueState<Integer> lastState;
@Override
public void open(Configuration parameters) throws Exception {
// 获取监控状态实例
lastState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastState ", Integer.class));
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
// 从单值的状态获取值. 第一次来的时候, 状态中肯定没值, 所以返回值是 null
Integer lastValue = lastState.value();
if (lastValue != null && value.getVc()>10 && lastValue > 10) {
out.collect(ctx.getCurrentKey() + " 的水位连续两次超过10, 发出红色预警...");
}
// 把当前的值, 赋值到状态
lastState.update(value.getVc());
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.7.5 键控状态 ListState
针对每个传感器输出最高的3个水位值
把值放进状态里面, get状态的值都很重要
public class F02_KeyList {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
// 每隔3秒中做一次checkpoint
// 这样理解: 每隔3秒,把程序的所有的状态做一次持久化, 程序出了问题重启的时候, 可以从持久化状态中恢复数据
env.enableCheckpointing(3000);
env
.socketTextStream("hadoop162", 9999)
.map(line -> {
String[] data = line.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
})
.keyBy(WaterSensor::getId)
//针对每个传感器输出最高的3个水位值
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private ListState<Integer> top3VcState;
@Override
public void open(Configuration parameters) throws Exception {
// 获取监控状态实例,
top3VcState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("top3VcState ", Integer.class));
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
// 每来一个水位值, 就先存入到列表状态中
// 覆盖操作: 用心的集合中的值, 去覆盖状态中的值
// top3VcState.update();
// 添加操作: 把list中的元素添加到状态中
// top3VcState.addAll(list);
// 添加操作: 向状态中添加一个值
top3VcState.add(value.getVc());
// 状态中所有的值取出, 排序, 取top3, 转成list可以排序
Iterable<Integer> integers = top3VcState.get();
List<Integer> list = AtguiguUtil.toList(integers);
// list.sort(new Comparator<Integer>() {
// @Override
// public int compare(Integer o1, Integer o2) {
// return o2.compareTo(o1);
// }
// });
//降序
list.sort(Comparator.reverseOrder());
// 如果发现长度等于4, 就干掉一个, 保持长度是3
if (list.size() == 4) {
list.remove(list.size() - 1);
}
// 用list中值, 去覆盖状态中的值
top3VcState.update(list);
out.collect(ctx.getCurrentKey() + " 的top3: " + list);
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.7.6 键控状态 ReducingState
计算每个传感器的水位和
一般不会用到状态来求,直接算子或者窗口函数就可以
// 在flatMap算子中使用算子状态
env
.socketTextStream("hadoop162", 9999)
.map(line -> {
String[] data = line.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
})
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private ReducingState<Integer> vcSumState;
@Override
public void open(Configuration parameters) throws Exception {
vcSumState = getRuntimeContext()
.getReducingState(new ReducingStateDescriptor<Integer>(
"vcSumState",
Integer::sum,
Integer.class
)
);
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
vcSumState.add(value.getVc());
out.collect(ctx.getCurrentKey() + " 的水位和: " + vcSumState.get());
}
})
.print();
7.7.7 键控状态 Aggregate State
env
.socketTextStream("hadoop162", 9999)
.map(line -> {
String[] data = line.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
})
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private AggregatingState<WaterSensor, Double> avgVcState;
@Override
public void open(Configuration parameters) throws Exception {
avgVcState = getRuntimeContext()
.getAggregatingState(
new AggregatingStateDescriptor<WaterSensor, Avg, Double>(
"avgVcState",
new AggregateFunction<WaterSensor, Avg, Double>() {
@Override
public Avg createAccumulator() {
return new Avg();
}
@Override
public Avg add(WaterSensor value, Avg acc) {
acc.sum += value.getVc();
acc.count ++;
return acc;
}
@Override
public Double getResult(Avg acc) {
return acc.sum * 1.0 / acc.count;
}
@Override
public Avg merge(Avg a, Avg b) {
return null;
}
},
Avg.class
)
);
}
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
avgVcState.add(value);
out.collect(ctx.getCurrentKey() + " 的平均水位: " +avgVcState.get());
}
})
.print();
7.7.8 键控状态 Map
这是一对key value监控状态
去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意
env
.socketTextStream("hadoop162", 9999)
.map(line -> {
String[] data = line.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
})
.keyBy(WaterSensor::getId)
//去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private MapState<Integer, Object> vcMapState;
@Override
public void open(Configuration parameters) throws Exception {
vcMapState = getRuntimeContext().getMapState(
new MapStateDescriptor<Integer, Object>("vcMapState", Integer.class, Object.class
);
}
//写了三个发现,这个地方就是写逻辑的,前面open是获取状态的
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
// 每来一条数据, 把vc存入到Map状态的key中
vcMapState.put(value.getVc(),new Object());
// 输出不重复的水位值
Iterable<Integer> keys = vcMapState.keys();
List<Integer> integers = AtguiguUtil.toList(keys);
out.collect(ctx.getCurrentKey() + " 的不重复水位是: " +integers);
}
})
.print();
7.7.9 状态后端
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
1) 本地(taskmanager)的状态管理
2) 将检查点(checkpoint)状态写入远程存储
Flink提供了3种状态后端(1.13之前):
-- 1: MemoryStateBackend
旧存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
新存储方式: HashMapStateBackend JobManager
-- 2: FsStateBackend
旧存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
新存储方式: HashMapStateBackend 存储在文件系统(hdfs)中
-- 3: RocksDBStateBackend
旧存储方式: 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) Checkpoint在外部文件系统(hdfs)中.
新存储方式:内存+磁盘 Checkpoint在外部文件系统(hdfs)中.
代码设置一下状态的管理
// 设置这个属性值为atguiug, 这样才可以向hdfs写入数据
// ctrl + shift + u 大小写切换
System.setProperty("HADOOP_USER_NAME", "atguigu");
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
// 每隔3秒中做一次checkpoint
// 这样理解: 每隔3秒,把程序的所有的状态做一次持久化, 程序出了问题重启的时候, 可以从持久化状态中恢复数据
env.enableCheckpointing(3000);//要开启这个才会存储,每隔3秒就刷新一次状态
// 1. 内存
// 1.1 old 两个在一起设置
// env.setStateBackend(new MemoryStateBackend()); // 默认
// 1.2 new
//env.setStateBackend(new HashMapStateBackend()); // 本地
//env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); // checkpoint 配置
// 2. fs
// 2.1 old
//env.setStateBackend(new FsStateBackend("hdfs://hadoop162:8020/ck1"));
// 2.2 new
//env.setStateBackend(new HashMapStateBackend());
//env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/ck2");
// 3. rocksdb
// 3.1 old
// env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop162:8020/ck3"));
// 3.2 new
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/ck4");
7.8 Flink的容错机制
7.8.1 状态的一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。
一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
1 一致性级别
at-most-once(最多一次):
at-least-once(至少一次):
exactly-once(严格一次):
Flink的一个重大价值在于,它既保证了exactly-once,又具有低延迟和高吞吐的处理能力。
2 端到端的状态一致性
-- 1: source端
需要外部源可重设数据的读取位置.目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset
-- 2: flink内部
依赖checkpoint机制
-- 3: sink端
需要保证从故障恢复时,数据不会重复写入外部系统. 有2种实现形式:
a)幂等(Idempotent)写入 数据库的操作就是
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
b)事务性(Transactional)写入 mysql就是事务型
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
7.8.2 Checkpoint原理
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
-- 1: Flink的检查点算法
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照的实现算法:
a) 简单算法 : 暂停应用, 然后开始做检查点, 再重新恢复应用
b) Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: "异步 barrier 快照(asynchronous barrier snapshotting)"
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
-- 2: 理解Barrier
1) 流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
2) 每个barrier会把数据流分成两部分: 一部分数据进入当前的快照 , 另一部分数据进入下一个快照 . 每个barrier携带着快照的id. barrier 不会暂停数据的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.
-- 3: Flink的检查点制作过程
严格一次语义: barrier对齐
至少一次语义: barrier不对齐
7.8.3 Kafka_Flink_Kafka 实现端到端严格一次
具体的两阶段提交步骤总结如下:
1) 某个checkpoint的第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”(第一阶段提交)
2) jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子状态后端会进行相应进行checkpoint,并通知 jobmanagerr
3) sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
4) jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
5) sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据(第二阶段提交)
6) 外部kafka关闭事务,提交的数据可以正常消费了(最后要设置外部的关闭的内容)
注意: 1: sink事务开启了,但是没有关闭事务,kafka没有回滚能力 当宕机时候 : 之后一直重复消费,消费了不改消费的数据 默认的设置是 read -uncommitted的数据
read -committed 手动提交job设置才可以正常对1消费
和正常的消费做对比
2: 从checkpoint恢复数据
3: 手动保存 savepoint
85fe367是进入ui界面的任务id,复制过来的
4: 从savepoint恢复数据
与checkpoint恢复数据一样
savepoint保存是随便保存哪一个地方放在
bin/flink run -d -s hdfs://hadoop162:8020/sp/savepoint-0fe7ea-b03c0faf9882 -c com.atguigu.test06.Kafka_Flink_Kafka FlinkDEMO-1.0-SNAPSHOT.jar
5: savepoint 和 Checkpoint的区别
Savepoint | Checkpoint |
---|---|
Savepoint是由命令触发, 由用户创建和删除 | Checkpoint被保存在用户指定的外部路径中, flink自动触发 |
保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。 | 当作业失败或被取消时,将保留外部存储的检查点。 |
用户必须提供用于还原作业状态的保存点的路径。 | 用户必须提供用于还原作业状态的检查点的路径。 如果是flink的自动重启, 则flink会自动找到最后一个完整的状态 |
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "atguigu");
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(2);
// 0. 设置一下状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/ck100");
// 1. 开启checkpoint: 参数checkpoint的周期(注入barrier的周期)
env.enableCheckpointing(5000);
// 2. 设置checkpoint的一致性语义: 严格一次
env.getCheckpointConfig().setCheckpointingMode(EXACTLY_ONCE);
// 3. 设置checkpoint的超时时间: 一分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 4. 设置同时进行的checkpoint的个数为1,
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 5. 设置两个checkpoint之间的最小间隔: 前面结束500ms毫秒之后, 下一个才是
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 6. 当job取消的时候, 是否删除checkpoint数据
env.getCheckpointConfig().setExternalizedCheckpointCleanup( RETAIN_ON_CANCELLATION );
Properties props = new Properties();
props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
props.setProperty("group.id", "Flink10_Kafka_Flink_Kafka91");
SingleOutputStreamOperator<Tuple2<String, Long>> s1 = env
.addSource(new FlinkKafkaConsumer<String>("s1", new SimpleStringSchema(), props))
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
for (String data : value.split("")) {
out.collect(Tuple2.of(data, 1L));
}
}
})
.keyBy(t -> t.f0)
.sum(1);
//不设置15分钟就会报错 生产者的时间不能超过15分钟设置,flink默认的是一个小时,可以改上面大于1个小时
//要使用严格一次,就要满足这个时间
Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
sinkProps.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
// 如果流中的数据不是字符串类型, 则需要自己定义序列化器
s1.addSink(new FlinkKafkaProducer<Tuple2<String, Long>>(
"default",
new KafkaSerializationSchema<Tuple2<String,Long>>(){
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String,
Long> element,
@Nullable Long timestamp) {
return new ProducerRecord<>("s2", (element.f0 + " " +element.f1).getBytes(StandardCharsets.UTF_8));
}
},
sinkProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 开启两阶段提交, 开启事务
));
s1.addSink(new SinkFunction<Tuple2<String, Long>>() {
@Override
public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
if (value.f0.contains("x")){
throw new RuntimeException("包含了x, 抛出一个异常....");
}
}
});
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
第9章 Flink CEP编程
第10章 Flink SQL编程
10.1 SQL核心概念
==SQL是完全写的sql, table API 封装的还不够完全,==
1: 动态表和连续查询
1) 动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。 2) 与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。 3) 需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
1.将流转换为动态表。 2.在动态表上计算一个连续查询,生成一个新的动态表。 3.生成的动态表被转换回流。
2: 需要导入的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
10.2 Table API
10.2.1 流-->表-->查询表-->流
1: 第一种方法过去的
public class k01_Tabale_BaseUse { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ); // 1. 先去创建表的执行环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 2. 通过表的执行环境把流转成一个动态表 Table table = tEnv.fromDataStream(stream); // 表的元数据: 列名, 列的类型 table.printSchema(); // 3. 在table对象(动态表)上执行查询(连续查询), 得到一个结果表: 也是动态表 // select * from t where id='sensor_1' Table result = table.where("id= 'sensor_1' ") .select("*"); // 4. 把表转成流, 打印 DataStream<WaterSensor> stream2 = tEnv.toAppendStream(result, WaterSensor.class); stream2.print(); env.execute(); }
2: 新的连续查询方法
3: Row 什么类型都可以
Table result = table.where($("id").isEqual("sensor_1")) .select($("id"), $("vc")); // 4. 把表转成流, 打印 Row可以看成是把表转成流的是一种通用类型,就是什么类型都可以 DataStream<Row> rowDataStream = tEnv.toAppendStream(result, Row.class); //rowDataStream.print(); rowDataStream.map(new MapFunction<Row, String>() { @Override public String map(Row value) throws Exception { // 如何从row中取数据 return value.getFieldAs("id" ) + "," + value.getFieldAs("vc"); } }) .print();
10.2.2 追加 更新流
1) Append-only 流 追加流 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
2) Retract 流 撤回流
3) Upsert 流: 在把数据写到kafka的时候用, 消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高
==请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。==
public class k01_Tabale_BaseUse_3 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env); Table table = tableEnvironment.fromDataStream(stream); // select id, sum(vc) as vc_sum from t group by id; Table result = table.groupBy($("id")) .aggregate($("vc").sum().as("vc_sum")) .select($("id"), $("vc_sum")); // 1:之前只有追加数据 toAppendStream // 2:因为result这张表有聚合, 所以其中就有更新数据, 要用撤回流 toRetractStream // 3: upsert流,在把数据写到kafka的时候用 DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(result, Row.class); //tuple2DataStream.print(); //返回你想要的数据 tuple2DataStream.filter(t -> t.f0) // 过滤掉撤回数据,这个算子只要返回为true的数据 .map(t -> t.f1) // 只要row .print(); env.execute(); }
10.2.3 读取文件 ->写入文件
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 直接从文件中读取数据 Schema schema = new Schema().field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); tEnv .connect(new FileSystem().path("input/sensor.txt")) // 文件的格式: csv, 列的分隔符 , 行的分隔符 \n .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n")) .withSchema(schema) .createTemporaryTable("sensor"); //给表取一个名字 Table table= tEnv.from("sensor"); // 上课为了方便调试, 直接打印表的内容, 不再转成流再打印 //.execute().print(); Table result = table .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); // 创建一个动态表与文件关联 tEnv .connect( new FileSystem().path("input/a.txt") ) .withFormat(new Csv()) // 文件的格式: csv, 列的分隔符 , 行的分隔符 \n .withSchema(schema) // 表的schema .createTemporaryTable("sensor_out"); // 给表起个表名 // 把表的内容写出到文件中 result.executeInsert("sensor_out");
10.2.4 Connector_Kafka
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 设置一个格式 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); // 从kafka读取数据 tEnv .connect( new Kafka() .version("universal") //通用的版本 .property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092") .property("group.id","atguigu") //消费者组 .topic("s1") .startFromLatest())//重最新的消费 .withFormat(new Csv()) .withSchema(schema) .createTemporaryTable("sensor"); Table table = tEnv.from("sensor"); Table result = table.where($("id").isEqual("sensor")) .select($("id"), $("ts"), $("vc")); //把读取到的数据放到kafka中 tEnv .connect( new Kafka() .version("universal") //通用的版本 .property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092") .topic("s2") .sinkPartitionerFixed()) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor_out"); result.executeInsert("sensor_out");
10.3 Flink SQL
10.3.1 注册和未注册
==般使用查询注册的表==
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream= env.fromElements (new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream); // flink 提供了两个key执行SQL的方法: // 1. executeSQL // tEnv.executeSql("只能执行DDL语句(创建删除表和库)和增删改(DML)表和库的数据") // 2. sqlQuery // tEnv.sqlQuery("这个只能执行查询语句") // 对表执行查询的时候, 有两种查询方式: // 1. 查询未注册的表: 把Table直接拼接在需要表名的地方 // Table table1 = tEnv.sqlQuery("select * from " + table); // table1.execute().print(); // 2. 查询已注册的表(建议使用): 先注册一个临时表, 有了一个表名, 然后进行查询 tEnv.createTemporaryView("sensor",table); tEnv.sqlQuery("select * from sensor where id='sensor_1' ").execute().print();
10.3.2 SQL_File
从读取文件数据 --> 写入到文件数据
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 通过ddl方式建表, 直接和文件关联, 将来从这个表读取数据, 就自动从文件读取 tEnv.executeSql("create table sensor(" + "id String," + "ts bigint," + "vc int" + ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt' , " + " 'format' ='csv' " + ")"); Table table = tEnv.sqlQuery("select * from sensor where id='sensor_1' "); //类似tableApi的读取文件 tEnv.executeSql("create table sensor_out(" + "id String," + "ts bigint," + "vc int" + ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/test' , " + " 'format' ='json' " + ")"); table.executeInsert("sensor_out");
10.3.3 SQL_Kafka
1: 从kafka读取数据,连接他,在把数据写入到kafka
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 通过ddl方式建表, 直接和文件关联, 将来从这个表读取数据, 就自动从文件读取 (官方推荐使用这种) tEnv.executeSql("create table sensor(" + "id string," + "ts bigint," + "vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's1', "+ " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'properties.group.id' = 'atguigu', " + " 'scan.startup.mode' = 'latest-offset', " + " 'format' = 'csv'" + ")"); tEnv.executeSql("create table sensor_out(" + "id string," + "ts bigint," + "vc int" + ")with(" + " 'connector' = 'kafka'," + " 'topic' = 's2', "+ " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'format' = 'json'" + ")"); // 1: 第一种执行的方法 // Table table = tEnv.sqlQuery("select * from sensor"); // table.executeInsert(" sensor_out"); // 2:第二种执行的方法 //tEnv.executeSql("insert into sensor_out select * from sensor "); /* 3:注意前后表字段顺序的问题 你查到的数据是bigint的字段,写入kafka的字段也应该一一对应 也是看顺序, 不看字段名, 只看类型是否对应 */ Table table = tEnv.sqlQuery("select id, vc, ts from sensor"); table.executeInsert("sensor_out");
2: 有key的 kafka解决 更新数据的问题
就比如sum()聚合数据 ,就是来一条数据就会更新结果,你把sum的结果写如kafka就要用key-kafka
/* 普通的kafka不能写入update change 数据, 只能写入 append only 数据 但是, 实时数仓中我们确实有需求要写入变化数据, 怎么办? 使用upsert kafka 为什么能够更新? sensor_1, 1000, 10 插入(正常的kafka消息) -> key: sensor_1 sensor_1, 1000, 30 插入(更新后的数据, 正常的kafka消息)-> key: sensor_1 第二条数据是对第一条数据的更新, kakfa如何知道? 靠什么指定key? 靠主键 注意: 如果数据有更新, 写入到kafka的时候, 必须使用 upset-kafka 如果从 upset-kafka写入的topic中读取的时候, 可以使用upsert-kafka, 也可以使用普通的kafka 一般使用普通的kafka */ Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts int, " + " vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's1', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'properties.group.id' = 'atguigu', " + " 'scan.startup.mode' = 'latest-offset', " + " 'format' = 'csv'" + ")"); tEnv.executeSql("create table sensor_out(" + " id string, " + " vc int ," + " PRIMARY KEY (`id`) NOT ENFORCED " + // 不检测主键的特性: 非空不重复 ")with(" + " 'connector' = 'upsert-kafka', " + " 'topic' = 's3', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'key.format' = 'json' ,"+ " 'value.format' = 'json' "+ ")"); Table table = tEnv.sqlQuery("select id, sum(vc) vc_sum from sensor group by id "); table.executeInsert("sensor_out");
10.4 表中写入时间属性
-- 1: 流和sql中都需要开窗,所以都需要写入时间属性 流中非常重要的操作: 开窗, 对窗口内的元素进行处理 sql中也要开窗, 也是要聚合的, 其实可以把窗口当成一个普通的聚合字段来理解 -- 2:统计不同的传感器的水位和 select id, sum(vc) from sensor group by id 每隔5s统计不同的传感器的水位和,对5秒窗口来groupby select w, wid, sum(vc) from sensor group by id, w -- 3:在sql中只支持基于时间的窗口 事件时间 和处理时间 -- 4:在sql中定义窗口的时候, 需要时间, 其实就是一个时间字段(时间属性) -- 5:如何在flink-sql中添加时间属性
10.4.1 添加处理时间
注意: + 一定要代有一个空格才可以连接
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 1. 把流转成表的时候添加处理时间属性 proctime() // 添加字段: 添加处理时间属性 Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("pt").proctime()); table.printSchema(); table.execute().print(); // 2. 在ddl中添加处理时间属性 proctime() tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + " pt as proctime()" + ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); tEnv.sqlQuery("select * from sensor").execute().print();
10.4.2 添加事件时间
事件时间一定是和水印绑定的
1: 把流转成表的时候添加事件时间属性 rowtime()
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); // 流转成表的 时候添加时间时间, 则流中一定要添加水印 DataStream<WaterSensor> stream = env .fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner((ws, ts) -> ws.getTs()) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 1. 添加一个字段作为事件时间 rowtime会去找水印的getTs字段数据 //Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime()); //// 2. 把现有字段直接变成事件时间 Table table = tEnv.fromDataStream(stream, $("id"), $("ts").rowtime(), $("vc")); table.printSchema();//打印字段信息显示rowtime table.execute().print();
2: 在ddl中添加处理时间属性 rowtime()
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // timestamp(3) 时间必须是这个类型 // bigint -> timestamp(3) tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + // 把一个数字转成一个时间戳类型: 参数1: 数字(毫秒或秒) 参数: 精度, 前面是s, 则填0, 是ms, 则填3 " et as TO_TIMESTAMP_LTZ(ts, 3), " + // 添加一个时间戳字段, 能否直接认为他就是事件时间? 必能, 必须添加水印 减去3最大乱序程度 " watermark for et as et - interval '3' second " + // ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); Table table = tEnv.sqlQuery("select * from sensor"); table.printSchema(); table.execute().print();
==10.5 定义窗口==
10.5.1 group TableAPI
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); // 流转成表的 时候添加时间时间, 则流中一定要添加水印 DataStream<WaterSensor> stream = env .fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner((ws,ts) -> ws.getTs())); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // Table table = tEnv.fromDataStream(stream, $("id"), $("ts").rowtime(), $("vc")); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime()); // 1: 定义一个5s的滚动窗口 GroupWindow win1 = Tumble.over(lit(5).second()).on($("et")).as("w"); // 2: 定义一个长度为5s, 滑动步长为2s的滑动窗口 GroupWindow win2 = Slide.over(lit(5).second()).every(lit(2).seconds()).on($("et")).as("w"); // 3: 定义一个3秒的会话窗口 GroupWindow win3 = Session.withGap(lit(3).second()).on($("et")).as("w"); table .window(win3) .groupBy($("id"),$("w"))// 窗口名字可以作为一个普通的分组字段来使用 .select($("id"),$("w").start(),$("w").end(),$("vc").sum().as("vc_sum")) .execute() .print();
10.5.2 group API
分别定义滚动,滑动会话窗口
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + " et as TO_TIMESTAMP_LTZ(ts,0), " +// 添加一个时间戳字段, 能否直接认为他就是事件时间? 必能, 必须添加水印 " watermark for et as et - interval '3' second " + // 添加水印 ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); // 1: 定义一个5s的滚动窗口 tEnv.sqlQuery("select " + " id, " + " tumble_start(et, interval '5' second) stt," + "tumble_end(et, interval '5' second) edt, " + " sum(vc) vc_sum " + "from sensor " + "group by id ,tumble(et, interval '5' second)") .execute() .print(); // 2: 定义一个长度为5s, 滑动步长为2s的滑动窗口 tEnv.sqlQuery("select " + " id, " + " hop_start(et, interval '2' second, interval '5' second) stt , " + " hop_end(et, interval '2' second, interval '5' second) edt , " + " sum(vc) vc_sum " + "from sensor " + "group by id ,hop(et, interval '2' second, interval '5' second)") .execute() .print(); // 3: 定义一个3秒的会话窗口 tEnv.sqlQuery("select " + " id, " + " session_start(et, interval '3' second) stt, " + " session_end(et, interval '3' second) edt, " + " sum(vc) vc_sum " + "from sensor " + "group by id, session(et, interval '2' second)") .execute() .print();
10.5.3 TVF窗口
1) tvf新的窗口,不支持tableAPI
2) 只可以定义: 滚动窗口 滑动窗口 , 还有一个累计窗口
3) 累计窗口可以解决; 窗口是一个小时, 累计是24个小时的计算 不需要用状态了
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + " et as TO_TIMESTAMP_LTZ(ts, 0), " + " watermark for et as et - interval '3' second " + // 添加水印 ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); // 1: 滚动5秒窗口 // tEnv // .sqlQuery("select" + // " id, window_start, window_end, " + // " sum(vc) " + // " from table(tumble(table sensor, descriptor(et),interval '5' second) )" + // " group by id ,window_start, window_end") // 分组的时候: window_start, window_end 至少添加一个 // .execute() // .print(); // 2: 定义一个长度为5s, 滑动步长为2s的滑动窗口 tEnv .sqlQuery("select" + " id, window_start, window_end, " + " sum(vc) " + // tvf 的滑动窗口: 限制窗口长度必须是滑动步长的整数倍 " from table(hop(table sensor, descriptor(et),interval '2' second, interval '6' second))" + " group by id ,window_start, window_end") // 分组的时候: window_start, window_end 至少添加一个 .execute() .print(); //3:没有会话窗口
10.5.4 TVF累计窗口
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + " et as TO_TIMESTAMP_LTZ(ts, 0), " + " watermark for et as et - interval '3' second " + // 添加水印 ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); //4: 累计窗口,可以解决每一个小时,算当天的数据,就不用之前的状态去求了 tEnv .sqlQuery("select" + " id, window_start, window_end, " + " sum(vc) " + // 定义了一个累积窗口:1. 窗口的步长(step): 窗口长度增长的单位 ; 2. 窗口的最大长度 " from table( CUMULATE(table sensor, descriptor(et),interval '2' second, interval '8' second))" + " group by id ,window_start, window_end") .execute() .print();
10.5.5 TVF分组集代替union
/* 1: group by window_start, window_end, grouping sets( (a, b, c), (a,b), (a), () ) // roll up 上钻 等价于: group by window_start, window_end,rollup(a,b,c) 2: 多维立方体: 维度的各种组合全部统计一遍 group by window_start, window_end, grouping sets( (a, b, c), (a,b), (a, c), (b, c) , (a), (b), (c), () ) 等价于: group by window_start, window_end,cube(a,b,c) */ Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int, " + " et as TO_TIMESTAMP_LTZ(ts, 0), " +// 添加一个时间戳字段, 能否直接认为他就是事件时间? 必能, 必须添加水印 " watermark for et as et - interval '3' second " + // 添加水印 ")with(" + " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); //:两个粒度的结果都需要 // select .. from table(...) group by id, window_start, window_end // select .. from table(...) group window_start, window_end tEnv .sqlQuery("select " + " id, window_start, window_end, " + " sum(vc) vc_sum " + "from table( tumble( table sensor, descriptor(et), interval '5' second ))" + "group by id, window_start, window_end " + "union " + "select " + " '', window_start, window_end, " + " sum(vc) vc_sum " + "from table( tumble( table sensor, descriptor(et), interval '5' second ))" + "group by window_start, window_end ") .execute() .print(); // 5:分组集用来替换前面的写法 // select .. from table(...) group window_start, window_end tEnv .sqlQuery("select " + " id, window_start, window_end, " + " sum(vc) vc_sum " + "from table( tumble( table sensor, descriptor(et), interval '5' second ))" + //"group by window_start, window_end , grouping sets ( (id),() ) ") //"group by window_start, window_end, rollup(id)" ) "group by window_start, window_end, cube(id)" ) .execute() .print();
10.5.6 Over窗口 TableAPI
-- 1: group window: 分组 tvf: 替代 group window 支持分组集 grouping sets -- 2: over 窗口 非常大的用处: topN select *, sum(vc) over( partition by id order by et rows between unbounded preceding and current row ) from sensor -- 3: over窗口的正交性: 默认就是按照时间来的 是按照行ROW , 还是按照时间来划分窗口 RANG
代码实现
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStream<WaterSensor> stream = env .fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3002L, 30), new WaterSensor("sensor_1", 3002L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 8000L, 80) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner((ws, ts) -> ws.getTs()) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime()); // 在table api中使用over窗口 /* select *, sum(vc) over( partition by id order by et between rows unbounded preceding and current row ) from sensor */ //1 :基于row的写法,一行看到谁来就计算谁 第一行到当前行 OverWindow win1 = Over.partitionBy($("id")).orderBy($("et")).preceding(UNBOUNDED_ROW).following(CURRENT_ROW).as("w"); // 计算上一行和当前行的水位和 OverWindow win2 = Over.partitionBy($("id")).orderBy($("et")).preceding(rowInterval(1L)).as("w"); // 2: 时间的正交性 就是和时间的到来有关系 同一个时间到来会分到一个窗口 OverWindow win3 =Over.partitionBy($("id")).orderBy($("et")).preceding(UNBOUNDED_RANGE).as("w"); OverWindow win4 =Over.partitionBy($("id")).orderBy($("et")).preceding(lit(2).second()).as("w"); table .window(win4) .select($("id"),$("ts"),$("vc"),$("vc").sum().over($("w")).as("vc_sum")) .execute() .print();
10.5.7 Over窗口 API
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStream<WaterSensor> stream = env .fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3001L, 30), new WaterSensor("sensor_1", 3001L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 8000L, 80) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner((ws, ts) -> ws.getTs()) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime()); tEnv.createTemporaryView("sensor", table); // 在sql中使用 tEnv .sqlQuery("select " + " *, " + // " sum(vc) over(partition by id order by et rows between unbounded preceding and current row) as vc_sum " + // " sum(vc) over(partition by id order by et rows between 1 preceding and current row) as vc_sum " + // " sum(vc) over(partition by id order by et range between unbounded preceding and current row) as vc_sum " + // " sum(vc) over(partition by id order by et range between interval '2' second preceding and current row) as vc_sum " + " sum(vc) over(partition by id order by et) as vc_sum " + // 默认是时间这种正交性 partition by id order by et range between unbounded preceding and current row "from sensor") .execute() .print(); //1. 同一个sql中的over窗口必须一样,简化版的对比 tEnv .sqlQuery("select " + " *, " + " sum(vc) over w as vc_sum, " + " max(vc) over w as max_sum " + "from sensor " + "window w as(partition by id order by et rows between unbounded preceding and current row)") .execute() .print(); tEnv .sqlQuery("select " + " *, " + " sum(vc) over(partition by id order by et rows between unbounded preceding and current row) as vc_sum, " + " max(vc) over(partition by id order by et rows between unbounded preceding and current row) as max_sum " + "from sensor " ) .execute() .print();
10.6 Catalog
1) Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。 2) 数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。 3) 前面用到Connector其实就是在使用Catalog
"Catalog类型" -- 1:GenericInMemoryCatalog GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。 -- 2:JdbcCatalog JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。 -- 3: HiveCatalog HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有 Hive 元数据的详细信息。 -- 4:访问多个数据库时候? 就是利用catlog 当表的名字一样的时候,访问hive和mysql,就会给增加前缀来区分hive的数据和mysql的数据
1: 测试 HiveCatalog
1) 导入需要的依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency>
2) 在hadoop162启动hive元数据
nohup hive --service metastore >/dev/null 2>&1 &
3) 在hive中创建一个person表来测试
4) 连接 Hive
System.setProperty("HADOOP_USER_NAME", "atguigu"); Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 测试用的默认的内存似的catalog tEnv.executeSql("create table person( "+ "id String, " + "ts bigint, " + "vc int " + ")with("+ " 'connector' = 'filesystem', " + " 'path' = 'input/sensor.txt', " + " 'format' = 'csv' " + ")"); // 1. 先创建一个 HiveCatalog 假如多个数据库的时候,是为了访问全类名字 // 参数1: catalog的名字 参数2: 连接的时候默认的数据 参数3:hive的配置文件所在的目录 HiveCatalog hive = new HiveCatalog("hive", "gmall", "input/"); // 2. 注册 HiveCatalog,让Flink知道 // tEnv.useCatalog 这是设置库的参数,设置了就不用加前缀 tEnv.registerCatalog("hive",hive); tEnv.useCatalog("hive"); // 设置默认catalog,就不用定义hive.gamll了 tEnv.useDatabase("gmall"); // 3. 通过HiveCatalog来访问hive中表 // 表的绝对路径: catalog.database.table //tEnv.sqlQuery("select * from hive.gmall.person").execute().print(); tEnv.sqlQuery("select * from person").execute().print(); //tEnv.sqlQuery("select * from hive.gmall.person").execute().print(); // 4:访问内存的 tEnv.useCatalog("default_catalog"); tEnv.sqlQuery("select * from default_catalog.default_database.person").execute().print();
10.7 函数(function)
10.7.1 内置函数
Flink Table API和SQL给用户提供了大量的函数用于数据转换.
1) Scalar Functions(标量函数) 1进1出的函数
2) Comparison Functions(比较函数)
3) Logical Functions(逻辑函数)
4) Aggregate Functions(聚合函数)
10.7.2 自定义函数
==自定义函数分类:== 1)标量函数(Scalar functions) 将标量值转换成一个新标量值; 2)表值函数(Table functions) 1出多行的数据 就是炸裂 3)聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值; 4)表值聚合函数(Table aggregate) 表值和聚合的一起运用 异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数
1)标量函数(Scalar functions)
自定一个小写变大写的函数: 在table API使用 在SQL中使用
Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_2", 2000L, 20), new WaterSensor("sensor_1", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_2", 6000L, 50), new WaterSensor("sensor_1", 7000L, 60), new WaterSensor("sensor_1", 8000L, 80) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc")); tEnv.createTemporaryView("sensor", table); // 使用自定义函数有两种用法: // 1. 在table_api中使用 // 1.1 内联用法 /*table // call: 参数1: 自定义函数的类型 参数2: 传给自定义函数的数据 .select($("id"), call(MyUpper.class, $("id")).as("upper_case")) .execute() .print();*/ // 1.2 先注册后使用 /*tEnv.createTemporaryFunction("my_upper", MyUpper.class); table // call: 参数1: 自定义函数的类型 参数2: 传给自定义函数的数据 .select($("id"), call("my_upper", $("id")).as("upper_case")) .execute() .print();*/ // 2. 在sql中使用, 必须先注册函数 tEnv.createTemporaryFunction("my_upper", MyUpper.class); tEnv.sqlQuery("select " + " id, " + " my_upper(id) id_upper " + " from sensor") .execute() .print(); } // 标量函数: 一(一行)进一出 public static class MyUpper extends ScalarFunction { // 内部必须提供一个方法: 对传入的字符串进行处理 // 这个是一个约定好的方法: // 1. 返回值必须有类型也是根据实际情况来定, // 2. 方法名必须是eval // 3. 参数根据实际情况来定 public String eval(String s) { // 当用到一个对象的时候, 最好做下非空处理 if (s == null) { return null; } return s.toUpperCase(); } }
2)表值函数(Table functions)
1: 传入一个字符串, 对字符串按照空格切割, 返回切割后的每个单词和这个单词的长度 "hello hello atguigu" hello 5 hello 5 atguigu 7 有点炸裂的意思 "hello atguigu" hello 5 atguigu 7 2: 如何让原表和制出来的表连在一起? join 原表就是流的数据转成的表 和侧写的表join 在取字段 这个join 是 内连接 左边的数据和右边的数据都有才会连接 leftOuterJoinLateral 是做左连接 左表有的数据也会连接
1) 代码实现内连接
public class Flink02_Function_Table { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("hello hello atguigu", 1000L, 10), new WaterSensor("hello hello", 2000L, 20), new WaterSensor("atguigu world", 3000L, 30), new WaterSensor("abc a", 4000L, 40), new WaterSensor("aa cc dd ee ff gg", 8000L, 80) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc")); tEnv.createTemporaryView("sensor", table); // 使用自定义函数有两种用法: // 1. 在table_api中使用 // 1.1 内联用法 // 1.2 先注册后使用 tEnv.createTemporaryFunction("my_split", MySplit.class); /*table // 内连接 .joinLateral(call("my_split", $("id"))) .leftOuterJoinLateral(call("my_split", $("id"))) .select($("id"), $("word"), $("len")) .execute() .print();*/ // 2. 在sql中使用, 必须先注册函数 //内连接1: select .. from a join b on a.id=b.id //内连接2: select .. from a, b where a.id=b.id /*tEnv.sqlQuery("select " + " id, word, len " + "from sensor " + "join lateral table(my_split(id)) on true") .execute() .print();*/ /*tEnv.sqlQuery("select " + " id, word, len " + "from sensor " + ", lateral table(my_split(id))") .execute() .print();*/ tEnv.sqlQuery("select " + " id, a, b " + "from sensor " + ", lateral table(my_split(id)) as t(a, b)") // t是临时表的表名 a和b是按照顺序来的列名 .execute() .print(); /*tEnv.sqlQuery("select " + " id, word, len " + "from sensor " + "left join lateral table(my_split(id)) on true") .execute() .print();*/ } @FunctionHint(output = @DataTypeHint("row<word string, len int>")) public static class MySplit extends TableFunction<Row> { // 约定方法 // 返回值必须是void // 方法名必须是 eval // 参数根据实际情况来定 public void eval(String s){ if (s != null) { if (s.contains("aa")) { return; } for (String word : s.split(" ")) { // 这个方法执行几次, 结果就是几行 // 怎么告诉flink 每行有两列 collect(Row.of(word, word.length())); } } } } }
2) 实现 left join连接
tEnv.sqlQuery("select " + " id, word, len " + "from sensor " + "left join lateral table(my_split(id)) on true") .execute() .print(); // 如果这里的泛型使用的是POJO形式, 则不需要添加注解来说明每列的类型 public static class MySplit extends TableFunction<WordLen> { // 约定方法 // 返回值必须是void // 方法名必须是 eval // 参数根据实际情况来定 public void eval(String s){ if (s != null) { if (s.contains("aa")) { return; } for (String word : s.split(" ")) { // 这个方法执行几次, 结果就是几行 // 怎么告诉flink 每行有两列 collect(new WordLen(word, word.length())); } } } } @Data @NoArgsConstructor @AllArgsConstructor public static class WordLen{ private String word; private Integer len; }
3)聚合函数 Aggregate
求一个水位的平均值
public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("s1", 1000L, 10), new WaterSensor("s1", 2000L, 20), new WaterSensor("s1", 3000L, 30), new WaterSensor("s1", 4000L, 40), new WaterSensor("s2", 8000L, 80) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc")); tEnv.createTemporaryView("sensor", table); // 使用自定义函数有两种用法: // 1. 在table_api中使用 // 1.1 内联用法 // 1.2 先注册后使用 tEnv.createTemporaryFunction("my_avg", MyAvg.class); /*table .groupBy($("id")) .select($("id"), call("my_avg", $("vc"))) .execute() .print();*/ // 2. 在sql中使用, 必须先注册函数 tEnv.sqlQuery("select " + "id, my_avg(vc) `avg` " + "from sensor " + "group by id ") .execute() .print(); } public static class MyAvg extends AggregateFunction<Double, Avg> { // 初始化一个累加器 @Override public Avg createAccumulator() { return new Avg(); } // 就是返回最终的聚合结果 @Override public Double getValue(Avg acc) { return acc.sum * 1.0 / acc.count; } // 累加过程的方法也是约定 /* 返回值必须是void 方法名必须是: accumulate 参数1: 必须是累加器, 把传入的参数用累加器进行累加 参数2: 根据实际情况来定: 就传入的数据 call "vc"的数据传进去了 */ public void accumulate(Avg acc, Integer vc){ acc.sum += vc; acc.count++; } } public static class Avg { public Integer sum = 0; public Long count = 0L; }
4)表值聚合函数
每来一个水位, 输出每个传感器的第一和第二水位 new WaterSensor("s1", 1000L, 10), 第一名 10 new WaterSensor("s1", 2000L, 20), 第一名 20 第二名 10 new WaterSensor("s1", 3000L, 30), 第一名 30 第二名 20 new WaterSensor("s1", 4000L, 5), 第一名 30 第二名 20 ..... 每来一条数据, 得到两行两列: 这是什么函数的功能? TableFunction 收集2次 每来一条数据, 计算最大和次大: 这是什么函数的功能? AggregateFunction
代码实现
public class k04_Function_TableAgg { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("s1", 1000L, 10), new WaterSensor("s1", 2000L, 20), new WaterSensor("s1", 3000L, 30), new WaterSensor("s1", 4000L, 40), new WaterSensor("s2", 8000L, 80) ); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc")); tEnv.createTemporaryView("sensor", table); // 1: 在table_api中使用 先注册后使用 tEnv.createTemporaryFunction("top2",Top2.class); table .groupBy($("id")) .flatAggregate(call("top2",$("vc"))) .select($("id"),$("rank"),$("rk")) .execute() .print(); //2: 在sql中使用还不支持,目前还不支持 } public static class Top2 extends TableAggregateFunction<result,FirstSecond2>{ // 创建累加器 @Override public FirstSecond2 createAccumulator() { return new FirstSecond2(); } // 累加过程 public void accumulate(FirstSecond2 fs , Integer vc){ //计算出来第一名和第二名 if (vc > fs.first) { fs.second2 = fs.first; //原来的第一名变成第二名 fs.first = vc ; //新来的变成第一名 }else if ( vc > fs.second2){ fs.second2 = vc; } } // 输出多行多列 // 1. 返回值必须是 void // 2. 方法名必须是 emitValue // 参数1: 必须是累加器, 从累加器中获取最终的结果 // 参数2: 必须是 Collector<T> 泛型必须是结果类型 // 聚合的时候, 需要计算的是第一名和第二名的水位的值, 这个累加器只需要记住两个水位 public void emitValue(FirstSecond2 fs , Collector<result> out){ // 每来一条数据, 调用连词collect方法,意味着输出两行 out.collect(new result("第一名",fs.first)); if (fs.second2 > Integer.MIN_VALUE){ // 当second有值的才向外输出. 如果等于MIN_VALUE表示没有赋值过 out.collect(new result("第二名",fs.second2)); } } } public static class FirstSecond2{ public Integer first = Integer.MIN_VALUE; public Integer second2 = Integer.MIN_VALUE; } @AllArgsConstructor public static class result{ public String rank; public Integer rk; } }
==10.8 实战求 topN==
统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中
如果需要求1天累计的数据化 用TVF累计窗口
public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 1. 创建动态表与数据源(文件)关联, 定义时间属性, 后期用到了窗口 tEnv.executeSql(" create table ub( " + " user_id bigint, " + " item_id bigint, " + " category_id int, " + " behavior string, " + " ts bigint," + " et as to_timestamp_ltz(ts,0), " + " watermark for et as et - interval '3' second " + " )with( " + " 'connector' = 'filesystem', " + " 'path' = 'input/UserBehavior.csv', " + " 'format' = 'csv' " + ")"); // 2. 过滤出点击数据(pv), 统计每个商品在每个窗口内的点击量 tvf聚合 (流中的第一次keyBy) Table t1 = tEnv.sqlQuery("select " + " window_start, window_end, item_id, " + " count(*) ct " + " from table ( tumble(table ub, descriptor(et), interval '1' hour) ) " + " where behavior = 'pv' " + " group by window_start, window_end, item_id"); tEnv.createTemporaryView("t1", t1); // 3. 使用over窗口,按照窗口(窗口结束时间)分组 按照点击量进行降序排序, 取名次 (流中的第二次keyBy) // rank/dense_rank/row_number(仅支持) Table t2 = tEnv.sqlQuery("select " + " window_end, item_id, ct," + " row_number()over( partition by window_end order by ct desc) rk " + " from t1 "); tEnv.createTemporaryView("t2",t2); // 4. 过滤出 top3 (where rn <= 3) Table t3 = tEnv.sqlQuery("select " + " window_end, item_id, ct," + " rk " + " from t2 " + " where rk <=3 "); // 5. 写出到mysql中 // 5.1 建一个动态表与mysql关联, 用jdbc连接器 tEnv.executeSql("CREATE TABLE hot_item ( " + " w_end timestamp, " + " item_id bigint, " + " item_count bigint, " + " rk bigint, " + " PRIMARY KEY (w_end, rk) NOT ENFORCED " + ") WITH ( " + " 'connector' = 'jdbc', " + " 'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " + " 'table-name' = 'hot_item', " + " 'username' = 'root', " + " 'password' = 'aaaaaa' " + ")"); t3.executeInsert("hot_item");
第11章 一些补充知识和总结
11.1 双流join
11.1.1 Window Join
窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1: 所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2: join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳
3: 只有窗口关闭的时候才会出现数据
4: return first + "<>" + second 两个流的数据到时候可以取的
5: 滚动 会话 滑动 都有
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> s1 = env .socketTextStream("hadoop162", 9999) .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks (WatermarkStrategy.<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } })); SingleOutputStreamOperator<WaterSensor> s2 = env .socketTextStream("hadoop162", 8888) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ); s1 .join(s2) // 相当于key: 两个进行join的时候, 前提是key必须相等.都会多面join的 .where(WaterSensor::getId) // 第一个流的keyBy .equalTo(WaterSensor::getId) // 第二个流的keyBy .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new JoinFunction<WaterSensor, WaterSensor, String>() { @Override public String join(WaterSensor first, WaterSensor second) throws Exception { return first + "<>" + second; } }).print(); env.execute(); }
11.1.2 Interval Join
1) 如下图: 橙色的流去join绿色的流.
范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
2) Interval Join只支持 event-time
3) 必须是keyBy之后的流才可以interval join
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); env.setParallelism(1); KeyedStream<WaterSensor, String> s1 = env .socketTextStream("hadoop162", 8888) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ) .keyBy(WaterSensor::getId); KeyedStream<WaterSensor, String> s2 = env .socketTextStream("hadoop162", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ) .keyBy(WaterSensor::getId); s1 .intervalJoin(s2) .between(Time.seconds(-5),Time.seconds(5)) .process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() { @Override public void processElement(WaterSensor left, WaterSensor right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "<>" + right); } }) .print(); env.execute();
11.2 布隆过滤器 去重
"去重的方法" -- 1: 借助redis的Set 1.需要频繁连接Redis 2.如果数据量过大, 对redis的内存也是一种压力 -- 2: 使用Flink的MapState 1.如果数据量过大, 状态后端最好选择 RocksDBStateBackend -- 3: 使用布隆过滤器 布隆过滤器可以大大减少存储的数据的数据量 -- 4:为什么需要布隆过滤器 1布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
1) 布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
2) BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
3) 当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
4) 当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
-- 优点 1.不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据; 2.时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是 3.哈希函数之间相互独立,可以在硬件指令层面并行计算。 -- 缺点 1.存在假阳性的概率,不适用于任何要求100%准确率的情境; 只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:
-
比特数组m长度越大, p越小, 表示假阳性率越低
-
已插入的元素个数n越大, p越大, 表示假阳性率越大
代码实现布隆 : uv的去重 去重用户重复的访问需求
布隆过滤器是存在了数据, 在存的时候就不会存了,会确认已经存过了
public class k03_High_Project_UV { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.BATCH); env .readTextFile("input/UserBehavior.csv") .map(new MapFunction<String,UserBehavior>() { @Override public UserBehavior map(String value) throws Exception { String[] data = value.split(","); return new UserBehavior( Long.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]), data[3], Long.parseLong(data[4]) * 1000); // 把s变成ms } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((ub, ts) -> ub.getTimestamp()) ) .filter(ub -> "pv".equals(ub.getBehavior())) // 如果用的windowAll, 你可以理解成只有一个key, 所以窗口处理函数中是可以使用键控状态的 .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) .process(new ProcessAllWindowFunction<UserBehavior, String, TimeWindow>() { @Override public void process(Context context, Iterable<UserBehavior> elements, Collector<String> out) throws Exception { // 参数1: 确定布隆过滤器中存储的数据类型: int long string // 参数2: 数据规模. 内部会根据这个参数和错误率来决定维数组的长度和hash算法的个数 // 参数3: 希望的最大错误率 BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 1000 * 100, 0.001); long count = 0; for (UserBehavior els : elements) { if (longBloomFilter.put(els.getUserId())) { //返回值是true表示这次存储成功 count++; } } out.collect(AtguiguUtil.toDatTime(context.window().getStart()) + " " + count); } }) .print(); env.execute(); }
11.3 总结
-- 1: 使用sum聚合算子 ,一定要keyby才可以 -- 2: process 里面的processElement方法就是写逻辑的 需求的逻辑怎么写就是在这里 --3: keyBy为了保证数据是在一起的,这样就算相同的key不在同一个并行度也是在一起的 --4: process new KeyedProcessFunction onTimer 是重写定时器的方法 --5: 并行度和流多少无关,一个并行度可能会有三条流的数据 ---并行度 三条流 ---并行度 三条流 --6: 键控状态 1) open方法 获取监控状态实例 获取连接池 2) list value map open是来获取状态的 process实现的关系逻辑判断来 3) 其他两个相反 --7: 状态的方法 value 获取当前的状态 get 获取当前的状态 -- 8: RichFuctiond 当然,RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息。 下面我们来看看RichFuction中传递参数的例子,以下代码是测试RichFilterFuction的例子,基于DataSet而非DataStream。 -- 9: 流和SQL 读写kafka的区别 流: 需要new 一个kafka 需要有topic的自定义 SQL: 不需要自定义参数topic 而是创建一个表直接写topic 可以看官网标签:WaterSensor,计算,conf,flink,引擎,env,new,public,String From: https://www.cnblogs.com/somospace/p/17435009.html