Hystrix的工作原理
原文:https://www.cnblogs.com/sglx/p/15771838.html
一、简介
hystrix经常被我们用于服务的熔断,降级等领域,基于RxJava(一种基于观察者模式的响应式编程框架)实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。
二、基本原理
当我们需要调用某个方法时(一般是远程调用),通过 Hystrix 将方法调用包裹起来,交由 Hystrix 来完成,从而享受 Hystrix 带来保护。
Hystrix 提供了两个请求命令:HystrixCommand、HystrixObservableCommand,可以使用这两个对象来包裹待执行的任务。
HystrixCommand用在依赖服务返回单个操作结果的时候:
execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
HystrixObservableCommand用在依赖服务返回多个操作结果的时候:
observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable。
toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。
三、基本用法
hystrix可以使用手动自定义Command、注解、结合feign的方式来实现,手动创建和结合feign的这里就不介绍了,主要看一下注解的实现方式。
1、开启hystrix,启动类上加上@EnableHystrix注解
2、在需要降级的方法上使用@HystrixCommand
public class UserServiceImpl{
@Autowired
private UserDao userDao;
@HystrixCommand(fallbackMethod = "getUserNameFallBack")
@Override
public String getUserName(String userId){
int i = 1/0
return userDao.getNameById(userId);
}
public String getUserNameFallBack(String userId){
return "服务暂时不可用,请稍后再试";
}
}
四、初始化
从@EnableHystrix注解看起
//开启EnableCircuitBreaker
@EnableCircuitBreaker
public @interface EnableHystrix {
}
//导入EnableCircuitBreakerImportSelector
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
在注解上又开启了一个注解@EnableCircuitBreaker,并导入了一个Selector
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
这是hystrix生效的一个关键点,继承了SpringFactoryImportSelector,此类在初始化后,会执行selectImports(AnnotationMetadata metadata)的方法。此方法会根据注解启动的注解(这里指@EnableCircuitBreaker)从spring.factories文件中获取其配置需要初始化@Configuration类,看下关键代码
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
看一下spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
配置类HystrixCircuitBreakerConfiguration
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
}
向spring注入了HystrixCommandAspect
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
//切点是所有使用了HystrixCommand注解的地方
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
//切点是所有使用了HystrixCollapser注解的地方
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
//环绕通知
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
//两个注解不能同时作用
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
//META_HOLDER_FACTORY_MAP预先初始化了两个工厂类
//@HystrixCommand:CommandMetaHolderFactory
//@HystrixCollapser:CollapserMetaHolderFactory
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
//创建,把切点封装进了MetaHolder
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 创建HystrixInvokable,只是一个空接口,没有任何方法,只是用来标记具备可执行的能力
// 具体的执行由实现类来做
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
//执行类型
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
// 利用工具CommandExecutor来执行
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
···
}
通过AOP编程,创建方法的代理,决定执行何种逻辑,创建参数包装类
public MetaHolder create(final ProceedingJoinPoint joinPoint) {
//获取方法对象
Method method = getMethodFromTarget(joinPoint);
//目标对象
Object obj = joinPoint.getTarget();
//方法的参数列表
Object[] args = joinPoint.getArgs();
//代理对象
Object proxy = joinPoint.getThis();
//调用子类create方法
return create(proxy, method, obj, args, joinPoint);
}
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
//获取HystrixCommand注解的信息
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
//判断方法的返回值类型Future:异步,Observable:rxjava中的被观察者,其他:同步
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
//获取MetaHolder的builder对象
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
//建造者模式,创建MetaHolder包装类
return builder.defaultCommandKey(method.getName())
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode())
.executionType(executionType)
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
创建命令执行器
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
//@HystrixCollapser
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
//@HystrixCommand 并且 返回值类型是Observable
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
//其他情况 把HystrixCommandBuilder封装进GenericCommand
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
public GenericCommand(HystrixCommandBuilder builder) {
super(builder);
}
protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
super(builder.getSetterBuilder().build());
//命令形式 包含需要执行的方法 fallback方法
this.commandActions = builder.getCommandActions();
this.collapsedRequests = builder.getCollapsedRequests();
this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
this.ignoreExceptions = builder.getIgnoreExceptions();
//执行类型 ASYNCHRONOUS SYNCHRONOUS OBSERVABLE
this.executionType = builder.getExecutionType();
}
public HystrixCommand.Setter build() throws HystrixPropertyException {
//分组key:类名,命令key:方法名,线程池key
HystrixCommand.Setter setter = HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
if (StringUtils.isNotBlank(threadPoolKey)) {
setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
}
try {
//初始化线程池的配置
setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
} catch (IllegalArgumentException e) {
throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
}
try {
//初始化命令执行配置
setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
} catch (IllegalArgumentException e) {
throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
}
return setter;
}
执行命令
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
//转换成子接口HystrixExecutable
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
HystrixCommand的四种执行方式
测试类
public class HystrixCommandTest {
private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) {
com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA"))
.andCommandKey(HystrixCommandKey.Factory.asKey("methodA"));
return new com.netflix.hystrix.HystrixCommand<T>(setter) {
@Override
protected T run() throws Exception {
System.out.println("HystrixCommand执行了!!!" + System.currentTimeMillis());
return message;
}
};
}
@Test
public void test01() {
com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01");
System.out.println(command.execute());
}
@Test
public void test02() throws ExecutionException, InterruptedException {
com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02");
Future<String> f = command.queue();
System.out.println("queue之后,command执行:" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(f.get());
}
@Test
public void test03() throws InterruptedException {
HystrixCommand<String> command = createCommand("this is test03");
// observe直接执行run方法,称为Hot Observable
Observable<String> observe = command.observe();
System.out.println("observe之后,command执行:" + System.currentTimeMillis());
Thread.sleep(1000);
observe.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("通过订阅,获取执行结果:" + s);
}
});
}
@Test
public void test04() throws InterruptedException {
HystrixCommand<String> command = createCommand("this is test04");
// toObservable不直接执行run方法
Observable<String> observe = command.toObservable();
System.out.println("未订阅,command不执行:" + System.currentTimeMillis());
Thread.sleep(1000);
observe.subscribe();
System.out.println("订阅后,command执行了" + System.currentTimeMillis());
Thread.sleep(1000);
}
@Test
public void test05() throws InterruptedException, ExecutionException {
HystrixCommand<String> command = createCommand("this is test05");
// toObservable不直接执行run方法
Future<String> f = command.toObservable().toBlocking().toFuture();
System.out.println("转成future执行:" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(f.get());
}
}
回到hystrix源码中queue()方法
public Future<R> queue() {
// toObservable转换为Observable
// toBlocking转换为BlockingObservable
// toFuture转换为Future
// 完成了Observable的创建和订阅
final Future<R> delegate = toObservable().toBlocking().toFuture();
// 代理future,对于cancel操作做特殊处理
// 因为toObservable().toBlocking().toFuture()返回的future无法通过cancel方法中断执行线程。
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
// 如果 execution.isolation.thread.interruptOnFutureCancel = true(默认false)
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
// 设置标志位
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
// 执行目标future的cancel
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
// 如果command还没执行完成 且 需要中断执行的线程
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
// 获取执行线程
final Thread t = executionThread.get();
// 执行线程非当前线程则中断线程
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
```
};
//判断是否执行完成
if (f.isDone()) {
try {
//获取结果
f.get();
return f;
} catch (Exception e) {
···
}
}
return f;
}
这里用到了RxJava框架的响应式编程,会执行到具体Command(之前封装的GenericCommand)的run方法
public class GenericCommand extends AbstractHystrixCommand<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
public GenericCommand(HystrixCommandBuilder builder) {
super(builder);
}
//执行目标的方法
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
//执行fallback方法
@Override
protected Object getFallback() {
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
}
五、Hystrix上下文
当feign结合Hystrix使用线程隔离时,如果我们想要使用ThreadLocal传递参数是不行的,存在跨线程传递的问题,Hystrix提供了一个上下文类HystrixRequestContext,以传递traceId为例
标签:return,hystrix,Hystrix,工作,new,原理,public,HystrixCommand,metaHolder From: https://www.cnblogs.com/JaxYoun/p/17466880.html