本篇内容为解析Spring Cloud Openfeign在如下场景中的运行原理
- Openfeign单独使用
- 集成负载均衡器,这里选择Ribbon,也可以选择Spring LoadBalancer
- 集成断路器,这里选择Hystrix,也可以选择Sentinel
相关依赖如下,使用的Spring Cloud版本为Hoxton.SR3
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
使用feign
单独使用feign时,就只是一个声明式的HTTP客户端。我们需要在SpringBootApplication类上添加注解@EnableFeignClients,然后再创建一个接口,并添加注解@FeignClient,然后在该接口中添加方法。
接口示例
@FeignClient(value = "manage-api", contextId = "listbox-client", url = "http://127.0.0.1:8088/")
public interface ListBoxClient {
@GetMapping(value = "/list/box")
ResponseEntity<ListBoxVO> getListBox(@RequestParam("code") String code, @RequestParam("codeValue") String codeValue);
}
原理解析
我们在使用时,直接调用接口方法,就可以进行服务调用。为什么如此简单?这是因为Spring在后面默默做了很多事情。
在程序启动时,Spring 会扫描注解@FeignClient,并根据注解信息,为每一个接口创建一个feign.Feign实例的代理对象,并将接口关联到该代理对象。
调用ListBoxClient接口,其实是调用对应的Feign实例。Feign中包含很多模块,例如Contract、Logger、Client、Retryer、Encoder、Decoder、RequestInterceptor、InvocationHandlerFactory等。
Feign中部分模块含义如下
- Contract:用于解析方法上的注解信息,例如请求方式,请求地址
- InvocationHandlerFactory:用于处理接口回调,所有对接口的调用,都会转入到InvocationHandler中
- Client:用于执行请求发送
接下来,我们重点关注一下代理对象Feign的创建、回调处理器InvocationHandlerFactory和请求执行客户端Client
代理对象的创建
FeignClientFactoryBean
Spring在收集到@Feignclient注解时,会根据注解中的配置,为其创建代理对象,代理对象是一个Feign实例。对象的创建使用FeignClientFactoryBean.getObject()方法。
@Override
public Object getObject() throws Exception {
return getTarget();
}
<T> T getTarget() {
FeignContext context = this.applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);
if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
this.url = "http://" + this.name;
}
else {
this.url = this.name;
}
this.url += cleanPath();
return (T) loadBalance(builder, context,
new HardCodedTarget<>(this.type, this.name, this.url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient) client).getDelegate();
}
if (client instanceof FeignBlockingLoadBalancerClient) {
// not load balancing because we have a url,
// but Spring Cloud LoadBalancer is on the classpath, so unwrap
client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context,
new HardCodedTarget<>(this.type, this.name, url));
}
Targeter
FeignClientFactoryBean.getTarget()最终调用的是Targeter.target(),在这一步中,就将代理对象(Feign实例)和FeignClient接口绑定在了一起。
package org.springframework.cloud.openfeign;
class DefaultTargeter implements Targeter {
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
FeignContext context, Target.HardCodedTarget<T> target) {
return feign.target(target);
}
}
ReflectiveFeign
Targeter.target()方法中,最终调用的是Feign.Builder.target()方法,Builder是Feign中的静态类。在这一步中真正创建了Feign实例,并将其作为代理对象,代理feignclient接口。
可以发现,创建的Feign实例实现是ReflectiveFeign类。
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404, closeAfterDecode, propagationPolicy);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
@Override
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);
for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
回调处理器
Feign实例中,用于处理接口回调的类是InvocationHandler,其由一个工厂类InvocationHandlerFactory生成。工厂类接口中还有一个MethodHandler,其用于处理接口中的方法。在进行回调时,先调用InvocationHandler的invoke()方法,在invoke()方法中,再根据实际的方法调用使用对应的MethodHandler的invoke()方法进行处理。
也就是说,InvocationHandler总览一个接口所有的调用,其中再根据调用方法的不同,分发到不同的MethodHandler。
public interface InvocationHandlerFactory {
// 一个target表示一个接口,dispatch代表的是接口中的方法以及对应的方法处理
InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch);
/**
* Like {@link InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])}, except for a
* single method.
*/
interface MethodHandler {
Object invoke(Object[] argv) throws Throwable;
}
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
}
InvocationHandlerFactory
Feign中使用的InvocationHandlerFactory实现是InvocationHandlerFactory.Default。
private InvocationHandlerFactory invocationHandlerFactory =
new InvocationHandlerFactory.Default();
InvocationHandlerFactory.Default使用Feign子类ReflectiveFeign中的静态类FeignInvocationHandler来处理回调。
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
InvocationHandler
InvocationHandler的实现类是ReflectiveFeign中的静态类FeignInvocationHandler,其中invoke()方法用于处理具体的接口调用。
static class FeignInvocationHandler implements InvocationHandler {
private final Target target;
private final Map<Method, MethodHandler> dispatch;
FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
// 转发到具体的methodHandler
return dispatch.get(method).invoke(args);
}
}
MethodHandler
MethodHandler的实现类是SynchronousMethodHandler,在invoke()方法中组装Request,然后使用Client发送请求。
在invoke()方法中,还封装了失败重试的逻辑,见Retryer。
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
// readTimeout、connectTimeout
Options options = findOptions(argv);
// 重试器
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
请求执行客户端
在MethodHandler中,组装好Request对象后,需要通过一个Client对象进行发送。Client中定义了一个execute方法,用于执行请求发送。
public interface Client {
/**
* Executes a request against its {@link Request#url() url} and returns a response.
*
* @param request safe to replay.
* @param options options to apply to this request.
* @return connected response, {@link Response.Body} is absent or unread.
* @throws IOException on a network error connecting to {@link Request#url()}.
*/
Response execute(Request request, Options options) throws IOException;
class Default implements Client {
private final SSLSocketFactory sslContextFactory;
private final HostnameVerifier hostnameVerifier;
public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
this.sslContextFactory = sslContextFactory;
this.hostnameVerifier = hostnameVerifier;
}
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
}
Feign中使用的Client实现是feign.Client.Default,我们可以看到execute方法实现中,新建了一个Http连接对象HttpURLConnection。
Default的实现中每次都会新建连接,这明显增加了调用延迟。更好的方式是应用连接池的方式,将创建的连接缓存起来使用,这类实现有Apache httpclient、okhttp。
class Default implements Client {
private final SSLSocketFactory sslContextFactory;
private final HostnameVerifier hostnameVerifier;
public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
this.sslContextFactory = sslContextFactory;
this.hostnameVerifier = hostnameVerifier;
}
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
结合ribbon
feign集成ribbon后,在服务存在多个实例时,获得了负载均衡的能力。集成ribbon后@FeignCLient注解中不需要再进行url的配置,只需配置服务名即可。
@FeignClient(value = "manage-api", contextId = "listbox-client")
public interface ListBoxClient{
}
那负载均衡怎么实现?回顾feign中的模块,我们知道有一个客户端用于执行请求发送。那我们对这个执行请求发送的客户端进行重写,在新建连接时,根据服务名,在多个实例中动态选择一个实例,这不就实现了负载均衡吗?
实际上,ribbon就是这样做的。
封装请求执行客户端
集成ribbon后,feign中的请求执行客户端从feign.Client.Default切换为LoadBalancerFeignClient,见自动配置类FeignRibbonClientAutoConfiguration。
其中封装了一个feign.Client对象,用于执行请求发送。还新增了一个CachingSpringLoadBalancerFactory和一个SpringClientFactory。
- SpringClientFactory:作用是通过服务名获取对应的配置,例如readTimeout。
- CachingSpringLoadBalancerFactory:作用是通过服务名获取对应的负载均衡对象FeignLoadBalancer,这个负载均衡对象中封装了ribbon中的核心功能,例如负载均衡。
ribbon的相关功能和配置,见RibbonClientConfiguration类。
LoadBalancerFeignClient
在LoadBalancerFeignClient的execute() 方法中,其根据参数和配置组装RibbonRequest、RequestConfig对象。
然后调用lbClient()方法获取一个FeignLoadBalancer对象,执行其executeWithLoadBalancer()方法,执行请求发送。
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
// this.delegate就是一个feign.Client
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// 服务配置,如readTimeout、connectTimeout
IClientConfig requestConfig = getClientConfig(options, clientName);
// lbClient()返回一个FeignLoadBalancer对象
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
FeignLoadBalancer
FeignLoadBalancer.executeWithLoadBalancer()方法继承自其抽象类AbstractLoadBalancerAwareClient,在方法中,ribbon相关的功能被封装在了一个LoadBalancerCommand对象中。
调用command.submit()方法,并提供了一个ServerOperation实现,ServerOperation的作用是使用一个Feign.Client进行请求的发送。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
LoadBalancerCommand
LoadBalancerCommand的submit()方法,包含了ribbon中的核心功能,一些方法和变量作用如下
- selectServer():动态选择一个服务实例,默认轮询
- retryHandler:用于执行请求重试,默认未启用
- serverStats:用于跟踪和记录服务实例的统计信息,例如请求成功次数、请求失败次数、响应时间等。它可以与负载均衡器结合使用,以帮助其做出更加智能的选择
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or one rror are never called?
}
@Override
public void one rror(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
负载均衡策略
riibbon中定义一个负载均衡策略接口IRule,并提供了多种实现,默认使用的策略是RoundRobinRule。
- RoundRobinRule:轮询策略,按照顺序轮询选择服务实例,每个请求依次分发到不同的服务实例上。
- RandomRule:随机策略,随机选择一个服务实例来处理请求。
- BestAvailableRule:最佳可用策略,最佳可用指的是指选择当前可用服务实例中并发连接数最低的实例。服务状态信息由ServerStats维护,
除了ribbon中提供的负载均衡实现,一些注册中心也提供了自己的规则,如NacosRule。其可以结合服务实例的权重、健康状态来实现更加灵活的负载策略。
自定义****负载均衡
使用BestAvailableRule作为负载均衡规则,BestAvailableRule中创建了一个LoadBalancerStats变量,用于记录和保存负载均衡器的状态。在其内部,使用ServerStats记录每个服务实例的状态信息。每次进行实例选择时,会先检测服务实例的状态,优先选择负载小的实例。
配置策略
全局配置
@Configuration
public class RibbonConfiguration {
@Bean
public IRule ribbonRule() {
return new BestAvailableRule();
}
}
结合hystrix
在feign集成hystrix后,可以获得服务熔断、降级等能力。
这些能力是怎么实现的呢?回顾feign中的内容,我们知道有一个回调处理器,在回调处理器中进行具体方法调用。那我们可不可以重写这个回调处理器,在一个时间窗口中,记录服务请求的次数,并计算失败率。在请求次数和失败率达到一定条件时,对该服务的请求,我们不再进行Http调用,而是走降级通道,调用方法对应的fallback方法。
实际上,hystrix也是这样做的。
封装回调处理器
在集成hystrix后,在通过FeignClientFactoryBean.getObject()获取Feign实例时,使用的Targeter是HystrixTargeter。在其中,将原本的Feign.Builder替换成了新的实现HystrixFeign.Builder。
class HystrixTargeter implements Targeter {
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
FeignContext context, Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
// hystrix feign实现
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
: factory.getContextId();
// hystrixCommand 相关的配置
SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
// 服务降级回调相关
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(name, context, target, builder, fallback);
}
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(name, context, target, builder,
fallbackFactory);
}
return feign.target(target);
}
}
HystrixFeign.Builder中重写了InvocationHandlerFactory实现,使用一个HystrixInvocationHandler处理请求回调。
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
HystrixInvocationHandler中将方法回调(MethodHandler)封装在了一个HystrixCommand中执行,hystrix中的功能基本都封装在了这个HystrixCommand对象中。
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// early exit if the invoked method is from java.lang.Object
// code is the same as ReflectiveFeign.FeignInvocationHandler
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
// 新建一个HystrixCommand对象,参数为方法上的注解@HystrixCommand中的配置信息
// setterMethodMap.get(method)的作用是获取该方法上的@HystrixCommand注解配置信息
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// 执行具体的方法回调
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
// 获取该方法对应的fallback方法
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
HystrixCommand
HystrixCommand是Hystrix中的核心类,其根据注解@HystrixCommand中的配置来进行处理
- 服务隔离方式,如默认的策略是线程隔离,由一个线程池来执行方法回调
- 请求监控,如默认的一个监控时间窗口为10秒
- 断路器配置,例如一个时间窗口中,至少有10个请求且有50%的失败率,则断路器打开持续5秒
- 服务降级,在服务熔断后,对该服务的请求直接走服务降级逻辑,即对应的fallback方法
相关的类见HystrixCommandProperties、HystrixCircuitBreaker
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private static final Logger logger= LoggerFactory.getLogger(AbstractCommand.class);
protected final HystrixCircuitBreaker circuitBreaker;
protected final HystrixThreadPool threadPool;
protected final HystrixThreadPoolKey threadPoolKey;
protected final HystrixCommandProperties properties;
}
标签:return,Openfeign,Hystrix,final,public,new,method,Ribbon,target
From: https://www.cnblogs.com/cd-along/p/18213932