首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存-Distributed Cache

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存-Distributed Cache

时间:2023-12-28 13:36:25浏览次数:24  
标签:缓存 flink file apache import 分布式



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、分布式缓存(Distributed Cache)示例
  • 1、介绍
  • 2、maven依赖
  • 3、实现
  • 4、验证
  • 1)、验证步骤
  • 2)、验证



本文介绍了flink关于分布式缓存的使用示例,比较简单。

本文除了maven依赖外,没有其他依赖。

本示例需要hadoop环境可用。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、分布式缓存(Distributed Cache)示例

1、介绍

Flink提供了一个类似于Hadoop的分布式缓存,以使用户函数的并行实例可以在本地访问文件。此功能可用于共享包含静态外部数据(如字典或机器学习回归模型)的文件。

关于hadoop分布式缓存参考:19、Join操作map side join 和 reduce side join

缓存的工作方式如下:

  • 程序在其ExecutionEnvironment中以特定名称将本地或远程文件系统(如HDFS或S3)的文件或目录注册为缓存文件。
  • 当程序执行时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。
  • 用户函数可以查找指定名称下的文件或目录,并从工作者的本地文件系统访问它。

官方示例代码

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);

// define your program and execute
...
DataSet<String> input = ...;
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

访问用户函数(此处为MapFunction)中的缓存文件。函数必须扩展RichFunction类,因为它需要访问RuntimeContext。

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}

2、maven依赖

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>3.1.4</version>
</dependency>

3、实现

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import akka.japi.tuple.Tuple4;

/**
 * @author alanchan
 *
 */
public class DistributedCacheSink {

	public static void main(String[] args) throws Exception {
		// env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Source
		// 注册分布式缓存文件
		env.registerCachedFile("hdfs://server2:8020//flinktest/words/goodsDistributedCacheFile", "goodsDistributedCacheFile");

		// order数据集(id,name,goodsid)
		DataSource<Tuple3<Integer, String, Integer>> ordersDS = env
				.fromCollection(Arrays.asList(Tuple3.of(1, "alanchanchn", 1), Tuple3.of(2, "alanchan", 4), Tuple3.of(3, "alan", 123)));

		// Transformation
		// 将ordersDS(id,name,goodsid)中的数据和分布式缓存中goodsDistributedCacheFile的数据(goodsid,goodsname)关联,得到这样格式的数据: (id,name,goodsid,goodsname)
		MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, Integer, String>> result = ordersDS

				// public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction
				// implements MapFunction<IN, OUT> {
				// @Override
				// public abstract OUT map(IN value) throws Exception;
				// }

				.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, Integer, String>>() {

					// 获取缓存数据,并存储,具体以实际应用为准
					Map<Integer, String> goodsMap = new HashMap<>();

					//读取缓存数据,并放入本地数据结构中
					@Override
					public void open(Configuration parameters) throws Exception {
						// 加载分布式缓存文件
						File file = getRuntimeContext().getDistributedCache().getFile("goodsDistributedCacheFile");
						List<String> goodsList = FileUtils.readLines(file);
						for (String str : goodsList) {
							String[] arr = str.split(",");
							goodsMap.put(Integer.parseInt(arr[0]), arr[1]);
						}
					}

					//关联数据,并输出需要的数据结构
					@Override
					public Tuple4<Integer, String, Integer, String> map(Tuple3<Integer, String, Integer> value) throws Exception {
						// 使用分布式缓存文件中的数据
						// 返回(id,name,goodsid,goodsname)
						return new Tuple4(value.f0, value.f1, value.f2, goodsMap.get(value.f2));
					}
				});

		// Sink
		result.print();

	}

}

4、验证

1)、验证步骤

1、准备分布式文件及其内容,并上传至hdfs中
2、运行程序,查看输出

2)、验证

1、缓存文件内容

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存-Distributed Cache_kafka


2、上传至hdfs

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存-Distributed Cache_flink_02


3、运行程序,查看结果

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存-Distributed Cache_flink_03

以上,本文介绍了flink关于分布式缓存的使用示例,比较简单。

标签:缓存,flink,file,apache,import,分布式
From: https://blog.51cto.com/alanchan2win/9013166

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、sink到ClickHouse示例1、介绍2、maven依赖3、创建clickhouse表4、验证clickhouseweb页面是否正常5、实现1)、userbean2)、sink实现6、验证1)、nc输入2)、启动应用程序3)、观察应用程序控制台输出4)、查看clickhouse表中的数据本文介绍了nc作......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、环境或版本说明三、flinksink到kafka示例1、介绍2、1.13.6版本示例1)、maven依赖2)、实现3)、验证步骤3、1.17.0版本示例1)、maven依赖2)、实现3)、验证步骤本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • 项目应用多级缓存示例
    前不久做的一个项目,需要在前端实时展示硬件设备的数据。设备很多,并且每个设备的数据也很多,总之就是数据很多。同时,设备的刷新频率很快,需要每2秒读取一遍数据。问题来了,我们如何读取数据,并且在前端展示?我的想法是利用多级缓存:1)首先是有个数据采集程序,不停地采集设备的数据。采集到......
  • Impala与Flink开发应用_tyt2023
    本实验基于MRS环境,Impala部分主要介绍基本操作。假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,使用Impala客户端实现A业务操作流程。Flink部分主要介绍如何实现Flink与Kafka的连接以满足实时计算场景应用。购买MRS集群选择“自定义购买”区域:华北-北京四......
  • 后端技术:Redis进行数据缓存的两种方法
    在fastapi项目中Redis进行数据缓存的两种不同的方法的demo第一种方法:通过FastAPI应用状态准备文件:models/redis.py为fastapi的数据库模型文件importosimportaioredisfromaioredisimportRedisasyncdefsys_cache()->Redis:"""系统缓存:return:cac......
  • flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用
    此文档是参照flink-cdc文档(https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html)案例 的最佳实践1.下载flinkrelease最新版本1.18.0并解压, https://repo.maven.apache.org/maven2/org/apache/flink/flink-......
  • Flutter 中常用的缓存数据方式
    SharedPreferences:优点:使用简单,轻量级,适用于少量数据的缓存;缺点:不适合存储大型、结构化、复杂的数据;SQLite:优点:可以存储大量、结构化、复杂的数据,支持复杂的数据查询操作;缺点:比较复杂,需要学习SQL和数据库操作;Hive:优点:快速、可扩展,性能较好,适用于存储大量数据;缺点:不支持......
  • OB_执行计划缓存
    执行计划缓存淘汰自动淘汰如果租户内存大小为10G,并且变量设置如下:ob_plan_cache_percentage=10;ob_plan_cache_evict_high_percentage=90;ob_plan_cache_evict_low_percentage=50;则:计划缓存内存上限绝对值=10G*10/100=1G;淘汰计划的高水位线=1G*90/100=......
  • shared_preferences缓存
    封装import'dart:convert';import'package:shared_preferences/shared_preferences.dart';classJSpUtil{JSpUtil._internal();//私有的构造方法,防止外部实例化factoryJSpUtil()=>_instance;//工厂方法,返回JSpUtil唯一实例staticlatefinalJSpU......