首页 > 其他分享 >Spring Cloud全解析:熔断之HystrixCommand如何执行

Spring Cloud全解析:熔断之HystrixCommand如何执行

时间:2024-09-12 10:26:39浏览次数:1  
标签:return get Spring cmd Cloud new public HystrixCommand metaHolder

HystrixCommand如何执行

有一个HystrixCommandAspect是专门用来处理@HystrixCommand注解的

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

是不是感觉很熟悉,就是一个AOP切面,然后会使用@Around环绕来进行处理

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
  // 获取到对应的执行方法
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
  // 使用@HystrixCommand注解的话这里的type就是HystrixPointcutType.COMMAND,即得到的CommandMetaHolderFactory
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
  	// 1.创建MetaHolder
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
  // 2.创建HystrixInvokable
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
      // 这里我就先按照 普通的方式来进行分析了,所以就!metaHolder.isObservable()为true
      // 3.执行
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
    return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
            .onErrorResumeNext(new Func1<Throwable, Observable>() {
                @Override
                public Observable call(Throwable throwable) {
                    if (throwable instanceof HystrixBadRequestException) {
                        return Observable.error(throwable.getCause());
                    } else if (throwable instanceof HystrixRuntimeException) {
                        HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
                        return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
                    }
                    return Observable.error(throwable);
                }
            });
}

1.创建MetaHolder

private static class CommandMetaHolderFactory extends MetaHolderFactory {
    @Override
    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
      // 获取到@HystrixCommand注解
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
      // 根据方法的返回类型来区分执行类型
      // Future ->ExecutionType.ASYNCHRONOUS  异步
      // Observable -> ExecutionType.OBSERVABLE  反应式(异步回调)
      // 否则 -> ExecutionType.SYNCHRONOUS 同步
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
      // 设置配置的@DefaultProperties以及fallback方法
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        return builder.defaultCommandKey(method.getName())
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
    }
}

2.创建HystrixInvokable

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}

3.执行

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS: {
          // 进行执行
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

executionType为SYNCHRONOUS

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

3.1

 public Future<R> queue() {
     /*
      * The Future returned by Observable.toBlocking().toFuture() does not implement the
      * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
      * thus, to comply with the contract of Future, we must wrap around it.
      */
     final Future<R> delegate = toObservable().toBlocking().toFuture();
   
     final Future<R> f = new Future<R>() {

         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
             if (delegate.isCancelled()) {
                 return false;
             }

             if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                 /*
                  * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                  * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                  * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                  * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                  * than that interruption request cannot be taken back.
                  */
                 interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
          }

             final boolean res = delegate.cancel(interruptOnFutureCancel.get());

             if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                 final Thread t = executionThread.get();
                 if (t != null && !t.equals(Thread.currentThread())) {
                     t.interrupt();
                 }
             }

             return res;
}

         @Override
         public boolean isCancelled() {
             return delegate.isCancelled();
}

         @Override
         public boolean isDone() {
             return delegate.isDone();
}

         @Override
         public R get() throws InterruptedException, ExecutionException {
             return delegate.get();
         }

         @Override
         public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
             return delegate.get(timeout, unit);
         }
       
     };

     /* special handling of error states that throw immediately */
     if (f.isDone()) {
         try {
             f.get();
             return f;
         } catch (Exception e) {
             Throwable t = decomposeException(e);
             if (t instanceof HystrixBadRequestException) {
                 return f;
             } else if (t instanceof HystrixRuntimeException) {
                 HystrixRuntimeException hre = (HystrixRuntimeException) t;
                 switch (hre.getFailureType()) {
      case COMMAND_EXCEPTION:
      case TIMEOUT:
         // we don't throw these types from queue() only from queue().get() as they are execution errors
         return f;
      default:
         // these are errors we throw from queue() as they as rejection type errors
         throw hre;
      }
             } else {
                 throw Exceptions.sneakyThrow(t);
             }
         }
     }

     return f;
 }
3.1.1
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    //doOnCompleted handler already did all of the SUCCESS work
    //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                    }
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                    }
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(true); //user code did run
            }
        }
    };

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };

    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
            R afterFirstApplication = r;

            try {
                afterFirstApplication = executionHook.onComplete(_cmd, r);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
            }

            try {
                return executionHook.onEmit(_cmd, afterFirstApplication);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                return afterFirstApplication;
            }
        }
    };

    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
            try {
                executionHook.onSuccess(_cmd);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
            }
        }
    };

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             /* this is a stateful object so can only be used once */
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

参考文献

标签:return,get,Spring,cmd,Cloud,new,public,HystrixCommand,metaHolder
From: https://www.cnblogs.com/life-time/p/18409673

相关文章

  • 基于Springboot流浪动物之家平台【附源码+文档】
    ......
  • springboot一个ACG主题网站-计算机毕业设计源码94119
    目录摘要1绪论1.1选题背景与意义1.2国内外研究现状1.3论文结构与章节安排2网站分析2.1可行性分析2.2网站流程分析2.2.1网站开发流程2.2.2用户登录流程2.2.3网站操作流程2.2.4添加信息流程2.2.5修改信息流程2.2.6删除信息流程2.3 网站......
  • springboot学生毕设选题系统-计算机毕业设计源码90521
    摘 要本文详述了基于SpringBoot框架的学生毕业设计选题系统的设计与实现过程。该系统针对学生、指导老师和管理员三类用户,提供了全面的功能支持,旨在优化毕业设计选题流程,提高选题效率,并保障数据的安全性和系统的稳定性。系统为学生用户提供了注册登录、查看学校......
  • 14-spring cache 学习
    SpringCache介绍SpringCache是一个框架,实现了基于注解的缓存功能,只需要简单地加一个注解,就能实现缓存功能。SpringCahce提供了一层抽象,底层可以切换不同的cache实现,具体就是通过CacheManager接口来统一不同的缓存技术。CacheManager是Spring提供的各种缓存技术抽象......
  • Springboot项目部署时使用Mail注入Bean时的Constructor threw exception
    缘起:今天打算把写了好一段时间的项目提溜到服务器上转转,然后在启动的时候就发现了个问题,在日志跑到【JobFactorysetto:org.springframework.scheduling.quartz.SpringBeanJobFactory@31e7afde】的时候他就不动了,然后等了好一会他才抛出了个异常【org.springframework.beans.f......
  • 健身房|基于springboot的健身房管理系统设计与实现(附项目源码+论文+数据库)
    私信或留言即免费送开题报告和任务书(可指定任意题目)目录一、摘要二、相关技术三、系统设计四、数据库设计     五、核心代码      六、论文参考 七、源码获取  一、摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步......
  • 网上请假|基于springboot的学生网上请假系统设计与实现(附项目源码+论文+数据库)
    私信或留言即免费送开题报告和任务书(可指定任意题目)目录一、摘要二、相关技术三、系统设计四、数据库设计   五、核心代码      六、论文参考 七、源码获取  一、摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟......
  • springboot+vue州州购【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,电子商务已成为推动全球经济增长的重要引擎。在“新零售”概念的引领下,消费者对于购物体验的需求日益多元化和个性化,传统零售模式正面临前所未有的挑战与机遇。在此背景下,“州州购”应运而生,旨在打造一个集地......
  • springboot+vue新疆IP形象NFT藏品网站【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景随着区块链技术的迅猛发展与数字经济的崛起,非同质化代币(NFT)作为一种全新的数字资产形式,正逐步改变着艺术品、收藏品乃至文化产业的传统格局。新疆,作为中国多元文化的瑰宝之地,其丰富的民族文化、自然风光及历史故事为NFT创作提供了无尽......
  • springboot+vue医院门诊管理自动化实现【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景随着医疗技术的飞速发展和人口老龄化的加剧,医院门诊面临着前所未有的压力与挑战。传统的手工管理模式已难以满足日益增长的患者需求,排队时间长、信息不对称、效率低下等问题日益凸显。信息化、自动化成为提升医院门诊管理水平、优化患......