在上一篇中,我们介绍了在项目中如何整合 Nacos、OpenFeign。这一篇,我们来介绍一下项目中 Sentinel 的整合。
1. 关于 Sentinel
Sentinel 是阿里巴巴开源的分布式系统的流量防卫组件,Sentinel 把流量作为切入点,从流量控制,熔断降级,系统负载保护等多个维度保护服务的稳定性。
Sentinel 的使用可以分为两个部分:
- 核心库(Java 客户端):不依赖任何框架/库,能够运行于 Java 8 及以上的版本的运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持(见 主流框架适配)。
- 控制台(Dashboard):Dashboard 主要负责管理推送规则、监控、管理机器信息等。
关于 Sentinel 的更详细介绍请参考官方文档(链接地址)。
2. 版本选择
由于我们 Spring Cloud Alibaba 选择的是 2022.0.0.0 版本,其中的 Sentinel 版本为 1.8.6,我们的 Sentinel 控制台(Dashboard) 也选择版本为 1.8.6。
3. 控制台搭建
第一步:下载控制台 jar, 本篇下载 jar 包为:sentinel-dashboard-1.8.6.jar 。
第二步:本篇将 8080 端口改为使用 8060,下列命令启动控制台。
java -Dserver.port=8060 -Dcsp.sentinel.dashboard.server=localhost:8060 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.6.jar
启动日志如下。
第三步:浏览器输入地址:http://localhost:8060/#,即可访问。
输入用户名:sentinel,密码:sentinel,即可登录。
4. Sentinel 整合
分别给账户模块、商品模块、订单模块的 adapter 工程添加 Sentinel 依赖。
<!-- spring-cloud-starter-alibaba-sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
4.1. 控制台配置
在 nacos 的 dev 名称空间中创建名称为 sentinel.yaml 的配置文件,内容如下。
以商品模块为例,在 start 工程的 bootstrap.yml 文件追加 sentinel 配置如下。
4.2. 测试限流
以查询商品列表接口为例,未添加限流规则之前,访问频率不受限制。
我们对“/product/queryList”接口做如下流控。
连续点击请求,则出现下列流控效果了。
5. 自定义异常
在上面对“/product/queryList”接口限流的测试中,连续发送请求,达到限流条件直接响应“Blocked by Sentinel (flow limiting)”。这个提示信息看起来不太友好,我们可以通过 @SentinelResource 注解的 blockHandler 或 fallback 属性配置异常方法。blockHandler 与 fallback 的主要区别是,blockHandler 针对 BlockException 异常,fallback 针对所有异常。
5.1. 测试 fallback
给“/product/queryList”接口添加 @SentinelResource 注解及 fallback 对应的方法如下。
@Operation(summary = "查查满足条件的记录")
@PostMapping("/queryList")
@SentinelResource(value = "/product/queryList",fallback = "queryListFallback")
public ResponseResult<List<ProductRspDTO>> queryList(@RequestBody ProductQuery productQuery){
log.info("query product list, productQuery:{}",productQuery);
List<ProductRspDTO> dtoList = this.productExecutor.queryProductList(productQuery);
return ResponseResult.ok(dtoList);
}
public ResponseResult<ProductRspDTO> queryListFallback(ProductQuery productQuery,Throwable exception){
log.info("grade exception{}",exception.getClass().getCanonicalName());
return ResponseResult.fail("308","请降低请求频率!");
}
连续发送请求,达到限流条件,则返回下列结果。
5.2. 测试 blockHandler
对于“/product/queryList”接口,如果将 @SentinelResource 注解中的 fallback 改成 blockHandler,且方法本身未定义 throws BlockException,则会被 JVM 包装一层 UndeclaredThrowableException 抛出。例如下面代码。
@Operation(summary = "查查满足条件的记录")
@PostMapping("/queryList")
@SentinelResource(value = "/product/queryList",blockHandler = "queryListFallback")
public ResponseResult<List<ProductRspDTO>> queryList(@RequestBody ProductQuery productQuery){
log.info("query product list, productQuery:{}",productQuery);
List<ProductRspDTO> dtoList = this.productExecutor.queryProductList(productQuery);
return ResponseResult.ok(dtoList);
}
public ResponseResult<ProductRspDTO> queryListFallback(ProductQuery productQuery,Throwable exception){
log.info("grade exception{}",exception.getClass().getCanonicalName());
return ResponseResult.fail("308","请降低请求频率!");
}
测试结果如下。
异常打印日志如下。
给“/product/queryList”接口添加 BlockException 异常,并将 queryListFallback 方法捕获异常修改为 BlockException,代码如下。
@Operation(summary = "查查满足条件的记录")
@PostMapping("/queryList")
@SentinelResource(value = "/product/queryList",blockHandler = "queryListFallback")
public ResponseResult<List<ProductRspDTO>> queryList(@RequestBody ProductQuery productQuery) throws BlockException{
log.info("query product list, productQuery:{}",productQuery);
List<ProductRspDTO> dtoList = this.productExecutor.queryProductList(productQuery);
return ResponseResult.ok(dtoList);
}
public ResponseResult<ProductRspDTO> queryListFallback(ProductQuery productQuery,BlockException exception){
log.info("grade exception{}",exception.getClass().getCanonicalName());
return ResponseResult.fail("308","请降低请求频率!");
}
测试结果如下。
5.3. 测试熔断降级
我们以订单服务调用商品服务接口为例,假设订单服务中需要通过商品编号查询商品信息,如果商品服务提供的接口不稳定,则我们可以对它进行熔断降级处理。
5.3.1. 商品服务接口提供
商品模块 client 工程中,新增根据商品code查询商品信息的接口,代码如下。
@FeignClient(name = BaseConstant.Domain.PRODUCT,fallbackFactory = ProductFeignClientFallbackFactory.class)
public interface ProductFeignClient {
@Operation(summary = "根据id查询商品信息")
@GetMapping(BaseConstant.AdapterType.CLIENT+"/product/v1/queryById/{id}")
@Parameter(name = "id",description = "商品id",required = true)
ProductRspDTO queryById(@PathVariable(value = "id") Long id);
@Operation(summary = "根据商品编码查询商品信息")
@GetMapping(BaseConstant.AdapterType.CLIENT+"/product/v1/queryByCode/{productCode}")
@Parameter(name = "productCode",description = "商品编码",required = true)
ProductRspDTO queryByCode(@PathVariable(value = "productCode") String productCode) throws BizException;
}
@Slf4j
@Tag(name = "product-client端api")
@RestController
public class ProductClientController implements ProductFeignClient {
@Resource
private ProductExecutor productExecutor;
@Override
public ProductRspDTO queryById(Long id){
log.info("query product detail,id is :{}",id);
return this.productExecutor.queryProductById(id);
}
@Override
public ProductRspDTO queryByCode(String productCode) throws BizException {
return this.productExecutor.queryProductByCode(productCode);
}
}
@Component
public class ProductExecutor {
@Resource
private ProductService productService;
/**
* 省略其他CRUD方法
*/
/**
* 根据code查询商品
*/
public ProductRspDTO queryProductByCode(String productCode) throws BizException{
return this.productService.queryByCode(productCode);
}
}
public interface ProductService extends IService<ProductEntity> {
/**
* 省略其他CRUD方法
*/
/**
* 根据code查询商品
*/
ProductRspDTO queryByCode(String productCode) throws BizException;
}
@Service
public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductEntity> implements ProductService {
/**
* 省略其他CRUD方法
*/
/**
* 根据code查询商品
*/
@Override
public ProductRspDTO queryByCode(String productCode) throws BizException {
Random random = new Random();
if(random.nextInt() % 2 == 0){
throw ExceptionFactory.bizException("出现偶数,抛出异常!");
}
LambdaQueryWrapper<ProductEntity> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ProductEntity::getProductCode,productCode);
ProductEntity productEntity = this.getOne(lambdaQueryWrapper);
ProductRspDTO productRspDTO = new ProductRspDTO();
BeanUtils.copyProperties(productEntity,productRspDTO);
return productRspDTO;
}
}
其中 ProductFeignClientFallbackFactory 为处理熔断降级的工厂类,代码如下。
@Component
public class ProductFeignClientFallbackFactory implements FallbackFactory<ProductFeignClient> {
@Override
public ProductFeignClient create(Throwable cause) {
cause.printStackTrace();
return new ProductFeignClientFallback();
}
}
熔断降级具体处理方法在 ProductFeignClientFallback 类中,代码如下。
@Component
public class ProductFeignClientFallback implements ProductFeignClient {
@Override
public ProductRspDTO queryById(Long id) {
return null;
}
@Override
public ProductRspDTO queryByCode(String productCode) throws BizException {
ProductRspDTO productRspDTO = new ProductRspDTO();
productRspDTO.setProductName("熔断降级");
return productRspDTO;
}
}
在 ProductServiceImpl 类的 queryByCode 方法中,我们根据产生的随机数进行判断,如果为偶数,则抛出异常;如果为奇数,则正常返回。
5.3.2. 订单服务接口调用
订单模块的 OrderController 增加如下接口。
@Operation(summary = "根据商品code查询商品信息")
@GetMapping("/queryProductByCode/{productCode}")
@Parameter(name = "productCode",description = "商品编码",required = true)
public ResponseResult<ProductRspDTO> queryProductByCode(@PathVariable(value = "productCode") String productCode) throws BizException {
log.info("query product detail,productCode is :{}",productCode);
ProductRspDTO productRspDTO = this.orderExecutor.queryProductByCode(productCode);
return ResponseResult.ok(productRspDTO);
}
其中 this.orderExecutor.queryProductByCode(productCode) 方法代码如下。
public ProductRspDTO queryProductByCode(String productCode) throws BizException {
return this.productFeignClient.queryByCode(productCode);
}
5.3.3. 添加配置信息
给 Nacos 中的 sentinel.yaml 文件添加如下配置信息,开启 Feign 的 Sentinel 功能。
feign:
sentinel:
enabled: true
给 Sentinel 的 "/client/product/v1/queryByCode/{productCode}" 资源添加如下配置。
5.3.4. 接口测试
启动商品服务、订单服务,访问 http://localhost:7050/doc.html ,打开订单服务的 Knife4j 文档。选择 adapter-web 组。找到”根据商品code查询商品信息“ 接口,输入商品code,商品服务接口正常时,测试如下。
商品服务接口异常时,测试如下。
控制台打印异常信息如下。
不断点击“发送”,当10秒内发送的请求次数大于或等于5,且出现两次异常时,将触发熔断,熔断时长为5秒,在这5秒内,一直点击“发送”,订单服务将不再调用商品服务接口。触发熔断5秒结束后,又进入到下一个周期的统计。
6. 配置持久化
目前我们在 Sentinel 控制台上配置的限流或熔断规则都是存储在微服务内存中的,一旦重启微服务或者 Sentinel,则配置信息将丢失,我们可以将配置信息持久化到 Nacos 中。Sentinel 的 Nacos 数据源依赖如下。
<!-- sentinel 持久化数据源 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
6.1. 流控配置
在 Nacos 中添加 dataId 为 mall-product-service-flow.json 的 json 文件,json 内容如下。
[
{
"resource": "/product/queryList",
"limitApp": "default",
"grade": 1,
"count": 1,
"strategy": 0,
"controlBehavior": 0,
"clusterMode": false
}
]
上面 json 内容中,各字段说明如下。
- resource:资源名,即限流规则的作用对象。
- limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
- grade:限流阈值类型(QPS 或并发线程数),0表示根据并发数量来限流、1表示根据QPS来进行流量控制。
- count:限流阈值。
- strategy:调用关系限流策略,0表示直接、1表示关联、2表示链路。
- controlBehavior:流量控制效果(直接拒绝、Warm Up、匀速排队)。
- clusterMode:是否为集群模式。
上面的 json 内容与下面控制台界面中的配置效果相同。
Nacos 配置详情如下。
6.2. 熔断配置
在 Nacos 中添加 dataId 为 mall-product-service-degrade.json 的 json 文件,json 内容如下。
[
{
"resource": "/client/product/v1/queryByCode/{productCode}",
"grade": 2,
"count": 2,
"timeWindow": 5,
"minRequestAmount": 5,
"statIntervalMs": 10000
}
]
上面 json 内容中,各字段说明如下。
- resource:资源名,即降级规则的作用对象。
- grade:降级模式 0:RT、1:异常比例、2:异常数。
- count:慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值。
- timeWindow:熔断时长,单位为 s。
- minRequestAmount:熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)。
- statIntervalMs:统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)。
上面的 json 内容与下面控制台界面中的配置效果相同。
6.3. 数据源配置
在 Nacos 中创建商品模块数据源配置,名称为:product-sentinel-datasource.yaml,内容如下。
spring:
cloud:
sentinel:
web-context-unify: false # 默认将调用链路收敛,需要打开才可以进行链路流控
datasource:
flow-ds: #流控数据源
nacos:
server-addr: ${NACOS_ADDR}
namespace: ${NACOS_NAMESPACE}
dataId: ${spring.application.name}-flow.json
groupId: mall
data-type: json
rule-type: flow
degrade-ds: #熔断数据源
nacos:
server-addr: ${NACOS_ADDR}
namespace: ${NACOS_NAMESPACE}
dataId: ${spring.application.name}-degrade.json
groupId: mall
data-type: json
rule-type: degrade
商品模块 start 工程配置文件追加 product-sentinel-datasource.yaml 如下。
6.4. 数据源测试
重启商品服务,进入 Sentinel 控制台查看流控配置,刚启动,并未注册流控配置信息。
重新发起“/product/queryList”接口请求。
再打开 Sentinel 控制台流控规则列表,发现流控配置信息已经注册上来了。
详细配置如下。
再打开 Sentinel 控制台熔断规则列表,发现熔断配置信息也注册上来了。
详细配置如下。
流控测试效果如下。
熔断测试效果如下。
7. 配置持久化优化
上面的持久化操作,只是针对在 Nacos 上配置的持久化,如果从 Sentinel 控制台中配置的规则,则不会推送到 Nacos 中进行持久化。要解决这个问题,有两种方法可以处理。
方法一:从 Sentinel 控制台推送到应用,由应用持久化到 Nacos,需在应用上添加代码,实现 WritableDataSource 接口的 write(T value) 方法重写。
方法二:从 Sentinel 控制台直接持久化到 Nacos,需要改造 Sentinel 控制台源码,从新打包。
7.1. 方法实现
在这里,我们使用方法一比较简单。在商品模块 start 工程 org.example.product.sentinel 包路径下添加代码。
添加 NacosWritableDataSource 类,实现 WritableDataSource 接口的 write(T value) 方法重写,代码如下。
@Slf4j
public class NacosWritableDataSource<T> implements WritableDataSource<T> {
private final Converter<T, String> configEncoder;
private final NacosDataSourceProperties nacosDataSourceProperties;
private final Lock lock = new ReentrantLock(true);
private ConfigService configService;
public NacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties, Converter<T, String> configEncoder) {
if (configEncoder == null) {
throw new IllegalArgumentException("Config encoder cannot be null");
}
if (nacosDataSourceProperties == null) {
throw new IllegalArgumentException("Nacos DataSource Properties cannot be null");
}
this.configEncoder = configEncoder;
this.nacosDataSourceProperties = nacosDataSourceProperties;
final Properties properties = buildProperties(nacosDataSourceProperties);
try {
this.configService = NacosFactory.createConfigService(properties);
} catch (NacosException e) {
log.error("create configService failed.", e);
}
}
/*
* @description 构建Nacos访问信息
*/
private Properties buildProperties(NacosDataSourceProperties nacosDataSourceProperties) {
Properties properties = new Properties();
Field[] fields = nacosDataSourceProperties.getClass().getDeclaredFields();
for(Field field : fields){
field.setAccessible(true);
String fieldName = field.getName();
String fieldValue = null;
try {
fieldValue = (String)field.get(nacosDataSourceProperties);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
log.info("fieldName={},fieldValue={}",fieldName,fieldValue);
if(!StringUtils.isEmpty(fieldValue)){
properties.setProperty(fieldName, fieldValue);
}
}
return properties;
}
@Override
public void write(T value) throws Exception {
lock.lock();
// todo handle cluster concurrent problem
try {
String rule = configEncoder.convert(value);
if (configService == null) {
log.error("configServer is null, can not continue.");
return;
}
final boolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), rule);
if (!published) {
log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType());
}
} finally {
lock.unlock();
}
}
@Override
public void close() throws Exception {
}
}
添加 NacosDataSourceHandler 类,将 NacosWritableDataSource 注入,代码如下。
public class NacosDataSourceHandler implements SmartInitializingSingleton {
private final SentinelProperties sentinelProperties;
public NacosDataSourceHandler(SentinelProperties sentinelProperties) {
this.sentinelProperties = sentinelProperties;
}
@Override
public void afterSingletonsInstantiated() {
sentinelProperties.getDatasource().values().forEach(this::registryWriter);
}
private void registryWriter(DataSourcePropertiesConfiguration dataSourcePropertiesConfiguration) {
final NacosDataSourceProperties nacosDataSourceProperties = dataSourcePropertiesConfiguration.getNacos();
if (nacosDataSourceProperties == null) {
return;
}
final RuleType ruleType = nacosDataSourceProperties.getRuleType();
switch (ruleType) {
case FLOW:
WritableDataSource<List<FlowRule>> flowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, this::encodeJson);
WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter);
break;
case DEGRADE:
WritableDataSource<List<DegradeRule>> degradeRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, this::encodeJson);
WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter);
break;
case PARAM_FLOW:
WritableDataSource<List<ParamFlowRule>> paramFlowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, this::encodeJson);
ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter);
break;
case SYSTEM:
WritableDataSource<List<SystemRule>> systemRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, this::encodeJson);
WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter);
break;
case AUTHORITY:
WritableDataSource<List<AuthorityRule>> authRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, this::encodeJson);
WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter);
break;
default:
break;
}
}
private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
添加 NacosDataSourceConfig 配置类,将 NacosDataSourceHandler 纳入 Spring 容器管理 ,代码如下。
@Configuration
public class NacosDataSourceConfig {
@Bean
public NacosDataSourceHandler nacosDataSourceHandler(SentinelProperties sentinelProperties) {
return new NacosDataSourceHandler(sentinelProperties);
}
}
7.2. 测试
重启商品服务,查看 Nacos 中 mall-product-service-flow.json 文件信息,目前只有一条“/product/queryList”资源的流控配置。
请求一下 “/web/v1/product/pageList”接口。
Sentinel 控制台也是只有一条“/product/queryList”资源的流控配置。
在 Sentinel 控制台上添加“/web/v1/product/pageList”资源的流控配置。
此时再到 Nacos 中查看 mall-product-service-flow.json 文件信息,发现已经有“/web/v1/product/pageList”资源的流控配置了。
再快速请求“/web/v1/product/pageList”接口,限流也起作用了。
8. 总结
本篇先介绍了 Sentinel 控制台的搭建,及 Sentinel 客户端的整合。然后,介绍了流控、熔断的自定义异常,并进行了相应测试。最后,介绍了如何将 Sentinel 的流控、熔断等规则信息持久化到 Nacos 中。