首页 > 其他分享 >浅谈SOFAJRaft中的ShutdownHook

浅谈SOFAJRaft中的ShutdownHook

时间:2023-01-25 14:33:07浏览次数:32  
标签:return 浅谈 SOFAJRaft ShutdownHook awaitTermination server shutdown grpcServer fin

Java程序经常会遇到进程挂掉的情况,一些状态没有正确的保存下来,这时候就需要在JVM关掉的时候执行一些清理现场的代码。JAVA中的ShutdownHook提供了比较好的方案。而在SOFAJRaft-example模块的CounterServer-main方法中就使用了shutdownHook实现优雅停机。
@Author:Akai-yuan
@更新时间:2023/1/25

1.触发场景与失效场景

JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以注册一个JVM关闭的钩子这个钩子可以在以下几种场景中被调用:

  1. 程序正常退出
  2. 执行了System.exit()方法
  3. 终端使用Ctrl+C触发的中断
  4. 系统关闭
  5. OutOfMemory宕机
  6. 使用Kill pid命令干掉进程(使用 **kill -9 pid **是不会被调用的)

以下几种情况中是无法被调用的:

  1. 通过kill -9命令杀死进程——所以kill -9一定要慎用;
  2. 程序中执行了Runtime.getRuntime().halt()方法;
  3. 操作系统突然崩溃,或机器掉电(用电设备因断电、失电、或电的质量达不到要求而不能正常工作)。

2.addShutdownHook方法简述

Runtime.getRuntime().addShutdownHook(shutdownHook);

该方法指,在JVM中增加一个关闭的钩子,当JVM关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁、关闭连接等操作。

3.SOFAJRaft中钩子函数的实现

通过反射获取到grpcServer实例的shutdown方法和awaitTerminationLimit方法,并添加到钩子函数当中

public static void blockUntilShutdown() {
        if (rpcServer == null) {
            return;
        }
        //当RpcFactoryHelper中维护的工厂类型是GrpcRaftRpcFactory时进入if条件内部
        if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals(RpcFactoryHelper.rpcFactory().getClass()
            .getName())) {
            try {
                //反射获取grpcServer中维护的(io.grpc包下的)server实例
                Method getServer = rpcServer.getClass().getMethod("getServer");
                Object grpcServer = getServer.invoke(rpcServer);
                //反射获取server实例的shutdown方法和awaitTerminationLimit方法
                Method shutdown = grpcServer.getClass().getMethod("shutdown");
                Method awaitTerminationLimit = grpcServer.getClass().getMethod("awaitTermination", long.class,
                    TimeUnit.class);
            	//添加一个shutdownHook线程执行方法
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        try {
                            shutdown.invoke(grpcServer);
                            awaitTerminationLimit.invoke(grpcServer, 30, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                            e.printStackTrace(System.err);
                        }
                    }
                });
                //执行awaitTermination方法
                Method awaitTermination = grpcServer.getClass().getMethod("awaitTermination");
                awaitTermination.invoke(grpcServer);
            } catch (Exception e) {
                LOG.error("Failed to block grpc server", e);
            }
        }
    }

4.grpc中的shutdown方法

GrpcServer下的shutdown方法与本文的钩子函数无关,此处再对比分析一下GrpcServer的shutdown方法。

    public void shutdown() {
        //CAS
        //当且仅当期待值为true时(与当前AtomicBoolean类型的started一致),设置为false关闭
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        ExecutorServiceHelper.shutdownAndAwaitTermination(this.defaultExecutor);
        GrpcServerHelper.shutdownAndAwaitTermination(this.server);
    }

ExecutorServiceHelper#shutdownAndAwaitTermination:
我们可以发现实际上就是在执行ExecutorService 中 的shutdown()、shutdownNow()、awaitTermination() 方法,那么我们来区别以下这几个方法

public static boolean shutdownAndAwaitTermination(final ExecutorService pool, final long timeoutMillis) {
        if (pool == null) {
            return true;
        }
        // 禁止提交新任务
        pool.shutdown();
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        final long phaseOne = timeoutMillis / 5;
        try {
            // 等待一段时间以终止现有任务
            if (pool.awaitTermination(phaseOne, unit)) {
                return true;
            }
            pool.shutdownNow();
            // 等待一段时间,等待任务响应被取消
            if (pool.awaitTermination(timeoutMillis - phaseOne, unit)) {
                return true;
            }
            LOG.warn("Fail to shutdown pool: {}.", pool);
        } catch (final InterruptedException e) {
            // (Re-)cancel if current thread also interrupted
            pool.shutdownNow();
            // preserve interrupt status
            Thread.currentThread().interrupt();
        }
        return false;
    }

  1. shutdown():停止接收新任务,原来的任务继续执行

1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;


  1. shutdownNow():停止接收新任务,原来的任务停止执行

1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
说明:
它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。
所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。


  1. awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞

当前线程阻塞,直到:

  • 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
  • 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
  • 或者 线程被中断,抛出InterruptedException

然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)

GrpcServerHelper#shutdownAndAwaitTermination
与ExecutorServiceHelper类中的shutdownAndAwaitTermination方法类似的,该方法将优雅的关闭grpcServer.

public static boolean shutdownAndAwaitTermination(final Server server, final long timeoutMillis) {
        if (server == null) {
            return true;
        }
        // disable new tasks from being submitted
        server.shutdown();
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        final long phaseOne = timeoutMillis / 5;
        try {
            // wait a while for existing tasks to terminate
            if (server.awaitTermination(phaseOne, unit)) {
                return true;
            }
            server.shutdownNow();
            // wait a while for tasks to respond to being cancelled
            if (server.awaitTermination(timeoutMillis - phaseOne, unit)) {
                return true;
            }
            LOG.warn("Fail to shutdown grpc server: {}.", server);
        } catch (final InterruptedException e) {
            // (Re-)cancel if current thread also interrupted
            server.shutdownNow();
            // 保持中断状态
            Thread.currentThread().interrupt();
        }
        return false;
    }

标签:return,浅谈,SOFAJRaft,ShutdownHook,awaitTermination,server,shutdown,grpcServer,fin
From: https://www.cnblogs.com/akai-yuan/p/17066909.html

相关文章

  • 浅谈PHP设计模式的组合模式
    简介:组合模式,属于结构型的设计模式。将对象组合成树形结构以表示“部分-整体”的层次结构。组合模式使得用户对单个对象和组合对象的使用具有一致性。组合模式分两种状态......
  • SOFAJRaft源码阅读-框架Disruptor浅析
    Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的......
  • 【转载】 浅谈欧洲专利授权后提交生效国程序注意事项及费用控制
    原文地址:浅谈欧洲专利授权后提交生效国程序注意事项及费用控制-Lexology     ============================================   一、引言随着知识......
  • 浅谈测试
    浅谈测试本篇文章将会讲讲个人对测试的理解,以及测试所需要的知识体系,我会从不同角度来分析我们作为测试,所需要掌握的基本技能以及扩展技能。首先,说说测试的职责吧。这......
  • SOFAJRaft源码阅读-模块启动过程
    本篇文章旨在分析SOFAJRaft中jraft-example模块的启动过程,由于SOFAJRaft在持续开源的过程中,所以无法保证示例代码永远是最新的,要是有较大的变动或者纰漏、错误的地方,欢迎......
  • SOFAJRaft模块启动过程
    本篇文章旨在分析SOFAJRaft中jraft-example模块的启动过程,由于SOFAJRaft在持续开源的过程中,所以无法保证示例代码永远是最新的,要是有较大的变动或者纰漏、错误的地方,欢迎......
  • SOFAJRaft模块启动过程
    本篇文章旨在分析SOFAJRaft中jraft-example模块的启动过程,由于SOFAJRaft在持续开源的过程中,所以无法保证示例代码永远是最新的,要是有较大的变动若有纰漏或者错误的地方,欢......
  • 浅谈Nginx负载均衡与F5的区别
    前言笔者最近在负责某集团网站时,同时用到了Nginx与F5,如图所示,负载均衡器F5作为处理外界请求的第一道“墙”,将请求分发到web服务器后,web服务器上的Nginx再进行处理,静态内容......
  • 浅谈SpringAOP功能源码执行逻辑
    如题,该篇博文主要是对Spring初始化后AOP部分代码执行流程的分析,仅仅只是粗略的讲解整个执行流程,喜欢细的小伙伴请结合其他资料进行学习。在看本文之前请一定要有动态代理的......
  • 浅谈使用实现FactoryBean接口的方式定义Bean
    在定义一个Bean的时候,我们可以直接实现FactoryBean接口,然后重写对应的getXxx方法,就能够完成一个Bean的定义工作文章目录​​一、使用实现接口的方式定义Bean​​​​二、其......