首页 > 其他分享 >Openfeign集成Ribbon、Hystrix原理解析

Openfeign集成Ribbon、Hystrix原理解析

时间:2024-05-26 16:55:13浏览次数:18  
标签:return Openfeign Hystrix final public new method Ribbon target

本篇内容为解析Spring Cloud Openfeign在如下场景中的运行原理

  • Openfeign单独使用
  • 集成负载均衡器,这里选择Ribbon,也可以选择Spring LoadBalancer
  • 集成断路器,这里选择Hystrix,也可以选择Sentinel

相关依赖如下,使用的Spring Cloud版本为Hoxton.SR3

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

使用feign

单独使用feign时,就只是一个声明式的HTTP客户端。我们需要在SpringBootApplication类上添加注解@EnableFeignClients,然后再创建一个接口,并添加注解@FeignClient,然后在该接口中添加方法。

接口示例

@FeignClient(value = "manage-api", contextId = "listbox-client", url = "http://127.0.0.1:8088/")
public interface ListBoxClient {

@GetMapping(value = "/list/box")
ResponseEntity<ListBoxVO> getListBox(@RequestParam("code") String code, @RequestParam("codeValue") String codeValue);
}

原理解析

我们在使用时,直接调用接口方法,就可以进行服务调用。为什么如此简单?这是因为Spring在后面默默做了很多事情。

在程序启动时,Spring 会扫描注解@FeignClient,并根据注解信息,为每一个接口创建一个feign.Feign实例的代理对象,并将接口关联到该代理对象。

调用ListBoxClient接口,其实是调用对应的Feign实例。Feign中包含很多模块,例如Contract、Logger、Client、Retryer、Encoder、Decoder、RequestInterceptor、InvocationHandlerFactory等。

Feign中部分模块含义如下

  • Contract:用于解析方法上的注解信息,例如请求方式,请求地址
  • InvocationHandlerFactory:用于处理接口回调,所有对接口的调用,都会转入到InvocationHandler中
  • Client:用于执行请求发送

接下来,我们重点关注一下代理对象Feign的创建、回调处理器InvocationHandlerFactory和请求执行客户端Client

代理对象的创建

FeignClientFactoryBean

Spring在收集到@Feignclient注解时,会根据注解中的配置,为其创建代理对象,代理对象是一个Feign实例。对象的创建使用FeignClientFactoryBean.getObject()方法。

@Override
public Object getObject() throws Exception {
   return getTarget();
}

<T> T getTarget() {
   FeignContext context = this.applicationContext.getBean(FeignContext.class);
   Feign.Builder builder = feign(context);

   if (!StringUtils.hasText(this.url)) {
      if (!this.name.startsWith("http")) {
         this.url = "http://" + this.name;
      }
      else {
         this.url = this.name;
      }
      this.url += cleanPath();
      return (T) loadBalance(builder, context,
            new HardCodedTarget<>(this.type, this.name, this.url));
   }
   if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
      this.url = "http://" + this.url;
   }
   String url = this.url + cleanPath();
   Client client = getOptional(context, Client.class);
   if (client != null) {
      if (client instanceof LoadBalancerFeignClient) {
         // not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient) client).getDelegate();
      }
      if (client instanceof FeignBlockingLoadBalancerClient) {
         // not load balancing because we have a url,
// but Spring Cloud LoadBalancer is on the classpath, so unwrap
client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
      }
      builder.client(client);
   }
   Targeter targeter = get(context, Targeter.class);
   return (T) targeter.target(this, builder, context,
         new HardCodedTarget<>(this.type, this.name, url));
}
Targeter

FeignClientFactoryBean.getTarget()最终调用的是Targeter.target(),在这一步中,就将代理对象(Feign实例)和FeignClient接口绑定在了一起。

package org.springframework.cloud.openfeign;

class DefaultTargeter implements Targeter {

   @Override
   public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
         FeignContext context, Target.HardCodedTarget<T> target) {
      return feign.target(target);
   }
}
ReflectiveFeign

Targeter.target()方法中,最终调用的是Feign.Builder.target()方法,Builder是Feign中的静态类。在这一步中真正创建了Feign实例,并将其作为代理对象,代理feignclient接口。

可以发现,创建的Feign实例实现是ReflectiveFeign类。

public <T> T target(Target<T> target) {
  return build().newInstance(target);
}

public Feign build() {
  SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
      new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
          logLevel, decode404, closeAfterDecode, propagationPolicy);
  ParseHandlersByName handlersByName =
      new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
          errorDecoder, synchronousMethodHandlerFactory);
  return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}

@Override
public <T> T newInstance(Target<T> target) {
  Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
  Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
  List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

  for (Method method : target.type().getMethods()) {
    if (method.getDeclaringClass() == Object.class) {
      continue;
    } else if (Util.isDefault(method)) {
      DefaultMethodHandler handler = new DefaultMethodHandler(method);
      defaultMethodHandlers.add(handler);
      methodToHandler.put(method, handler);
    } else {
      methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
    }
  }
  InvocationHandler handler = factory.create(target, methodToHandler);
  T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
      new Class<?>[] {target.type()}, handler);

  for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
    defaultMethodHandler.bindTo(proxy);
  }
  return proxy;
}

回调处理器

Feign实例中,用于处理接口回调的类是InvocationHandler,其由一个工厂类InvocationHandlerFactory生成。工厂类接口中还有一个MethodHandler,其用于处理接口中的方法。在进行回调时,先调用InvocationHandler的invoke()方法,在invoke()方法中,再根据实际的方法调用使用对应的MethodHandler的invoke()方法进行处理。

也就是说,InvocationHandler总览一个接口所有的调用,其中再根据调用方法的不同,分发到不同的MethodHandler。

public interface InvocationHandlerFactory {

  // 一个target表示一个接口,dispatch代表的是接口中的方法以及对应的方法处理
  InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch);

  /**
   * Like {@link InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])}, except for a
   * single method.
   */
interface MethodHandler {

    Object invoke(Object[] argv) throws Throwable;
  }

  static final class Default implements InvocationHandlerFactory {

    @Override
    public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
      return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
    }
  }
}
InvocationHandlerFactory

Feign中使用的InvocationHandlerFactory实现是InvocationHandlerFactory.Default。

private InvocationHandlerFactory invocationHandlerFactory =
    new InvocationHandlerFactory.Default();

InvocationHandlerFactory.Default使用Feign子类ReflectiveFeign中的静态类FeignInvocationHandler来处理回调。

static final class Default implements InvocationHandlerFactory {

  @Override
  public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
    return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
  }
}
InvocationHandler

InvocationHandler的实现类是ReflectiveFeign中的静态类FeignInvocationHandler,其中invoke()方法用于处理具体的接口调用。

static class FeignInvocationHandler implements InvocationHandler {

      private final Target target;
      private final Map<Method, MethodHandler> dispatch;
    
      FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
        this.target = checkNotNull(target, "target");
        this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
      }
  
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if ("equals".equals(method.getName())) {
        try {
          Object otherHandler =
              args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
          return equals(otherHandler);
        } catch (IllegalArgumentException e) {
          return false;
        }
      } else if ("hashCode".equals(method.getName())) {
        return hashCode();
      } else if ("toString".equals(method.getName())) {
        return toString();
      }

      // 转发到具体的methodHandler
      return dispatch.get(method).invoke(args);
    }
  }
MethodHandler

MethodHandler的实现类是SynchronousMethodHandler,在invoke()方法中组装Request,然后使用Client发送请求。

在invoke()方法中,还封装了失败重试的逻辑,见Retryer。

@Override
public Object invoke(Object[] argv) throws Throwable {

  RequestTemplate template = buildTemplateFromArgs.create(argv);
  // readTimeout、connectTimeout
  Options options = findOptions(argv);
  // 重试器
  Retryer retryer = this.retryer.clone();
  while (true) {
    try {
      return executeAndDecode(template, options);
    } catch (RetryableException e) {
      try {
        retryer.continueOrPropagate(e);
      } catch (RetryableException th) {
        Throwable cause = th.getCause();
        if (propagationPolicy == UNWRAP && cause != null) {
          throw cause;
        } else {
          throw th;
        }
      }
      if (logLevel != Logger.Level.NONE) {
        logger.logRetry(metadata.configKey(), logLevel);
      }
      continue;
    }
  }
}

请求执行客户端

在MethodHandler中,组装好Request对象后,需要通过一个Client对象进行发送。Client中定义了一个execute方法,用于执行请求发送。

public interface Client {

  /**
   * Executes a request against its {@link Request#url() url} and returns a response.
   *
   * @param request safe to replay.
   * @param options options to apply to this request.
   * @return connected response, {@link Response.Body} is absent or unread.
   * @throws IOException on a network error connecting to {@link Request#url()}.
   */
Response execute(Request request, Options options) throws IOException;

  class Default implements Client {

    private final SSLSocketFactory sslContextFactory;
    private final HostnameVerifier hostnameVerifier;

    public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
      this.sslContextFactory = sslContextFactory;
      this.hostnameVerifier = hostnameVerifier;
    }
    
    @Override
    public Response execute(Request request, Options options) throws IOException {
      HttpURLConnection connection = convertAndSend(request, options);
      return convertResponse(connection, request);
    }  
}

Feign中使用的Client实现是feign.Client.Default,我们可以看到execute方法实现中,新建了一个Http连接对象HttpURLConnection。

Default的实现中每次都会新建连接,这明显增加了调用延迟。更好的方式是应用连接池的方式,将创建的连接缓存起来使用,这类实现有Apache httpclient、okhttp。

class Default implements Client {

    private final SSLSocketFactory sslContextFactory;
    private final HostnameVerifier hostnameVerifier;

    public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
      this.sslContextFactory = sslContextFactory;
      this.hostnameVerifier = hostnameVerifier;
    }
    
    @Override
    public Response execute(Request request, Options options) throws IOException {
      HttpURLConnection connection = convertAndSend(request, options);
      return convertResponse(connection, request);
    }

结合ribbon

feign集成ribbon后,在服务存在多个实例时,获得了负载均衡的能力。集成ribbon后@FeignCLient注解中不需要再进行url的配置,只需配置服务名即可。

@FeignClient(value = "manage-api", contextId = "listbox-client")
public interface ListBoxClient{
}

那负载均衡怎么实现?回顾feign中的模块,我们知道有一个客户端用于执行请求发送。那我们对这个执行请求发送的客户端进行重写,在新建连接时,根据服务名,在多个实例中动态选择一个实例,这不就实现了负载均衡吗?

实际上,ribbon就是这样做的。

封装请求执行客户端

集成ribbon后,feign中的请求执行客户端从feign.Client.Default切换为LoadBalancerFeignClient,见自动配置类FeignRibbonClientAutoConfiguration。

其中封装了一个feign.Client对象,用于执行请求发送。还新增了一个CachingSpringLoadBalancerFactory和一个SpringClientFactory。

  • SpringClientFactory:作用是通过服务名获取对应的配置,例如readTimeout。
  • CachingSpringLoadBalancerFactory:作用是通过服务名获取对应的负载均衡对象FeignLoadBalancer,这个负载均衡对象中封装了ribbon中的核心功能,例如负载均衡。

ribbon的相关功能和配置,见RibbonClientConfiguration类。

LoadBalancerFeignClient

在LoadBalancerFeignClient的execute() 方法中,其根据参数和配置组装RibbonRequest、RequestConfig对象。

然后调用lbClient()方法获取一个FeignLoadBalancer对象,执行其executeWithLoadBalancer()方法,执行请求发送。

public Response execute(Request request, Request.Options options) throws IOException {
   try {
      URI asUri = URI.create(request.url());
      String clientName = asUri.getHost();
      URI uriWithoutHost = cleanUrl(request.url(), clientName);
      // this.delegate就是一个feign.Client
      FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);
      // 服务配置,如readTimeout、connectTimeout
      IClientConfig requestConfig = getClientConfig(options, clientName);
      // lbClient()返回一个FeignLoadBalancer对象
      return lbClient(clientName)
            .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
   }
   catch (ClientException e) {
      IOException io = findIOException(e);
      if (io != null) {
         throw io;
      }
      throw new RuntimeException(e);
   }
}

FeignLoadBalancer

FeignLoadBalancer.executeWithLoadBalancer()方法继承自其抽象类AbstractLoadBalancerAwareClient,在方法中,ribbon相关的功能被封装在了一个LoadBalancerCommand对象中。

调用command.submit()方法,并提供了一个ServerOperation实现,ServerOperation的作用是使用一个Feign.Client进行请求的发送。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
    
}

LoadBalancerCommand

LoadBalancerCommand的submit()方法,包含了ribbon中的核心功能,一些方法和变量作用如下

  • selectServer():动态选择一个服务实例,默认轮询
  • retryHandler:用于执行请求重试,默认未启用
  • serverStats:用于跟踪和记录服务实例的统计信息,例如请求成功次数、请求失败次数、响应时间等。它可以与负载均衡器结合使用,以帮助其做出更加智能的选择
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();
    
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // Use the load balancer
Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    
                    // Called for each attempt and retry
Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);
                                    
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }
                                    
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            recordStats(tracer, stats, entity, null);
                                            // TODO: What to do if onNext or one rror are never called?
}

                                        @Override
                                        public void one rror(Throwable e) {
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            
                                        
                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });
                    
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
        
    if (maxRetrysNext > 0 && server == null) 
        o = o.retry(retryPolicy(maxRetrysNext, false));
    
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

负载均衡策略

riibbon中定义一个负载均衡策略接口IRule,并提供了多种实现,默认使用的策略是RoundRobinRule。

  • RoundRobinRule:轮询策略,按照顺序轮询选择服务实例,每个请求依次分发到不同的服务实例上。
  • RandomRule:随机策略,随机选择一个服务实例来处理请求。
  • BestAvailableRule:最佳可用策略,最佳可用指的是指选择当前可用服务实例中并发连接数最低的实例。服务状态信息由ServerStats维护,

除了ribbon中提供的负载均衡实现,一些注册中心也提供了自己的规则,如NacosRule。其可以结合服务实例的权重、健康状态来实现更加灵活的负载策略。

自定义****负载均衡

使用BestAvailableRule作为负载均衡规则,BestAvailableRule中创建了一个LoadBalancerStats变量,用于记录和保存负载均衡器的状态。在其内部,使用ServerStats记录每个服务实例的状态信息。每次进行实例选择时,会先检测服务实例的状态,优先选择负载小的实例。

配置策略

全局配置

@Configuration
public class RibbonConfiguration {

    @Bean
    public IRule ribbonRule() {
        return new BestAvailableRule();
    }
}

结合hystrix

在feign集成hystrix后,可以获得服务熔断、降级等能力。

这些能力是怎么实现的呢?回顾feign中的内容,我们知道有一个回调处理器,在回调处理器中进行具体方法调用。那我们可不可以重写这个回调处理器,在一个时间窗口中,记录服务请求的次数,并计算失败率。在请求次数和失败率达到一定条件时,对该服务的请求,我们不再进行Http调用,而是走降级通道,调用方法对应的fallback方法。

实际上,hystrix也是这样做的。

封装回调处理器

在集成hystrix后,在通过FeignClientFactoryBean.getObject()获取Feign实例时,使用的Targeter是HystrixTargeter。在其中,将原本的Feign.Builder替换成了新的实现HystrixFeign.Builder。

class HystrixTargeter implements Targeter {

       @Override
       public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
             FeignContext context, Target.HardCodedTarget<T> target) {
          if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
             return feign.target(target);
          }
// hystrix feign实现
          feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
          String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
                : factory.getContextId();
          // hystrixCommand 相关的配置
          SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
          if (setterFactory != null) {
             builder.setterFactory(setterFactory);
          }
          // 服务降级回调相关
          Class<?> fallback = factory.getFallback();
          if (fallback != void.class) {
             return targetWithFallback(name, context, target, builder, fallback);
          }
          Class<?> fallbackFactory = factory.getFallbackFactory();
          if (fallbackFactory != void.class) {
             return targetWithFallbackFactory(name, context, target, builder,
                   fallbackFactory);
          }
    
          return feign.target(target);
       }
   }

HystrixFeign.Builder中重写了InvocationHandlerFactory实现,使用一个HystrixInvocationHandler处理请求回调。

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
  super.invocationHandlerFactory(new InvocationHandlerFactory() {
    @Override
    public InvocationHandler create(Target target,
                                    Map<Method, MethodHandler> dispatch) {
      return new HystrixInvocationHandler(target, dispatch, setterFactory,
          nullableFallbackFactory);
    }
  });
  super.contract(new HystrixDelegatingContract(contract));
  return super.build();
}

HystrixInvocationHandler中将方法回调(MethodHandler)封装在了一个HystrixCommand中执行,hystrix中的功能基本都封装在了这个HystrixCommand对象中。

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
    throws Throwable {
  // early exit if the invoked method is from java.lang.Object
// code is the same as ReflectiveFeign.FeignInvocationHandler
if ("equals".equals(method.getName())) {
    try {
      Object otherHandler =
          args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
      return equals(otherHandler);
    } catch (IllegalArgumentException e) {
      return false;
    }
  } else if ("hashCode".equals(method.getName())) {
    return hashCode();
  } else if ("toString".equals(method.getName())) {
    return toString();
  }

// 新建一个HystrixCommand对象,参数为方法上的注解@HystrixCommand中的配置信息
// setterMethodMap.get(method)的作用是获取该方法上的@HystrixCommand注解配置信息
  HystrixCommand<Object> hystrixCommand =
      new HystrixCommand<Object>(setterMethodMap.get(method)) {
        @Override
        protected Object run() throws Exception {
          try {
            // 执行具体的方法回调
            return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
          } catch (Exception e) {
            throw e;
          } catch (Throwable t) {
            throw (Error) t;
          }
        }

        @Override
        protected Object getFallback() {
          if (fallbackFactory == null) {
            return super.getFallback();
          }
          try {
// 获取该方法对应的fallback方法
            Object fallback = fallbackFactory.create(getExecutionException());
            Object result = fallbackMethodMap.get(method).invoke(fallback, args);
            if (isReturnsHystrixCommand(method)) {
              return ((HystrixCommand) result).execute();
            } else if (isReturnsObservable(method)) {
              // Create a cold Observable
return ((Observable) result).toBlocking().first();
            } else if (isReturnsSingle(method)) {
              // Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
            } else if (isReturnsCompletable(method)) {
              ((Completable) result).await();
              return null;
            } else if (isReturnsCompletableFuture(method)) {
              return ((Future) result).get();
            } else {
              return result;
            }
          } catch (IllegalAccessException e) {
            // shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
          } catch (InvocationTargetException | ExecutionException e) {
            // Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
          } catch (InterruptedException e) {
            // Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
            throw new AssertionError(e.getCause());
          }
        }
      };

  if (Util.isDefault(method)) {
    return hystrixCommand.execute();
  } else if (isReturnsHystrixCommand(method)) {
    return hystrixCommand;
  } else if (isReturnsObservable(method)) {
    // Create a cold Observable
return hystrixCommand.toObservable();
  } else if (isReturnsSingle(method)) {
    // Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
  } else if (isReturnsCompletable(method)) {
    return hystrixCommand.toObservable().toCompletable();
  } else if (isReturnsCompletableFuture(method)) {
    return new ObservableCompletableFuture<>(hystrixCommand);
  }
  return hystrixCommand.execute();
}

HystrixCommand

HystrixCommand是Hystrix中的核心类,其根据注解@HystrixCommand中的配置来进行处理

  1. 服务隔离方式,如默认的策略是线程隔离,由一个线程池来执行方法回调
  2. 请求监控,如默认的一个监控时间窗口为10秒
  3. 断路器配置,例如一个时间窗口中,至少有10个请求且有50%的失败率,则断路器打开持续5秒
  4. 服务降级,在服务熔断后,对该服务的请求直接走服务降级逻辑,即对应的fallback方法

相关的类见HystrixCommandProperties、HystrixCircuitBreaker

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private static final Logger logger= LoggerFactory.getLogger(AbstractCommand.class);
    protected final HystrixCircuitBreaker circuitBreaker;
    protected final HystrixThreadPool threadPool;
    protected final HystrixThreadPoolKey threadPoolKey;
    protected final HystrixCommandProperties properties;
}

标签:return,Openfeign,Hystrix,final,public,new,method,Ribbon,target
From: https://www.cnblogs.com/cd-along/p/18213932

相关文章

  • Ribbon负载均衡
    SpringCloudRibbon是一套基于NetflixRibbon实现的客户端负载均衡和服务调用工具。负载均衡是系统处理高并发、缓解网络压力和服务端扩容的重要手段之一。通过Ribbon,以将面向服务的REST模板(RestTemplate)请求转换为客户端负载均衡的服务调用,SpringCloud微服务之间的......
  • 【Springboot】复杂单元测试启动类-只测试OpenFeign
    复杂单元测试启动类-只测试OpenFeign背景随着springboot应用工程规模越来越大,集成了较多的自动配置的程序,例如SpringDataJPA,SpringCloudOpenFeign,ApacheDubbo有时会需要在本地运行测试,但要么因为数据库无法在办公网络环境连接,要么注册中心无法连接,这就导致本地完全无......
  • SpringCloud(3)-OpenFeign相关配置
    OpenFeign是个声明式WebService客户端,使用OpenFeign让编写WebService客户端更简单。SpringCloud对OpenFeign进行了封装使其支持了SpringMVC标准注解和HttpMessageConverters。OpenFeign可以与Eureka和Ribbon组合使用以支持负载均衡。1.配......
  • SpringCloud(2)-Ribbon相关配置
    Ribbon是一套提供客户端负载均衡的工具Ribbon=负载均衡+RestTemplateRibbon属于进程内LoadBalance,含义是将LB逻辑集成到消费方,消费方从服务注册中心获知有哪些服务地址可用,然后再从这些地址中选择出一个合适的服务地址。Ribbon常见负载算法:我们接下来以随机负载......
  • 整合Spring Cloud + Nacos(服务注册与服务发现) + OpenFeign
    一、事前准备1、本文章不包含Nacos的安装教程,请参考其他文章安装并启动Nacos2、创建一个SpringBoot项目先创建一个简单的SpringBoot项目,引入普通项目中需要用到的相关依赖<dependencies><!--web--><dependency><groupId>org.springframework.boot</groupId......
  • openfeign接口Springboot启动Bean报错未找到Singleton bean creation not allowed whi
    检查步骤检查springboot启动类是否标注@EnableFeignClients注解,未标注该注解会导致无法注入bean检查远程调用模块是否标注注解@FeignClient检查@FeignClient注解中是否写了正确的微服务名称(区分大小写)检查@FeignClient注解中标识的微服务是否启动​​原因:此处接......
  • OpenFeign @PathVariable需注明参数名称
    在定义OpenFeign的远程接口时,如果是路径拼接作为参数的远程接口,需要在@PathVariable需注明参数名称,不然代码启动时会报错。正例@FeignClient(value=ServiceConstants.SYSTEM,fallbackFactory=RemoteFileFallbackFactory.class)publicinterfaceRemoteFileService{......
  • OpenFeign 定义后备工厂进行服务降级
    OpenFeign定义后备工厂进行服务降级可以使得远程接口调用失败时进行降级处理,而不会直接报错,影响后续代码逻辑。定义后备工厂的步骤如下:远程接口处定义。@FeignClient(value=ServiceConstants.SYSTEM,fallbackFactory=RemoteFileFallbackFactory.class)publicinterface......
  • 微服务Spring Cloud17_熔断器Hystrix7
    一、简介Hystrix在英文里面的意思是豪猪,它的logo看下面的图是一头豪猪,它在微服务系统中是一款提供保护机制的组件,和eureka一样也是由netflix公司开发。主页:https://github.com/Netflix/Hystrix/ 那么Hystrix的作用是什么呢?具体要保护什么呢?Hystrix是Netflix开源的一......
  • 微服务Spring Cloud17_负载均衡Ribbon6
    一、概述在刚才的案例中,我们启动了一个user-service,然后通过DiscoveryClient来获取服务实例信息,然后获取ip和端口来访问。但是实际环境中,往往会开启很多个user-service的集群。此时获取的服务列表中就会有多个,到底该访问哪一个呢?一般这种情况下就需要编写负载均衡算......