首页 > 其他分享 >Hystrix的工作原理

Hystrix的工作原理

时间:2023-06-08 16:33:45浏览次数:65  
标签:return hystrix Hystrix 工作 new 原理 public HystrixCommand metaHolder

Hystrix的工作原理

原文:https://www.cnblogs.com/sglx/p/15771838.html

一、简介

  hystrix经常被我们用于服务的熔断,降级等领域,基于RxJava(一种基于观察者模式的响应式编程框架)实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。

二、基本原理

  当我们需要调用某个方法时(一般是远程调用),通过 Hystrix 将方法调用包裹起来,交由 Hystrix 来完成,从而享受 Hystrix 带来保护。

img

Hystrix 提供了两个请求命令:HystrixCommand、HystrixObservableCommand,可以使用这两个对象来包裹待执行的任务。

HystrixCommand用在依赖服务返回单个操作结果的时候:

  execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。

  queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

HystrixObservableCommand用在依赖服务返回多个操作结果的时候:

  observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable。

  toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。

三、基本用法

  hystrix可以使用手动自定义Command、注解、结合feign的方式来实现,手动创建和结合feign的这里就不介绍了,主要看一下注解的实现方式。

1、开启hystrix,启动类上加上@EnableHystrix注解

2、在需要降级的方法上使用@HystrixCommand

public class UserServiceImpl{
   @Autowired
   private UserDao userDao;

    @HystrixCommand(fallbackMethod = "getUserNameFallBack")
    @Override
    public String getUserName(String userId){
        int i = 1/0
        return userDao.getNameById(userId);
    }
     
    public String getUserNameFallBack(String userId){
        return "服务暂时不可用,请稍后再试";
    }

} 

四、初始化

从@EnableHystrix注解看起

//开启EnableCircuitBreaker
@EnableCircuitBreaker
public @interface EnableHystrix {
}

//导入EnableCircuitBreakerImportSelector
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}

在注解上又开启了一个注解@EnableCircuitBreaker,并导入了一个Selector

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }

}

这是hystrix生效的一个关键点,继承了SpringFactoryImportSelector,此类在初始化后,会执行selectImports(AnnotationMetadata metadata)的方法。此方法会根据注解启动的注解(这里指@EnableCircuitBreaker)从spring.factories文件中获取其配置需要初始化@Configuration类,看下关键代码

List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));

看一下spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

配置类HystrixCircuitBreakerConfiguration

@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

}

向spring注入了HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }
    
    //切点是所有使用了HystrixCommand注解的地方
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    //切点是所有使用了HystrixCollapser注解的地方
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
    
    //环绕通知
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        //两个注解不能同时作用
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        //META_HOLDER_FACTORY_MAP预先初始化了两个工厂类
        //@HystrixCommand:CommandMetaHolderFactory
        //@HystrixCollapser:CollapserMetaHolderFactory
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //创建,把切点封装进了MetaHolder
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        // 创建HystrixInvokable,只是一个空接口,没有任何方法,只是用来标记具备可执行的能力
        // 具体的执行由实现类来做
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        //执行类型
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                // 利用工具CommandExecutor来执行
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }

   ···

}

通过AOP编程,创建方法的代理,决定执行何种逻辑,创建参数包装类

public MetaHolder create(final ProceedingJoinPoint joinPoint) {
  //获取方法对象
  Method method = getMethodFromTarget(joinPoint);
  //目标对象
  Object obj = joinPoint.getTarget();
  //方法的参数列表
  Object[] args = joinPoint.getArgs();
  //代理对象
  Object proxy = joinPoint.getThis();
  //调用子类create方法
  return create(proxy, method, obj, args, joinPoint);
}

public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
  //获取HystrixCommand注解的信息
  HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
  //判断方法的返回值类型Future:异步,Observable:rxjava中的被观察者,其他:同步
  ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
  //获取MetaHolder的builder对象
  MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
  if (isCompileWeaving()) {
    builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
  }
  //建造者模式,创建MetaHolder包装类
  return builder.defaultCommandKey(method.getName())
    .hystrixCommand(hystrixCommand)
    .observableExecutionMode(hystrixCommand.observableExecutionMode())
    .executionType(executionType)
    .observable(ExecutionType.OBSERVABLE == executionType)
    .build();
} 

创建命令执行器

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        //@HystrixCollapser
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        //@HystrixCommand 并且 返回值类型是Observable
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            //其他情况 把HystrixCommandBuilder封装进GenericCommand
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }
    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }
    
    protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
        super(builder.getSetterBuilder().build());
        //命令形式 包含需要执行的方法 fallback方法
        this.commandActions = builder.getCommandActions();
        this.collapsedRequests = builder.getCollapsedRequests();
        this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
        this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
        this.ignoreExceptions = builder.getIgnoreExceptions();
        //执行类型 ASYNCHRONOUS SYNCHRONOUS OBSERVABLE
        this.executionType = builder.getExecutionType();
    }
    
    public HystrixCommand.Setter build() throws HystrixPropertyException {
        //分组key:类名,命令key:方法名,线程池key
        HystrixCommand.Setter setter = HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
        if (StringUtils.isNotBlank(threadPoolKey)) {
            setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
        }
        try {
            //初始化线程池的配置
            setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
        } catch (IllegalArgumentException e) {
            throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
        }
        try {
            //初始化命令执行配置
            setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
        } catch (IllegalArgumentException e) {
            throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
        }
        return setter;
    }

执行命令

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
                                           throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                //转换成子接口HystrixExecutable
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() 
                                ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

HystrixCommand的四种执行方式

img

测试类

public class HystrixCommandTest {

    private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) {
        com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("methodA"));
        return new com.netflix.hystrix.HystrixCommand<T>(setter) {
            @Override
            protected T run() throws Exception {
                System.out.println("HystrixCommand执行了!!!" + System.currentTimeMillis());
                return message;
            }
        };
    }

    @Test
    public void test01() {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01");
        System.out.println(command.execute());
    }


    @Test
    public void test02() throws ExecutionException, InterruptedException {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02");
        Future<String> f = command.queue();
        System.out.println("queue之后,command执行:" + System.currentTimeMillis());
        Thread.sleep(1000);
        System.out.println(f.get());
    }

    @Test
    public void test03() throws InterruptedException {
        HystrixCommand<String> command = createCommand("this is test03");
        // observe直接执行run方法,称为Hot Observable
        Observable<String> observe = command.observe();
        System.out.println("observe之后,command执行:" + System.currentTimeMillis());
        Thread.sleep(1000);
        observe.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("通过订阅,获取执行结果:" + s);
            }
        });
    }

    @Test
    public void test04() throws InterruptedException {
        HystrixCommand<String> command = createCommand("this is test04");
        // toObservable不直接执行run方法
        Observable<String> observe = command.toObservable();
        System.out.println("未订阅,command不执行:" + System.currentTimeMillis());
        Thread.sleep(1000);
        observe.subscribe();
        System.out.println("订阅后,command执行了" + System.currentTimeMillis());
        Thread.sleep(1000);
    }

    @Test
    public void test05() throws InterruptedException, ExecutionException {
        HystrixCommand<String> command = createCommand("this is test05");
        // toObservable不直接执行run方法
        Future<String> f = command.toObservable().toBlocking().toFuture();
        System.out.println("转成future执行:" + System.currentTimeMillis());
        Thread.sleep(1000);
        System.out.println(f.get());
    }
}

回到hystrix源码中queue()方法

    public Future<R> queue() {
        // toObservable转换为Observable
        // toBlocking转换为BlockingObservable
        // toFuture转换为Future
        // 完成了Observable的创建和订阅
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        // 代理future,对于cancel操作做特殊处理
        // 因为toObservable().toBlocking().toFuture()返回的future无法通过cancel方法中断执行线程。
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
                // 如果 execution.isolation.thread.interruptOnFutureCancel = true(默认false)
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    // 设置标志位
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }
                // 执行目标future的cancel
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                // 如果command还没执行完成 且 需要中断执行的线程
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    // 获取执行线程
                    final Thread t = executionThread.get();
                    // 执行线程非当前线程则中断线程
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            ```
            
        };

        //判断是否执行完成
        if (f.isDone()) {
            try {
                //获取结果
                f.get();
                return f;
            } catch (Exception e) {
                ···
            }
        }

        return f;
    }

这里用到了RxJava框架的响应式编程,会执行到具体Command(之前封装的GenericCommand)的run方法

public class GenericCommand extends AbstractHystrixCommand<Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }

    //执行目标的方法
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }
    
    //执行fallback方法
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }

}

五、Hystrix上下文

  当feign结合Hystrix使用线程隔离时,如果我们想要使用ThreadLocal传递参数是不行的,存在跨线程传递的问题,Hystrix提供了一个上下文类HystrixRequestContext,以传递traceId为例

img

标签:return,hystrix,Hystrix,工作,new,原理,public,HystrixCommand,metaHolder
From: https://www.cnblogs.com/JaxYoun/p/17466880.html

相关文章

  • AQS的实现原理
    AQS的实现原理原文:https://www.cnblogs.com/sglx/p/15190246.html一、简介AQS全称为AbstractQueuedSynchronizer,它提供了一个FIFO(FirstinFirstout先入先出)队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等......
  • C# 中的yield return机制和原理
    前言#    当我们编写C#代码时,经常需要处理大量的数据集合。在传统的方式中,我们往往需要先将整个数据集合加载到内存中,然后再进行操作。但是如果数据集合非常大,这种方式就会导致内存占用过高,甚至可能导致程序崩溃。    C#中的yieldreturn机制可以帮助我们解决这个问......
  • 雷达课堂 (第4讲) 雷达目标检测原理(3)
    本文编辑:@调皮连续波,保持关注调皮哥,获得更多雷达学习资料和建议!一、引言感谢大家前来捧场,我是调皮哥,我希望自己能够坚持把【雷达课堂】这个专题做得更加完美,让点进来的读者,一定要带着收获出去,不然就是纯属浪费大家的宝贵时间。今天是【雷达课堂】的第4讲,雷达目标检测原理(3)小节。上......
  • 2013年工作中遇到的20个问题:121-140
     121.Springz中,根据实现类找不到bean。UserImplimplementsUser{}XmlWebApplicationContextcontext;context.getBean(User.class);√javcontext.getBean(UserImpl.class);获取不到  没有使用Cgilib库!  --------貌似也不行------------ 因为spring的......
  • 2013年工作中遇到的20个问题(Bug):161-180
    161.用户表和超级用户分成2个表,很不合理,查询的时候,非常复杂。162.leftjoin还是很有“市场”的。机构表Org连接User时,想获得user的名字,可能存在,也可能不存在,leftjoin就适合。##多个leftjoin之间不能使用","隔开selectcg.*,u.loginNamecreatorName,org.nativeNameadvertiser......
  • 5款让你工作效率大幅提升的国产办公软件,准时下班就看这篇!
    近年来,随着国产软件的逐渐成熟和发展,越来越多的办公软件开始涌现。这些软件不仅在功能上与国外同类软件不相上下,能大幅提升工作效率,而且还可以免费试用,成为了越来越多企业和个人选择的首选。今天给大家分享5款国产办公软件,让你的工作效率大幅提升。 迅捷PDF转换器 迅捷PDF转......
  • 【深入浅出Spring原理及实战】「夯实基础系列」360全方位分析和探究SpringMVC的核心原
    SpringMVC简介SpringWebMVC是一种基于Java的轻量级Web框架,它实现了WebMVC设计模式,使用VC架构模式的思想将web层进行职责解耦。这种请求驱动类型的框架使用请求-响应模型,旨在简化Web开发过程。使用SpringWebMVC,我们可以更加高效地开发Web应用程序,而不必为了每个接口编写一个Ser......
  • 计算机组成原理:指令系统、CPU数据通路信号(例题
    分析:由题目可知操作码占4位,所以支持的操作指令为\(2^4\)种指令操作数占6位,其中寻址3位,寄存器编号3位,所以最多有\(2^3\)个通用寄存器主存大小为128KB,机器字长为16位,且按字编址,所以有\(\frac{128KB}{2B}\quad=2^{16}\)个存储单元,即MAR至少16位机器字长为16为,那么MDR至少也......
  • CSI架构和原理
    CSICSI简介CSI的诞生背景K8s原生支持一些存储类型的PV,如iSCSI、NFS、CephFS等等,这些in-tree类型的存储代码放在Kubernetes代码仓库中。这里带来的问题是K8s代码与三方存储厂商的代码强耦合:更改in-tree类型的存储代码,用户必须更新K8s组件,成本较高in-tree存......
  • slack 团队及个人工作学习的好帮手
    Slack将人员集中到统一的团队中,改变了组织的沟通方式。  Slack工具下载直通车Slack下载直通车Slack侧栏你可以从侧栏访问Slack对话。你会看到你已经加入的频道列表、你的私信、特定对话的通知以及编写新消息的选项。 撰写你可以使用撰写按钮来编辑和发送消息到任......