文章目录
- Flink 系列文章
- 一、maven依赖
- 二、广播变量Broadcast Variables示例
- 1、介绍
- 2、广播变量示例
- 3、验证
- 三、Broadcast State 与 Broadcast Variable 区别
本文简单的介绍了flink中关于广播变量的简单使用示例。
一、maven依赖
为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖
下文中具体需要的依赖将在介绍时添加新增的依赖。
二、广播变量Broadcast Variables示例
1、介绍
可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中。这样可以减少大量的shuffle操作,而不需要多次传递给集群节点。比如在数据join阶段,可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。
广播变量允许您使数据集可用于操作的所有并行实例,以及操作的常规输入。这对于辅助数据集或依赖数据的参数化非常有用。然后,操作员可以将数据集作为集合进行访问。
- 广播:通过withBroadcastSet(DataSet,String)按名称注册广播集,以及
- Access:可通过目标运算符处的getRuntimeContext().getBroadcastVariable(String)进行访问。
图示广播的工作方式
官方示例
// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
// 3. Access the broadcast DataSet as a Collection
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet
在注册和访问广播数据集时,请确保名称(上一示例中的broadcastSetName)匹配。
由于广播变量的内容保存在每个节点的内存中,因此不应变得太大。对于标量值等更简单的事情,您可以简单地将参数作为函数闭包的一部分,或者使用withParameters(…)方法传入配置。
2、广播变量示例
本示例实现上一个缓存示例一样的内容,不过是使用广播实现的。
该示例比较简单,实现逻辑与分布式缓存基本上一样。
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
/**
* @author alanchan
*
*/
public class TestBroadcastVariablesDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Source
// student数据集(学号,姓名)
DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(Arrays.asList(Tuple2.of(1, "alan"), Tuple2.of(2, "alanchan"), Tuple2.of(3, "alanchanchn")));
// score数据集(学号,学科,成绩)
DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
Arrays.asList(Tuple3.of(1, "chinese", 50), Tuple3.of(1, "math", 90), Tuple3.of(1, "english", 90), Tuple3.of(2, "math", 70), Tuple3.of(3, "art", 86)));
// Transformation
// 将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
// 然后使用scoreDS(学号,学科,成绩)和广播数据studentDS(学号,姓名)进行关联,得到这样格式的数据:(学号,姓名,学科,成绩)
MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>> result = scoreDS
.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>>() {
Map<Integer, String> studentsMap = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
// 获取广播数据
List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentsInfo");
for (Tuple2<Integer, String> tuple : studentList) {
studentsMap.put(tuple.f0, tuple.f1);
}
}
@Override
public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
// 使用广播数据
Integer stuId = value.f0;
String stuName = studentsMap.getOrDefault(stuId, "");
return new Tuple4(stuId, stuName, value.f1, value.f2);
}
}).withBroadcastSet(studentDS, "studentsInfo");
// 4.Sink
result.print();
}
}
3、验证
启动程序,运行程序,控制台输出如下:
三、Broadcast State 与 Broadcast Variable 区别
关于Broadcast State的介绍,请参考文章:53、Flink 的Broadcast State 模式介绍及示例
Broadcast State 和 Broadcast Variable 都是 Flink 中用于广播数据的机制,但它们之间有一些区别:
- Broadcast State 是 KeyedStateBackend 的一个实现,它允许将状态数据广播到所有并行任务中。每个并行任务都可以访问相同的状态数据,从而实现状态的共享。Broadcast State 主要用于处理键控的状态,即状态与某个键相关联。
- Broadcast Variable 是一种简单的广播机制,它可以将任意类型的数据广播到所有并行任务中。每个并行任务都可以访问相同的广播变量值。Broadcast Variable 主要用于处理非键控的数据,即不需要与特定键关联的数据。
总结一下,Broadcast State 和 Broadcast Variable 的主要区别在于:
- Broadcast State 用于广播键控的状态数据,而 Broadcast Variable 用于广播非键控的数据。
- Broadcast State 需要与 KeyedStream 一起使用,而 Broadcast Variable 可以与任何类型的 DataStream 一起使用。
以上,本文简单的介绍了flink中关于广播变量的简单使用示例。