首页 > 其他分享 >Flink 热存储维表 使用 Guava Cache 减轻访问压力

Flink 热存储维表 使用 Guava Cache 减轻访问压力

时间:2024-11-27 20:00:48浏览次数:7  
标签:Flink 缓存 String Cache dimValue 维表 Guava

目录

背景

Guava Cache 简介

实现方案

1. 项目依赖

(1) 定义 Cache

(2) 使用 Cache 优化维表查询

3. 应用运行效果

(1) 维表查询逻辑优化

(2) 减少存储压力

Guava Cache 配置优化

总结


背景

在实时计算场景中,Flink 应用中经常需要通过维表进行维度数据的关联。为了保证关联的实时性,常将维表数据存储在 Redis 或数据库中。然而,这种方案可能会因高频访问导致存储压力过大,甚至出现性能瓶颈。

为了解决这个问题,可以在 Flink 中引入本地缓存。本文介绍如何通过 Google 的开源库 Guava Cache,实现对热存储维表访问的优化。


Guava Cache 简介

Guava Cache 是 Google 开发的一个 Java 缓存工具库,具有以下优点:

  1. 支持本地缓存,提升查询性能。
  2. 提供缓存淘汰策略(如基于时间或容量)。
  3. 线程安全,适合高并发场景。
  4. 提供监听机制,可在缓存失效时触发回调。

实现方案

1. 项目依赖

在 Maven 项目中引入 Guava 依赖:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

以下是一个典型的实现步骤:

(1) 定义 Cache

使用 Guava 提供的 CacheBuilder 创建一个本地缓存:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;

public class CacheUtil {
    private static final Cache<String, String> DIM_CACHE = CacheBuilder.newBuilder()
            .maximumSize(10000) // 最大缓存数量
            .expireAfterWrite(10, TimeUnit.MINUTES) // 缓存过期时间
            .build();

    public static String getFromCache(String key) {
        return DIM_CACHE.getIfPresent(key);
    }

    public static void putToCache(String key, String value) {
        DIM_CACHE.put(key, value);
    }
}
(2) 使用 Cache 优化维表查询

在自定义的 RichFlatMapFunction 中使用缓存查询维表数据:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DimensionJoinFunction extends RichFlatMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化连接到 Redis 或其他外部存储
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String dimKey = extractKey(value);
        
        // 1. 先查询缓存
        String dimValue = CacheUtil.getFromCache(dimKey);
        
        // 2. 如果缓存未命中,再查询外部存储
        if (dimValue == null) {
            dimValue = queryFromExternalStorage(dimKey);
            if (dimValue != null) {
                CacheUtil.putToCache(dimKey, dimValue); // 写入缓存
            }
        }

        // 3. 关联维度数据
        if (dimValue != null) {
            String result = enrichData(value, dimValue);
            out.collect(result);
        }
    }

    private String extractKey(String value) {
        // 从输入数据中提取维表关联键
        return value.split(",")[0];
    }

    private String queryFromExternalStorage(String key) {
        // 模拟查询 Redis 或数据库
        return "mock_value_for_" + key;
    }

    private String enrichData(String input, String dimValue) {
        // 组合维度数据
        return input + "," + dimValue;
    }
}

3. 应用运行效果

(1) 维表查询逻辑优化
  • 缓存命中时:直接返回缓存数据,访问延迟为纳秒级。
  • 缓存未命中时:查询外部存储,并将结果写入缓存,后续重复访问相同的 Key 时不再查询外部存储。
(2) 减少存储压力

Guava Cache 本地缓存避免了大量高频查询直接命中外部存储,降低了 Redis、MySQL 等服务的负载。


Guava Cache 配置优化

  1. 缓存淘汰策略

    • expireAfterWrite:基于写入时间自动过期。
    • expireAfterAccess:基于访问时间自动过期。
    • maximumSize:限制最大缓存数量,避免内存占用过高。
  2. 异步加载机制: 如果需要异步加载数据,可以使用 CacheLoader,在缓存未命中时自动加载:

    Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10000)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    return queryFromExternalStorage(key);
                }
            });
  3. 监控与统计: 使用 Cache.stats() 查看缓存命中率等统计数据,便于优化缓存策略。


总结

通过在 Flink 中引入 Guava Cache,可以显著降低热存储维表的访问压力,提升系统性能。
这种方案适用于维表数据更新频率较低,且查询热点相对集中的场景

标签:Flink,缓存,String,Cache,dimValue,维表,Guava
From: https://blog.csdn.net/m0_70691645/article/details/144093051

相关文章

  • flink学习(8)——窗口函数
    增量聚合函数——指窗口每进入一条数据就计算一次例如:要计算数字之和,进去一个12计算结果为20,再进入一个7——结果为27 reduceaggregate(aggregateFunction)packagecom.bigdata.day04;publicclass_04_agg函数{publicstaticfinalTuple3[]ENGLISH=new......
  • flink学习(7)——window
     概述窗口的长度(大小):决定了要计算最近多长时间的数据窗口的间隔:决定了每隔多久计算一次举例:每隔10min,计算最近24h的热搜词,24小时是长度,每隔10分钟是间隔。窗口的分类1、根据window前是否调用keyBy分为键控窗口和非键控窗口2、根据window中参数的配置分为基于时间......
  • Flink 从入门到实战
    Flink中的批和流批处理的特点是有界、持久、大量,非常适合需要访问全部记录才能完成的计算工作,一般用于离线统计。流处理的特点是无界、实时,无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。一个无界流可以分解为多个有界流性能F......
  • Flink的安装与使用
    一、概述这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有Hadoop、Storm,以及后来的Spark,他们都有着各自专注的应用场景。Spark掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark的火热或多或少的掩盖了其他分布式计算的系统身影。就像Flink,......
  • Flink安装部署
    一、Standalone集群模式安装部署condadeactivate退出base环境Flink支持多种安装模式。local(本地)——本地模式standalone——独立模式,Flink自带集群,开发测试环境使用standaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用yarn——计算资源统一由HadoopYARN......
  • [Javascript] Import the Same JavaScript Module Multiple Times with Cache Busting
    WhenattemptingtoloadthesamemoduletwiceinJavaScriptyou'llhitacacheandcodewon'tre-run.Inscenarioswhereyouactuallydowanttohavestateinyourmodules,you'llhavetouseacache-bustingtechniquebypassingaquerypar......
  • 请描述下application cache的更新过程?
    ApplicationCache,或者说是AppCache,是一个已经被废弃的HTML5特性,用于离线存储网页资源。由于其更新机制复杂且容易出错,它已经被ServiceWorkers和CacheAPI取代。尽管如此,如果您仍然需要了解其更新过程,以下是其工作原理:manifest文件检查:浏览器会定期检查manifest......
  • Flink普通API之Source使用全解析
    Flink普通API之Source使用全解析一、引言在Flink的流式计算世界里,Source作为数据的源头起着至关重要的作用。它能够为Flink任务提供数据输入,无论是批处理还是流处理场景,合适的Source选择与使用都能让数据处理流程顺利开启。本文将深入探讨Flink中Source的相关知识,包括预定......
  • Java自定义函数查看OS的File Cache — 从原理到实战
    全文目录:开篇语......
  • 从0-1入门Flink全网最全吐血总结
    Flink是Apache基金会旗下的一个开源大数据处理框架。目前,Flink已经成为各大公司大数据实时处理的发力重点,特别是国内以阿里为代表的一众互联网大厂都在全力投入,为Flink社区贡献了大量源码。如今Flink已被很多人认为是大数据实时处理的方向和未来,许多公司也都在招聘和......