转自:https://blog.csdn.net/qq_36882793/article/details/117366243
一、前言
本系列为个人Dubbo学习笔记衍生篇,是正文篇之外的衍生内容,内容来源于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章。仅用于个人笔记记录。本文分析基于Dubbo2.7.0版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。
本文为 Dubbo笔记⑬ :Dubbo 集群组件 之 Cluster & ClusterInvoker衍生篇,主要介绍 MergeableClusterInvoker 策略在集群容错的实现已经作用。
二、多分组调用
首先需要明确在Dubbo中是通过 服务接口 + 服务分组 + 服务版本号确定唯一服务。当一个接口有多种实现时,可以用group区分,所以一个服务可能存在多个不同分组,而一个消费者可以同时引用一个接口的不同分组实现。
1 // 引用服务分组为 aaa, bbb 的 demeService 服务 2 @Reference(version = "1.0.0", group = "aaa,bbb") 3 private DemoService demoService;
默认情况下,Dubbo会挑选其中一个分组的接口调用并返回接口。但是某些情况下,我们可能需要合并多个分组的返回接口。比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。此时可以通过 merger 参数指定合并策略。
下面我们看一下demo 实现
1. 分组结果合并 Demo
1 // dubbo 服务接口 2 public interface SimpleDemoService { 3 // 用于测试 分组方法合并 4 MergerResult sayHello(String msg); 5 // 用于测试 分组策略合并,之所以区分两个接口是因为,策略合并限制了某些只有某些返回类型才能使用。 6 List<String> sayHello2(String msg); 7 8 } 9 10 // spring 接口分组实现 11 @Service(group = "spring", version = "1.0.0") 12 public class SpringSimpleDemoServiceImpl implements SimpleDemoService { 13 14 @Override 15 public MergerResult sayHello(String msg) { 16 return new MergerResult("StringSimpleDemoServiceImpl : " + msg); 17 } 18 19 @Override 20 public List<String> sayHello2(String msg) { 21 return Lists.newArrayList("StringSimpleDemoServiceImpl : " + msg); 22 } 23 } 24 25 // main 分组接口实现 26 @Service(group = "main", version = "1.0.0") 27 public class MainSimpleDemoServiceImpl implements SimpleDemoService { 28 @Override 29 public MergerResult sayHello(String msg) { 30 return new MergerResult("MainSimpleDemoServiceImpl : " + msg); 31 } 32 33 @Override 34 public List<String> sayHello2(String msg) { 35 return Lists.newArrayList("MainSimpleDemoServiceImpl : " + msg); 36 } 37 } 38 39 // 返回结果,用于合并结果 40 @Data 41 public class MergerResult implements Serializable { 42 private String result; 43 44 public MergerResult(String result) { 45 this.result = result; 46 } 47 // 合并结果时调用的方法 48 public MergerResult merger(MergerResult mergerResult) { 49 return new MergerResult(this.result + " | " + mergerResult.getResult()); 50 } 51 }
这里为了方便 直接使用Main 方法调用
1 public class SimpleConsumer { 2 public static void main(String[] args) { 3 // 自定义的简易生成ReferenceConfig工具类 4 ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-consumer", SimpleDemoService.class); 5 // Map map = Maps.newHashMap(); 6 // map.put("merger", "list"); 7 // map.put("merger", ".merger"); 8 // referenceConfig.setParameters(map); 9 // 设置调用服务分组为 spring 和 main 10 referenceConfig.setGroup("spring,main"); 11 SimpleDemoService demoService = referenceConfig.get(); 12 System.out.println(demoService.sayHello("SimpleConsumer")); 13 } 14 }
我们这里这里可以分为三种情况:
1.merger 参数默认,即不指定 merger 方式:
这种情况即默认情况,Dubbo 会选择一个 invoker 调用并返回
1 ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-consumer", SimpleDemoService.class); 2 referenceConfig.setGroup("spring,main"); 3 SimpleDemoService demoService = referenceConfig.get(); 4 System.out.println(demoService.sayHello("SimpleConsumer"));
返回结果如下,可以看到调用了 group=spring 的服务。
1 MergerResult(result=StringSimpleDemoServiceImpl : SimpleConsumer)
2.merger 指定 合并方法:
当调用时会将多个分组的结果通过指定方法来进行合并。
1 ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-consumer", SimpleDemoService.class); 2 Map map = Maps.newHashMap(); 3 map.put("merger", ".merger"); 4 referenceConfig.setParameters(map); 5 referenceConfig.setGroup("spring,main"); 6 SimpleDemoService demoService = referenceConfig.get(); 7 System.out.println(demoService.sayHello("SimpleConsumer"));
返回结果如下,可以看到调用了 group=spring 和 main 的服务,并将其结果合并了,合并调用是的MergerResult#merger 方法。
1 MergerResult(result=StringSimpleDemoServiceImpl : SimpleConsumer | MainSimpleDemoServiceImpl : SimpleConsumer)
这里通过 merger 指定了合并结果集的方法。需要注意的是:
- 如果以 . 开头则表示指定了合并结果集的方法,合并方法必须是返回类的方法,即这里的合并方法为方法返回类 MergerResult 的 merger 方法。
- 合并方法的入参必须为接口方法返回类型的方法。这里指定的即为MergerResult#merger方法,因为sayHello 方法的返回类型为 MergerResult。
- 合并方法的入参必须是结果类型,即必须是自身类型。如 MergerResult#merger 方法的入参必须是 MergerResult。
3.merger 指定合并策略:
这里我们调用sayHello2 方法,因为合并策略限定了返回类型。
1 ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-consumer", SimpleDemoService.class); 2 Map map = Maps.newHashMap(); 3 // 指定合并策略为 list。即结果类型为 list 的才能合并 4 map.put("merger", "list"); 5 referenceConfig.setParameters(map); 6 referenceConfig.setGroup("spring,main"); 7 SimpleDemoService demoService = referenceConfig.get(); 8 System.out.println(demoService.sayHello2("SimpleConsumer"));
返回结果如下为一个 List集合,可以看到 group=spring 和 main 的服务的返回结果被合并了。合并过程在 ListMerger 中实现。
1 [StringSimpleDemoServiceImpl : SimpleConsumer, MainSimpleDemoServiceImpl : SimpleConsumer]
上面区分了三种情况的demo 实现,下面我们来对代码进行分析。
三、源码分析
1. MergeableClusterInvoker 调用时机
在<Dubbo笔记⑬ :Dubbo 集群 之 Cluster & ClusterInvoker 中,我们介绍过,Cluster Invoker 功能的实现是通过doInvoker 方法完成。但是对于与其他 Cluster Invoker 不同的是,MergeableClusterInvoker不仅仅可以通过指定策略的方式加载执行。当消费者调用多个分组的服务时,Dubbo会自动指定MergeableClusterInvoker 为集群容错策略 ,如下 :
在 RegistryProtocol#refer 中如果当前消费了多个分组的同一个服务则会通过getMergeableCluster() 方法 来指定mergeable 策略,使用 MergeableClusterInvoker 来进行集群容错。(关于 RegistryProtocol#refer 的内容不是本文重点,如果需要详参Dubbo笔记⑨ : 消费者启动流程 - RegistryProtocol#refer) :
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 //取 registry 参数值,并将其设置为协议头 3 url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY); 4 Registry registry = registryFactory.getRegistry(url); 5 if (RegistryService.class.equals(type)) { 6 return proxyFactory.getInvoker((T) registry, type, url); 7 } 8 // group="a,b" or group="*" 9 // 将 url 查询字符串转为 Map 10 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); 11 // 获取 group 配置 12 String group = qs.get(Constants.GROUP_KEY); 13 if (group != null && group.length() > 0) { 14 if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { 15 // 1. 如果是多分组的情况下,通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑。 16 return doRefer(getMergeableCluster(), registry, type, url); 17 } 18 } 19 // 调用 doRefer 继续执行服务引用逻辑 20 return doRefer(cluster, registry, type, url); 21 } 22 23 private Cluster getMergeableCluster() { 24 return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable"); 25 }
2. MergeableClusterInvoker#doInvoker
上述Demo 中的结果处理是在 MergeableClusterInvoker#doInvoker 中完成。下面我们来看 MergeableClusterInvoker 是如何处理合并策略 :
1 @Override 2 protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { 3 checkInvokers(invokers, invocation); 4 // 1. 获取 merger 属性 5 String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); 6 // 2. 如果没有指定分组结果合并,则遍历执行到一个invoker 成功便返回结果 7 if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group 8 for (final Invoker<T> invoker : invokers) { 9 if (invoker.isAvailable()) { 10 try { 11 return invoker.invoke(invocation); 12 } catch (RpcException e) { 13 if (e.isNoInvokerAvailableAfterFilter()) { 14 log.debug("No available provider for service" + directory.getUrl().getServiceKey() + " on group " + invoker.getUrl().getParameter(Constants.GROUP_KEY) + ", will continue to try another group."); 15 } else { 16 throw e; 17 } 18 } 19 } 20 } 21 return invokers.iterator().next().invoke(invocation); 22 } 23 // 3. 到这里说明指定了分组合并, 24 Class<?> returnType; 25 try { 26 // 4 获取调用方法的返回类型 27 returnType = getInterface().getMethod( 28 invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); 29 } catch (NoSuchMethodException e) { 30 returnType = null; 31 } 32 // 5. 通过线程池进行异步调用 33 Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); 34 for (final Invoker<T> invoker : invokers) { 35 Future<Result> future = executor.submit(new Callable<Result>() { 36 @Override 37 public Result call() throws Exception { 38 return invoker.invoke(new RpcInvocation(invocation, invoker)); 39 } 40 }); 41 results.put(invoker.getUrl().getServiceKey(), future); 42 } 43 44 Object result = null; 45 46 List<Result> resultList = new ArrayList<Result>(results.size()); 47 // 获取超时时间 48 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 49 for (Map.Entry<String, Future<Result>> entry : results.entrySet()) { 50 Future<Result> future = entry.getValue(); 51 try { 52 // 6. 获取异步调用的结果 53 Result r = future.get(timeout, TimeUnit.MILLISECONDS); 54 if (r.hasException()) { 55 log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 56 " failed: " + r.getException().getMessage(), 57 r.getException()); 58 } else { 59 resultList.add(r); 60 } 61 } catch (Exception e) { 62 throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e); 63 } 64 } 65 // 7. 结果集的预处理 66 // 结果集为空则返回空的RpcResult 67 if (resultList.isEmpty()) { 68 return new RpcResult((Object) null); 69 } else if (resultList.size() == 1) { 70 // 结果集只有一个,则不需要合并直接返回 71 return resultList.iterator().next(); 72 } 73 // 如果调用方法返回类型是 void。则返回 空RpcResult 74 if (returnType == void.class) { 75 return new RpcResult((Object) null); 76 } 77 // 8. 对结果集进行合并 78 // 8.1 对merger 参数的处理,如果以. 开头则说明指定了合并方法,对合并方法的处理 79 if (merger.startsWith(".")) { 80 merger = merger.substring(1); 81 Method method; 82 try { 83 method = returnType.getMethod(merger, returnType); 84 } catch (NoSuchMethodException e) { 85 throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 86 returnType.getClass().getName() + " ]"); 87 } 88 if (!Modifier.isPublic(method.getModifiers())) { 89 method.setAccessible(true); 90 } 91 result = resultList.remove(0).getValue(); 92 // 8.2 调用指定的方法进行结果合并 93 try { 94 if (method.getReturnType() != void.class 95 && method.getReturnType().isAssignableFrom(result.getClass())) { 96 for (Result r : resultList) { 97 result = method.invoke(result, r.getValue()); 98 } 99 } else { 100 for (Result r : resultList) { 101 method.invoke(result, r.getValue()); 102 } 103 } 104 } catch (Exception e) { 105 throw new RpcException("Can not merge result: " + e.getMessage(), e); 106 } 107 } else { 108 // 8.3 否则认为是指定了合并策略. 开始获取合并策略。 109 Merger resultMerger; 110 if (ConfigUtils.isDefault(merger)) { 111 resultMerger = MergerFactory.getMerger(returnType); 112 } else { 113 resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); 114 } 115 if (resultMerger != null) { 116 List<Object> rets = new ArrayList<Object>(resultList.size()); 117 for (Result r : resultList) { 118 rets.add(r.getValue()); 119 } 120 // 8.4 通过合并策略进行合并 121 result = resultMerger.merge( 122 rets.toArray((Object[]) Array.newInstance(returnType, 0))); 123 } else { 124 throw new RpcException("There is no merger to merge result."); 125 } 126 } 127 // 9. 返回合并后的结果集 128 return new RpcResult(result); 129 }
上面的逻辑比较清晰,这里来简单总结:
- 首先获取merger 参数。如果 merger 参数没有配置,如果没有指定分组结果合并,则遍历执行到一个invoker 成功便返回结果。
- 如果代码执行到这一步,则说明消费之配置了 merger。则开始并发调用所有的 invoker获取到所有的结果集。
- 对结果集进行判断,如果只有一个结果集,或者方法没有返回值则不通过合并策略直接返回。
- 判断合并逻辑是通过合并方法还是合并策略。判断的依据即是 merger 是否以 . 开头。
- 如果是合并方法,则需要判断方法返回类型是否有合法的合并方法。如果有,则通过反射调用指定方法,将最终结果返回。
- 如果指定了合并策略,则根据merger 参数获取到Merger 实现类,调用Merger#merge来将结果集合并并返回。
3. Merger 接口
Merger 如其名,用来合并多个分组的结果集。其中关于 Merger 接口的类型, org.apache.dubbo.rpc.cluster.Merger
中提供了如下一些实现类。
map=org.apache.dubbo.rpc.cluster.merger.MapMerger set=org.apache.dubbo.rpc.cluster.merger.SetMerger list=org.apache.dubbo.rpc.cluster.merger.ListMerger byte=org.apache.dubbo.rpc.cluster.merger.ByteArrayMerger char=org.apache.dubbo.rpc.cluster.merger.CharArrayMerger short=org.apache.dubbo.rpc.cluster.merger.ShortArrayMerger int=org.apache.dubbo.rpc.cluster.merger.IntArrayMerger long=org.apache.dubbo.rpc.cluster.merger.LongArrayMerger float=org.apache.dubbo.rpc.cluster.merger.FloatArrayMerger double=org.apache.dubbo.rpc.cluster.merger.DoubleArrayMerger boolean=org.apache.dubbo.rpc.cluster.merger.BooleanArrayMerger
不同类型的 Merger 用于合并不同的结果集。比如MapMerger 会合并多个Map,并返回合并后的Map。
1 public class MapMerger implements Merger<Map<?, ?>> { 2 3 @Override 4 public Map<?, ?> merge(Map<?, ?>... items) { 5 if (ArrayUtils.isEmpty(items)) { 6 return Collections.emptyMap(); 7 } 8 Map<Object, Object> result = new HashMap<Object, Object>(); 9 for (Map<?, ?> item : items) { 10 if (item != null) { 11 result.putAll(item); 12 } 13 } 14 return result; 15 } 16 17 }
消费者根据返回类型的不同指定不同的合并策略,通过这种方式将不同分组的结果集进行合并。
拓展
通过源码和代码debug发现,consumer持有的Invoker,有点像一个责任链。
如果是单分组——!group.contains(",") && !group.equals("*"),那么会是这样:
MockClusterInvoker->FailoverClusterInvoker
如果是多分组,则会变成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
参考:https://blog.csdn.net/zidongxiangxi/article/details/109312544