首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable

时间:2023-12-28 13:36:59浏览次数:26  
标签:变量 示例 flink State Broadcast 广播 import



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、广播变量Broadcast Variables示例
  • 1、介绍
  • 2、广播变量示例
  • 3、验证
  • 三、Broadcast State 与 Broadcast Variable 区别



本文简单的介绍了flink中关于广播变量的简单使用示例。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、广播变量Broadcast Variables示例

1、介绍

可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中。这样可以减少大量的shuffle操作,而不需要多次传递给集群节点。比如在数据join阶段,可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。

广播变量允许您使数据集可用于操作的所有并行实例,以及操作的常规输入。这对于辅助数据集或依赖数据的参数化非常有用。然后,操作员可以将数据集作为集合进行访问。

  • 广播:通过withBroadcastSet(DataSet,String)按名称注册广播集,以及
  • Access:可通过目标运算符处的getRuntimeContext().getBroadcastVariable(String)进行访问。

图示广播的工作方式

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable_flink hive


官方示例

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

在注册和访问广播数据集时,请确保名称(上一示例中的broadcastSetName)匹配。

由于广播变量的内容保存在每个节点的内存中,因此不应变得太大。对于标量值等更简单的事情,您可以简单地将参数作为函数闭包的一部分,或者使用withParameters(…)方法传入配置。

2、广播变量示例

本示例实现上一个缓存示例一样的内容,不过是使用广播实现的。
该示例比较简单,实现逻辑与分布式缓存基本上一样。

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

/**
 * @author alanchan
 *
 */
public class TestBroadcastVariablesDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Source
		// student数据集(学号,姓名)
		DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(Arrays.asList(Tuple2.of(1, "alan"), Tuple2.of(2, "alanchan"), Tuple2.of(3, "alanchanchn")));

		// score数据集(学号,学科,成绩)
		DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
				Arrays.asList(Tuple3.of(1, "chinese", 50), Tuple3.of(1, "math", 90), Tuple3.of(1, "english", 90), Tuple3.of(2, "math", 70), Tuple3.of(3, "art", 86)));

		// Transformation
		// 将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
		// 然后使用scoreDS(学号,学科,成绩)和广播数据studentDS(学号,姓名)进行关联,得到这样格式的数据:(学号,姓名,学科,成绩)
		MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>> result = scoreDS
				.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>>() {

					Map<Integer, String> studentsMap = new HashMap<>();

					@Override
					public void open(Configuration parameters) throws Exception {
						// 获取广播数据
						List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentsInfo");
						for (Tuple2<Integer, String> tuple : studentList) {
							studentsMap.put(tuple.f0, tuple.f1);
						}

					}

					@Override
					public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
						// 使用广播数据
						Integer stuId = value.f0;
						String stuName = studentsMap.getOrDefault(stuId, "");

						return new Tuple4(stuId, stuName, value.f1, value.f2);
					}
				}).withBroadcastSet(studentDS, "studentsInfo");
		
		// 4.Sink
		result.print();

	}

}

3、验证

启动程序,运行程序,控制台输出如下:

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable_flink hive_02

三、Broadcast State 与 Broadcast Variable 区别

关于Broadcast State的介绍,请参考文章:53、Flink 的Broadcast State 模式介绍及示例

Broadcast State 和 Broadcast Variable 都是 Flink 中用于广播数据的机制,但它们之间有一些区别:

  • Broadcast State 是 KeyedStateBackend 的一个实现,它允许将状态数据广播到所有并行任务中。每个并行任务都可以访问相同的状态数据,从而实现状态的共享。Broadcast State 主要用于处理键控的状态,即状态与某个键相关联。
  • Broadcast Variable 是一种简单的广播机制,它可以将任意类型的数据广播到所有并行任务中。每个并行任务都可以访问相同的广播变量值。Broadcast Variable 主要用于处理非键控的数据,即不需要与特定键关联的数据。

总结一下,Broadcast State 和 Broadcast Variable 的主要区别在于:

  • Broadcast State 用于广播键控的状态数据,而 Broadcast Variable 用于广播非键控的数据。
  • Broadcast State 需要与 KeyedStream 一起使用,而 Broadcast Variable 可以与任何类型的 DataStream 一起使用。

以上,本文简单的介绍了flink中关于广播变量的简单使用示例。


标签:变量,示例,flink,State,Broadcast,广播,import
From: https://blog.51cto.com/alanchan2win/9013154

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、Flinksink介绍三、sink到文件、console示例1、console输出2、sink到文件1)、sinktxt文件到hdfs上2)、sinkcsv文件到本地3)、sinktext文件到hdfs上(writeUsingOutputFormat)四、sink到socket示例(writeToSocket)五、Jdbc/mysql示例1、maven依......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、分布式缓存(DistributedCache)示例1、介绍2、maven依赖3、实现4、验证1)、验证步骤2)、验证本文介绍了flink关于分布式缓存的使用示例,比较简单。本文除了maven依赖外,没有其他依赖。本示例需要hadoop环境可用。一、maven依赖为避免篇幅过长,所......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、sink到ClickHouse示例1、介绍2、maven依赖3、创建clickhouse表4、验证clickhouseweb页面是否正常5、实现1)、userbean2)、sink实现6、验证1)、nc输入2)、启动应用程序3)、观察应用程序控制台输出4)、查看clickhouse表中的数据本文介绍了nc作......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、环境或版本说明三、flinksink到kafka示例1、介绍2、1.13.6版本示例1)、maven依赖2)、实现3)、验证步骤3、1.17.0版本示例1)、maven依赖2)、实现3)、验证步骤本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • Java定义变量容易出错的点
    在Java中,定义的数值有默认的类型(整型为int、浮点型为double),所以在定义变量的时候要注意类型的问题。例如:因为给l的值(10000000000)默认类型是int类型,所以超出了范围;修改:在给的值后面加上L,......
  • Sass变量-------持续更新
    目录变量局部变量全局变量变量默认值多值变量用法一用法二---列表列表函数maps映射有关maps的其它函数map-has-key(map,key)map-keys(map)变量定义一个变量(以$号开头):$name:green使用定义好的变量:color:$name变量也可以用在属性选择器上:#{变量名}例如:$className:main;......
  • Impala与Flink开发应用_tyt2023
    本实验基于MRS环境,Impala部分主要介绍基本操作。假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,使用Impala客户端实现A业务操作流程。Flink部分主要介绍如何实现Flink与Kafka的连接以满足实时计算场景应用。购买MRS集群选择“自定义购买”区域:华北-北京四......
  • flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用
    此文档是参照flink-cdc文档(https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html)案例 的最佳实践1.下载flinkrelease最新版本1.18.0并解压, https://repo.maven.apache.org/maven2/org/apache/flink/flink-......
  • jmeter的json提取器多个取值的取法&ForEach控制器对多取值变量进行循环调用
    1、jmeter的json提取器多个取值的取法userId有多个值 $.responseData.datas[*].userId-1代表取所有的值  2、ForEach控制器对多取值变量进行循环调用 --实现多取值变量进行循环调用${userId_matchNr} ---代表存储变量的长度  3、循环控制器--实现id自增 ......