首页 > 其他分享 >记一次Flink遇到性能瓶颈

记一次Flink遇到性能瓶颈

时间:2023-04-15 22:22:42浏览次数:39  
标签:瓶颈 性能 Flink marketPrice updatePS connection insertPS close price

前言

这周的主要时间花在Flink上面,做了一个简单的从文本文件中读取数据,然后存入数据库的例子,能够正常的实现功能,但是遇到个问题,我有四台机器,自己搭建了一个standalone的集群,不论我把并行度设置多少,跑起来的耗时都非常接近,实在是百思不得其解。机器多似乎并不能帮助它。 把过程记录在此,看后面随着学习的深入能不能解答出这个问题。
image

尝试过的修复方法

集群搭建

出现这个问题后,我从集群的角度来进行了些修改,
1,机器是2核的,slots被设置成了6,那我就有点怀疑是这个设置问题,因为其实只有2核,设置的多了,反而存在抢占资源,导致运行达不到效果,改成2后效果一样,没有改进。这个参数在
taskmanager.numberOfTaskSlots: 2
2,调整内存, taskmanager 从2G调整为4G, 效果也没有变化。
taskmanager.memory.process.size: 4000m
这里说下这个内存,我们设置的是总的Memory,也就是这个Total Process Memory。
image
剔除掉些比较固定的Memory,剩下的大头就是这个Task Heap 和 Managed Memory。
所以我们调整大小后,它两个也就相应的增加了。 我查了下这两个,可以理解为堆内存和堆外内存,
一个是存放我们程序的对象,会被垃圾回收器回收;一个是堆外内存,比如RockDB 和 缓存 sort,hash 等的中间结果。

程序方面修改

最开始的时候我把保存数据库操作写在MapFunction里面,后来改到SinkFunction里面。
SinkFunction里面保存数据库的方法也进行了反复修改,从开始使用Spring的JdbcTemplate,换成后来直接使用最原始JDBC。 而且还踩了一个坑,开始的时候用的注入的JdbcTemplate, 本地运行没有问题,到了集群上面,发到别的机器的时候,注入的东西就是空的了。
换成原始的JDBC速度能提升不少, 我猜想这里的原因是jdbctemplate做了些多余的事情, JDBC打开一次,后面Invoke的时候就直接存了,效率要高些,所以速度上提升不少。
这里把部分代码贴出来, 在Open的时候就预加载好PreparedStatement, Invoke的时候直接传参数,调用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

总结

从多个方面去改进,结果发现还是一样的,就是使用一台机器和使用三台机器,时间上一样的,再怀疑我只能怀疑是某台机器有问题,然后运行的时候,由最慢的机器决定了速度。 我在使用MapFunction的时候有观察到,有的时候,某台机器已经处理上千条,而有的只处理了几十条,到最后完成的时候,大家处理的数量又是很接近的。这样能够解释为什么机器多了,速度却是一样的。但是我没有办法找出哪台机器来。 我自己的本地运行,并行数设置的多,速度上面是有提升的,到了集群就碰到这样的现象,后面看能不能解决它, 先记录在此。

标签:瓶颈,性能,Flink,marketPrice,updatePS,connection,insertPS,close,price
From: https://www.cnblogs.com/dk168/p/17322082.html

相关文章

  • 整数平方和开根号的性能优化
    整数的平方和开根号操作通过sqrt实现性能已经不容易优化,但如果要求精度不高,可以进一步优化,方法有三种:1、isqrt;2、查表法;3、三角函数法1、isqrt即整数平方根,有多种算法。通过询问ChatGPT,AI给出了几种实现,这里取一种比较快的实现:1u32isqrt2(u32x)2{3u32res=0;......
  • 【性能调优】总体指导
    参考《java性能优化权威指南》 Java堆大小计算规则名称设置参数说明Java堆 -Xms和-Xmx 3-4倍FullGC后老年代空间量新生代 -Xmn 1-1.5倍FullGC后老年代空间量老年代 Java堆大小减新生代大小 2-3倍FullGC后老年代空间量永久代 -XX:permSize和-XX:MaxPermSize 1.2-1.5倍FullGC......
  • 性能测试工具
    一、工具介绍1、Loadrunner介绍  2、JMeter介绍  二、jmter环境搭建1、安装JDKJDK下载---安装JDK---配置环境变量---校验  2、安装JMter(记住版本需要与JDK版本匹配)  3、JMter功能概要1、JDK......
  • 性能测试概述
    一、性能测试的概念进行性能测试:满足真实的业务场景需求(活动场景)、支持大量用户。满足商户要求。1、什么是性能:软件质量属性中的“效率”特性。效率特性:时间特性(表示系统处理用户请求的响应时间),资源特性(表示系统运行过程中,系统资源的内耗情况。包括:CPU、内......
  • 软件架构性能
    今天要介绍的是六大属性之一的性能,根据读一些别人博客以及课上我们组的同学介绍我认为其实易用性就是用户对于一个软件操作起来在不影响他原本的功能以及性能的情况下操作起来越是简单便捷则这个系统的易用性越高。性能可以进一步分为易理解性:软件产品使用客户能理解软件是否适......
  • tcp性能优化方法
    一、TCPfastopen原理简介:三次握手带来的延迟使得每创建一个新TCP连接都要付出很大代价。而这也决定了提高TCP应用性能的关键,在于想办法重用连接。TFO(TCPfastopen)允许服务器和客户端在连接建立握手阶段交换数据,从而使应用节省了一个RTT的时延。但是TFO会引起一些问题,因此......
  • 软件质量属性之性能 科技小论文
    软件质量属性之性能耿晴(石家庄铁道大学信息科学与技术学院软件工程系;河北省石家庄市;050000) 摘要:性能是一个软件架构的重要指标,从用户的角度来看对于一个运行速度很慢的软件是很难长久地存在的,所以软件的性能的优化对于软件的存亡有着至关重要的作用。本文从开发人员视角、......
  • APP性能测试_启动时间
     Android应用性能测试通常包括:启动时间、内存、CPU、耗电量、流量、流畅度等,下面就给大家介绍APP启动时间的测试。启动时间,分为下面二种冷启动:应用程序首次启动,进程首次创建并加载资源的过程热启动:应用程序启动后点“back”键、“Home”键,应用程序退到后台,并未被完全“杀死......
  • 自主阅读笔记03《基于web 服务器的网站性能优化研究》
    文章来源信息记录材料by戴胜,朱琳、广东科技学院计算机系一、客户端优化实际指的是浏览器,在浏览器完成访问网站的时间中,有80%的时间用于加载图像、样式表、脚本等静态资源。浏览器的主要作用就是显示数据和发送http请求。1.减少HTTP请求一个页面中包含多个种类和多个数量的组件......
  • Hexo博客Next主题配置加载优化性能提升
    主题源加载优化把在NexT主题的_config.yml里面的:#Urioffontshost.E.g.//fonts.googleapis.com(Default)host:改为:#Urioffontshost.E.g.//fonts.googleapis.com(Default)host://fonts.lug.ustc.edu.cn因为fonts.lug.ustc.edu.cn是中科大的源,相比之前能快一下博客双......