首页 > 其他分享 >flink学习(13)—— 重试机制和维表join

flink学习(13)—— 重试机制和维表join

时间:2024-12-03 23:29:36浏览次数:9  
标签:13 join String flink new 重试 org apache import

重试机制

当任务出现异常的时候,会直接停止任务——解决方式,重试机制

1、设置checkpoint后,会给任务一个重启策略——无限重启

2、可以手动设置任务的重启策略

代码设置

//开启checkpoint后,默认是无限重启,可以设置该值 表示不重启
env.setRestartStrategy(RestartStrategies.noRestart());


//作业失败flink中最多重启3次,每次重启的最小间隔是10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

//2分钟内最多重启3次,每次重启的最小间隔是5秒
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(3,
                                         Time.of(2,TimeUnit.MINUTES),
                                         Time.of(5,TimeUnit.SECONDS))
);

//无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    Integer.MAX_VALUE,  // 无限重启次数
    Time.of(10, TimeUnit.SECONDS)  // 每次重启的延迟时间
));

维表join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果

那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

维表一般的特点是变化比较慢。——名词表,维度表。

解决方式

 解决维表join的方式
        方式一:
            可以用一个静态代码块,或者在open方法中对一个集合初始化,用于存放想要相关联的数据。
            缺点:数据不能动态改变了
        方式二:
            在open中初始化连接,在map中每拿到流中的一条数据,就去mysql中查找一次
            缺点:数据可以动态改变,但是去mysql查找的次数太多了
        方式三:
            创建一个缓存区,用于存放数据,若过期则再去mysql中查询数据。
            没有缺点,可以动态获取数据了,也减少了mysql的查询次数(缓冲)
            唯一的是,若是多线程,可能会去mysql查询多次

方式一

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * 直接从mysql中拿出
 * 弊端 只能拿到一次 不能实现动态
 */
public class _03_维表join_01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            List<Map<String, Object>> list = null;
            @Override
            public void open(Configuration parameters) throws Exception {
            // 在open中执行sql
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
                String sql = "select * from city ";
                list = queryRunner.query(sql, new MapListHandler());

            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] split = line.split(",");
                Object cityName = "未知";
                for (Map<String, Object> map : list) {
                    String cityId = (String)map.get("city_id");
                    if (cityId.equals(split[1])){
                        cityName = map.get("city_name");
                    }
                }

                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}

方式二

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 * 每次从kafka中拿到一条数据就从mysql中查一遍
 * 弊端 对mysql的压力加大
 */
public class _03_维表join_02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);
            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
            // 在处理逻辑中执行sql
                String[] split = line.split(",");
                String sql = "select city_name from city where city_id = ?";
                Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);
                String cityName="未知";
                if (rs !=null){
                     cityName = (String) rs.get("city_name");
                }

                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}

方式三

package com.bigdata.day06;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * 最终 非常好的方式
 * 现在内存中查 查不到在去mysql中找
 * 唯一的问题是,假如是多线程情况下,可能会触发多次去mysql中查找的方法
 */
public class _03_维表join_03_cache {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);
        // 记得设置并行度
        env.setParallelism(1);

        source.map(new RichMapFunction<String, String>() {
            ComboPooledDataSource pool = null;
            QueryRunner queryRunner = null;

            // 定义一个Cache
            // 第一个是传入的参数类型 第二个是存放的值的类型
            // 也就是,传入一个参数,根据这个值获取结果,拿的时候通过传入的值 拿存放的值
            LoadingCache<String, String> cache;
            @Override
            public void open(Configuration parameters) throws Exception {
                pool = new ComboPooledDataSource();
                queryRunner = new QueryRunner(pool);

                cache = CacheBuilder.newBuilder()
                        //最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU
                        .maximumSize(1000)
                        //在更新后的指定时间后就回收
                        // 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。
                        .expireAfterWrite(50, TimeUnit.SECONDS)
                        //指定移除通知
                        .removalListener(new RemovalListener<String, String>() {
                            @Override
                            public void onRemoval(RemovalNotification<String, String> removalNotification) {
                                System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());
                            }
                        })
                        .build(//指定加载缓存的逻辑
                                new CacheLoader<String, String>() {
                                    // 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中
                                    @Override
                                    public String load(String cityId) throws Exception {

                                        String sql = "select city_name from city where city_id = ? ";
                                        Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);
                                        String cityName = null;
                                        if (rs!=null){
                                            cityName = (String) rs.get("city_name");
                                        }
                                        System.out.println("进入数据库查询成功,查询的值为"+cityId+"--"+cityName);
                                        return cityName;
                                    }
                                });

            }

            @Override
            public void close() throws Exception {
                pool.close();
            }

            @Override
            public String map(String line) throws Exception {
                String[] arr = line.split(",");
                // 使用这种方式取值
                String cityName = cache.get(arr[1]);
                return line+","+cityName;
            }
        }).print();

        env.execute();
    }
}

标签:13,join,String,flink,new,重试,org,apache,import
From: https://blog.csdn.net/weixin_52642840/article/details/144180361

相关文章

  • 题海拾贝——救济金发放(The Dole Queue, UVa 133)
            Hello大家好!很高兴我们又见面啦!给生活添点passion,开始今天的编程之路!我的博客:<但凡.我的专栏:《题海拾贝》、《编程之路》欢迎点赞,关注!目录1、题目2、分析3、题解(附详细解析)4、拓展 1、题目        n(n个人站成一圈,逆时针编号为1~n。......
  • 多维数组及其应用————13
    1.二维数组如果我们把⼀维数组做为数组的元素,这时候就是⼆维数组,⼆维数组作为数组元素的数组被为三维数组,⼆维数组以上的数组统称为多维数组。1.1二维数组的创建先行后列其实也可以这样理解:把二维数组当成特殊的一维数组,即比如arr1[0]......
  • 25.100ASK_T113-PRO 测试摄像头(型号)
    1.摄像头USB2.0摄像头,支持 UVC协议, 就是V4L2+USB2.0 大概可这样理解吧.这个是2K分辨率.2.8mm焦距.开发板还是 100ASK_T113-PRO V1.2版2.查看摄像头驱动挂载情况这样接好.看看设备有没有挂载上#ls/dev/video*/dev/video0/dev/video1这两个就是USB摄像......
  • 题解:P11362 [NOIP2024] 遗失的赋值
    这里写一个我在考场上差点想出来的、比较另类的做法。若\(\existsc_i=c_j(i\nej),d_i\ned_j\),则答案显然为\(0\)。否则,我们可以将序列\(x\)中的数分为已确定和未确定两类。设\(f_0(i)\)为当\(x_i\)未确定时前\(i-1\)个二元限制的方案数,\(f_1(i)\)为当\(x_i\)确......
  • E86 换根DP CF1324F Maximum White Subtree
    视频链接:E86换根DPCF1324FMaximumWhiteSubtree_哔哩哔哩_bilibili  MaximumWhiteSubtree-洛谷|计算机科学教育新生态//换根DPO(n)#include<bits/stdc++.h>usingnamespacestd;constintN=200005;vector<int>e[N];intn,a[N],f[N];voiddfs(int......
  • 题解:AT_arc139_d [ARC139D] Priority Queue 2
    题面发现我们不好算到最后还剩些什么。考虑计算\(\sum\limits_{i=1}^m\sum\limits_{j=1}^n[s_j\gei]\),容易发现这和原式等价。记\(b_i\)表示\(s\)中不小于\(i\)的数的个数,每次删去第\(x\)小的等价于将所有超过\(n-x+1\)的地方减1,加入\(k\)等价于将\(b_{1,k}\)......
  • 题解:AT_abc138_f [ABC138F] Coincidence
    https://www.luogu.com.cn/problem/AT_abc138_f对于\(x\ley\):若\(2x\ley\),则\(y-x>y\bmodx\)。若\(2x>y\),则\(y-x=y\bmodx\)。有\(x\oplusy\gey-x\)。当\(2x\ley\)时,不可能存在\(y\bmodx=x\oplusy\)了。现......
  • web期末大作业:基于html+css+js制作深圳大学网站(13页) 学校班级网页制作模板 学生静态
    ......
  • 1303 [POJ 1830] 开关问题
    //1303[POJ1830]开关问题.cpp:此文件包含"main"函数。程序执行将在此处开始并结束。///*http://oj.daimayuan.top/course/22/problem/1080有n个相同的开关,每个开关都与某些开关有着联系,每当你打开或者关闭某个开关的时候,其他的与此开关相关联的开关也会相应地发生......
  • [luoguP11361/NOIP2024] 编辑字符串
    题意给出两个0/1字符串,每个字符串有一些位置被标记,无法交换。求通过任意多次的交换相邻元素操作能够使两个字符串最多多少位置相同。sol一道贪心题。显然交换相邻的操作可以使该字符串可以交换的一段任意排列。由于不同位置的贡献最大只为\(1\),因此在任何位置贡献都没有区......