分片
弹性调度是ElasticJob
最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水
平扩展的任务处理系统。
ElasticJob
中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob
会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
举例说明,如果作业分为4
片,用两台服务器执行,则每个服务器分到2
片,分别负责作业的50%
的负载,如下图所示。
ElasticJob
并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,始于0
而终于分片总数减1
。以上是ElasticJob
的官方文档对分片的描述,而文档对作业分片策略的介绍非常简单,只给了作业分片策略的SPI
名称,如下图所示:
作业分片策略
博主目前使用的是3.0.1
版本的ElasticJob‐Lite
(目前最新版本)。
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>3.0.1</version>
</dependency>
作业分片策略的SPI
名称是JobShardingStrategy
,是作业分片策略的顶层设计。
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
import java.util.List;
import java.util.Map;
/**
* 作业分片策略
*/
public interface JobShardingStrategy extends TypedSPI {
/**
* 作业分片
* jobInstances – 参与分片的所有作业实例(作业服务器)
* jobName -作业名称
* shardingTotalCount – 分片总数
*/
Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}
JobShardingStrategy
接口的sharding
方法就是用来定义作业分片的逻辑,供子类实现,目前有三个实现类:AverageAllocationJobShardingStrategy
、OdevitySortByNameJobShardingStrategy
以及RoundRobinByNameJobShardingStrategy
。
AverageAllocationJobShardingStrategy
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
addAliquant(jobInstances, shardingTotalCount, result);
return result;
}
private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
int aliquant = shardingTotalCount % shardingUnits.size();
int count = 0;
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
count++;
}
}
@Override
public String getType() {
return "AVG_ALLOCATION";
}
}
这是一种尽量平均分配的分片策略,如果作业的分片项无法平均分配给所有的作业服务器,即作业的分片项数%
作业服务器数不为零,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。 例如:
- 如果有
3
个作业服务器,总分片数为9
,每个作业服务器的分片项为:1=[0,1,2]
,2=[3,4,5]
,3=[6,7,8]
。 - 如果有
3
个作业服务器,总分片数为8
,每个作业服务器的分片项为:1=[0,1,6]
,2=[2,3,7]
,3=[4,5]
。 - 如果有
3
个作业服务器,总分片数为10
,每个作业服务器的分片项为:1=[0,1,2,9]
,2=[3,4,5]
,3=[6,7,8]
。
先给每个作业服务器分配相同数量的作业分片项(数量为:作业的分片项数/
作业服务器数)。
private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
// 每个作业服务器最少应该分配的作业分片项数
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
// 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)
// itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
// 给作业分片项列表添加第i个作业分片项
shardingItems.add(i);
}
// 将作业服务器与它执行的作业分片项列表进行关联
result.put(each, shardingItems);
count++;
}
return result;
}
如果作业的分片项无法平均分配给所有的作业服务器,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。
private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
// 无法平均分配的分片项数
int aliquant = shardingTotalCount % shardingUnits.size();
// 已分配的无法平均分配的分片项数
int count = 0;
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
// 是否还有无法平均分配的分片项
if (count < aliquant) {
// 分配给序号较小的作业服务器
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
// 已分配数更新
count++;
}
}
OdevitySortByNameJobShardingStrategy
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
long jobNameHash = jobName.hashCode();
if (0 == jobNameHash % 2) {
Collections.reverse(jobInstances);
}
return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
}
@Override
public String getType() {
return "ODEVITY";
}
}
其实还是使用AverageAllocationJobShardingStrategy
作业分片策略进行分配,只是会先根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse
操作。例如:
- 如果有
3
个作业服务器,总分片数为2
,作业名称的哈希码为奇数(对作业服务器列表不进行reverse
操作),每个作业服务器的分片项为:1=[0]
,2=[1]
,3=[]
。 - 如果有
3
个作业服务器,总分片数为2
,作业名的哈希码是偶数(对作业服务器列表进行reverse
操作),每个作业服务器的分片项为:3=[0]
,2=[1]
,1=[]
。
RoundRobinByNameJobShardingStrategy
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
}
private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
int shardingUnitsSize = shardingUnits.size();
int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
if (0 == offset) {
return shardingUnits;
}
List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
for (int i = 0; i < shardingUnitsSize; i++) {
int index = (i + offset) % shardingUnitsSize;
result.add(shardingUnits.get(index));
}
return result;
}
@Override
public String getType() {
return "ROUND_ROBIN";
}
}
其实跟OdevitySortByNameJobShardingStrategy
作业分片策略类似,都是使用AverageAllocationJobShardingStrategy
作业分片策略进行分配,并且在分配前都会根据作业名称的哈希码将作业服务器列表中的作业服务器项改变顺序,只是变序规则不一样而已,OdevitySortByNameJobShardingStrategy
作业分片策略根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse
操作,而RoundRobinByNameJobShardingStrategy
作业分片策略根据作业名称的哈希码的绝对值%
作业服务器数的值对作业服务器列表进行rotate
操作。例如:
- 如果有
3
个作业服务器,总分片数为2
,作业名称的哈希码的绝对值%
作业服务器数的值为0
,每个作业服务器的分片项为:1=[0]
,2=[1]
,3=[]
。 - 如果有
3
个作业服务器,总分片数为2
,作业名称的哈希码的绝对值%
作业服务器数的值为1
,每个作业服务器的分片项为:2=[0]
,3=[1]
,1=[]
。 - 如果有
3
个作业服务器,总分片数为2
,作业名称的哈希码的绝对值%
作业服务器数的值为2
,每个作业服务器的分片项为:3=[0]
,1=[1]
,2=[]
。
JobShardingStrategyFactory
作业的分片策略通过JobShardingStrategyFactory
类(作业分片策略工厂类)的getStrategy
方法获取,源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
}
public static JobShardingStrategy getStrategy(final String type) {
if (Strings.isNullOrEmpty(type)) {
return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
}
return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
.orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
}
}
在JobShardingStrategyFactory
类的静态块中使用ElasticJobServiceLoader
类的registerTypedService
方法加载所有作业分片策略。
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
}
ElasticJobServiceLoader
类的相关代码如下所示,通过Java
提供的SPI
机制(ServiceLoader
类)加载所有作业分片策略。
private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();
private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();
public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
if (TYPED_SERVICES.containsKey(typedService)) {
return;
}
ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
}
private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
}
默认为AverageAllocationJobShardingStrategy
作业分片策略,和官方文档给的示意图是对应的。
private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
AverageAllocationJobShardingStrategy
类的getType
方法(ElasticJobServiceLoader
类加载所有作业分片策略会将getType
方法的返回值作为存储每个作业分片策略实例的第二个key
值)。
@Override
public String getType() {
return "AVG_ALLOCATION";
}
到这里就结束了,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。