I previously covered most of the Sentinel integration work: integrating the Sentinel client into regular microservices and into the gateway, and persisting the dashboard's rule data to Nacos. One piece was still missing, though: persisting the monitoring (metrics) data. This post fills that gap by persisting metrics to InfluxDB, which requires modifying the dashboard source code; the changes have been tested and the feature works.
1、Prerequisites
- Install InfluxDB. Installation is not covered here; see the earlier post on installing InfluxDB on Linux. For Windows, search for instructions online.
- Download the sentinel-dashboard source code. This was covered in a previous article and is not repeated here; see the earlier post on modifying the Sentinel-dashboard source.
2、Add the dependency
Choose a version that suits your setup:
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.3.0</version>
</dependency>
3、Create a package for the InfluxDB-related code
3.1、Create SentinelMetric, modeled on MetricEntity
The idea is simple: time is the timestamp, app and resource are Tags, and all remaining columns are Fields. Fields must be stored as numeric types such as long, int, or double. @Measurement specifies the measurement (table) name.
Note: some articles online mark every column as a Tag, which is incorrect. Tags identify which series a point belongs to; here that is app and resource. If this distinction is unclear, see the earlier post on time-series database data structures.
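To make the tag/field split concrete, here is a minimal, self-contained sketch of what one point looks like in InfluxDB line protocol (the app/resource names and numbers are made up for illustration):

```java
public class LineProtocolDemo {
    // Build one line-protocol point: measurement,tags fields timestamp.
    // Tags (app, resource) identify the series and are indexed;
    // fields (passQps, blockQps, rt) hold the actual metric values.
    static String buildPoint(String app, String resource,
                             long passQps, long blockQps, double rt, long tsNanos) {
        return "sentinel_metric,app=" + app + ",resource=" + resource
                + " passQps=" + passQps + "i,blockQps=" + blockQps + "i,rt=" + rt
                + " " + tsNanos;
    }

    public static void main(String[] args) {
        System.out.println(buildPoint("order-service", "/api/pay", 120, 3, 12.5, 1672531200000000000L));
    }
}
```

In a real write, the influxdb-client-java library builds this representation for you from the annotated POJO; the sketch only shows where tags and fields end up.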
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import java.time.Instant;
/**
 * Modeled on com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity.
 */
@Measurement(name = "sentinel_metric")
public class SentinelMetric {
/**
 * Timestamp of the metric point.
 */
@Column(name = "time", timestamp = true)
private Instant time;
@Column(name = "gmtCreate")
private long gmtCreate;
@Column(name = "gmtModified")
private long gmtModified;
@Column(name = "app", tag = true)
private String app;
/**
 * Resource name.
 */
@Column(name = "resource", tag = true)
private String resource;
@Column(name = "passQps")
private long passQps;
@Column(name = "successQps")
private long successQps;
@Column(name = "blockQps")
private long blockQps;
@Column(name = "exceptionQps")
private long exceptionQps;
/**
* summary rt of all success exit qps.
*/
@Column(name = "rt")
private double rt;
/**
 * Number of raw records aggregated into this point.
 */
@Column(name = "count")
private int count;
@Column(name = "resourceCode")
private int resourceCode;
public Instant getTime() {
return time;
}
public void setTime(Instant time) {
this.time = time;
}
public long getGmtCreate() {
return gmtCreate;
}
public void setGmtCreate(long gmtCreate) {
this.gmtCreate = gmtCreate;
}
public long getGmtModified() {
return gmtModified;
}
public void setGmtModified(long gmtModified) {
this.gmtModified = gmtModified;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getResource() {
return resource;
}
public void setResource(String resource) {
this.resource = resource;
}
public long getPassQps() {
return passQps;
}
public void setPassQps(long passQps) {
this.passQps = passQps;
}
public long getSuccessQps() {
return successQps;
}
public void setSuccessQps(long successQps) {
this.successQps = successQps;
}
public long getBlockQps() {
return blockQps;
}
public void setBlockQps(long blockQps) {
this.blockQps = blockQps;
}
public long getExceptionQps() {
return exceptionQps;
}
public void setExceptionQps(long exceptionQps) {
this.exceptionQps = exceptionQps;
}
public double getRt() {
return rt;
}
public void setRt(double rt) {
this.rt = rt;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public int getResourceCode() {
return resourceCode;
}
public void setResourceCode(int resourceCode) {
this.resourceCode = resourceCode;
}
}
3.2、InfluxDB configuration (bean registration)
To avoid creating a new client for every call to the InfluxDB API, register the clients as Spring beans. Initializing them requires a few parameters: the database URL, the API access token (see the "API Tokens" section of the earlier post on InfluxDB basics for how to create one), and the organization entered at sign-up. These all go into the configuration file. The code:
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDBConfig {
@Value("${influx.token}")
private String token;
@Value("${influx.url}")
private String url;
@Value("${influx.org:InfluxData}")
private String org;
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org);
}
@Bean
public QueryApi queryApi() {
return influxDBClient().getQueryApi();
}
@Bean
public WriteApiBlocking writeApiBlocking() {
return influxDBClient().getWriteApiBlocking();
}
}
3.3、Configuration file
Add the following properties to application.properties, adjusted to your environment:
influx.url=http://localhost:8086
influx.token=sMiW3c4w0wBZO8WwZK_MrseZUYht9Fmm7HOUQDZF_kj6-Icwg0_6tEt_iyBSdTFAvRywHlOsuDMScypi93ejPQ==
influx.bucket=sentinel_db
influx.org=InfluxData
4、Implement the MetricsRepository&lt;MetricEntity&gt; interface
Sentinel's built-in implementation, InMemoryMetricsRepository, stores and retrieves metrics in memory; use it as a reference. The new implementation is named InfluxDBMetricsRepository and is placed in the influxdb package created above.
It mainly implements two things: saving metrics to InfluxDB and querying them back. The code:
import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.dashboard.influxdb.SentinelMetric;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.influxdb.annotations.Column;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
@Component
public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {
@Autowired
private QueryApi queryApi;
@Autowired
private WriteApiBlocking writeApiBlocking;
/**
 * Bucket created in the InfluxDB UI.
 */
@Value("${influx.bucket:sentinel_db}")
private String bucket;
/**
 * Organization entered when first signing in to the InfluxDB UI.
 */
@Value("${influx.org:InfluxData}")
private String org;
/**
 * Must match @Measurement(name = "sentinel_metric") on SentinelMetric.
 */
@Value("${influx.measurement:sentinel_metric}")
private String measurement;
private static final String PASS_QPS = "passQps";
private static final String BLOCK_QPS = "blockQps";
private static final String RESOURCE = "resource";
@Override
public void save(MetricEntity metric) {
writeApiBlocking.writeMeasurement(bucket, org, WritePrecision.NS, transferToSentinelMetric(metric));
}
@Override
public void saveAll(Iterable<MetricEntity> metrics) {
Iterator<MetricEntity> iterator = metrics.iterator();
List<SentinelMetric> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(transferToSentinelMetric(iterator.next()));
}
if (!CollectionUtils.isEmpty(list)) {
writeApiBlocking.writeMeasurements(bucket, org, WritePrecision.NS, list);
}
}
@Override
public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
List<MetricEntity> results = new ArrayList<>();
if (StringUtil.isBlank(app)) {
return results;
}
if (StringUtil.isBlank(resource)) {
return results;
}
long nowTime = System.currentTimeMillis();
String start = (startTime - nowTime) / 60000 + "m";
String stop = (endTime - nowTime) / 60000 + "m";
String query = String.format("from(bucket: \"%s\")\n" +
" |> range(start: %s, stop: %s)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"%s\"" +
" and r[\"app\"] == \"%s\"" +
" and r[\"resource\"] == \"%s\")",
bucket, start, stop, measurement, app, resource);
final List<FluxTable> tableList = queryApi.query(query, org);
if (CollectionUtils.isEmpty(tableList)) {
return results;
}
// Each FluxTable holds one field column; all tables have the same number of records
List<FluxRecord> records = tableList.get(0).getRecords();
int size = records.size();
List<SentinelMetric> sentinelMetricList = new ArrayList<>();
Field[] fields = SentinelMetric.class.getDeclaredFields();
Map<String, Field> fieldMap = new HashMap<>(fields.length);
for (Field field : fields) {
Column anno = field.getAnnotation(Column.class);
if (anno != null && !anno.name().isEmpty()) {
fieldMap.put(anno.name(), field);
}
}
for (int i = 0; i < size; i++) {
SentinelMetric sentinelMetric = new SentinelMetric();
sentinelMetric.setApp(app);
sentinelMetric.setResource(resource);
for (FluxTable table : tableList) {
FluxRecord record = table.getRecords().get(i);
sentinelMetric.setTime((Instant) record.getValueByKey("_time"));
String fieldName = record.getField();
if (fieldMap.containsKey(fieldName)) {
setFieldValue(sentinelMetric, fieldMap.get(fieldName), record.getValue());
}
}
sentinelMetricList.add(sentinelMetric);
}
for (SentinelMetric metric : sentinelMetricList) {
results.add(transferToMetricEntity(metric));
}
return results;
}
@Override
public List<String> listResourcesOfApp(String app) {
List<String> results = new ArrayList<>();
if (StringUtil.isBlank(app)) {
return results;
}
// Query only passQps and blockQps, since the sorting below uses just these two fields
String query = String.format("from(bucket: \"%s\")\n" +
" |> range(start: %s)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"%s\"" + " and r[\"app\"] ==\"%s\")\n" +
" |> filter(fn: (r) => r[\"_field\"] == \"passQps\" or r[\"_field\"] == \"blockQps\")",
bucket, "-1m", measurement, app);
List<FluxTable> tableList = queryApi.query(query, org);
if (CollectionUtils.isEmpty(tableList)) {
return results;
}
// Aggregate per resource; resources are returned sorted by blockQps desc, then passQps desc
Map<String, MetricEntity> resourceCount = new HashMap<>(32);
for (FluxTable table : tableList) {
final List<FluxRecord> records = table.getRecords();
for (FluxRecord record : records) {
String fieldName = record.getField();
if (BLOCK_QPS.equals(fieldName) || PASS_QPS.equals(record.getField())) {
String resource = (String) record.getValueByKey(RESOURCE);
final Object o = record.getValue();
if (o == null) {
continue;
}
Long value = (Long) o;
if (resourceCount.containsKey(resource)) {
final MetricEntity metricEntity = resourceCount.get(resource);
if (BLOCK_QPS.equals(fieldName)) {
metricEntity.addBlockQps(value);
} else {
metricEntity.addPassQps(value);
}
} else {
final MetricEntity metricEntity = new MetricEntity();
metricEntity.setPassQps(0L);
metricEntity.setBlockQps(0L);
if (BLOCK_QPS.equals(fieldName)) {
metricEntity.setBlockQps(value);
} else {
metricEntity.setPassQps(value);
}
resourceCount.put(resource, metricEntity);
}
}
}
}
// Sort and return the resource names
return resourceCount.entrySet()
.stream()
.sorted((o1, o2) -> {
MetricEntity e1 = o1.getValue();
MetricEntity e2 = o2.getValue();
int t = e2.getBlockQps().compareTo(e1.getBlockQps());
if (t != 0) {
return t;
}
return e2.getPassQps().compareTo(e1.getPassQps());
})
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
private SentinelMetric transferToSentinelMetric(MetricEntity metric) {
SentinelMetric sentinelMetric = new SentinelMetric();
BeanUtils.copyProperties(metric, sentinelMetric);
sentinelMetric.setTime(metric.getTimestamp().toInstant());
return sentinelMetric;
}
private MetricEntity transferToMetricEntity(SentinelMetric sentinelMetric) {
MetricEntity metric = new MetricEntity();
BeanUtils.copyProperties(sentinelMetric, metric);
metric.setTimestamp(Date.from(sentinelMetric.getTime()));
return metric;
}
private void setFieldValue(@Nonnull final Object object,
@Nullable final Field field,
@Nullable final Object value) {
if (field == null || value == null) {
return;
}
String msg =
"Class '%s' field '%s' was defined with a different field type and caused a ClassCastException. "
+ "The correct type is '%s' (current field value: '%s').";
try {
if (!field.isAccessible()) {
field.setAccessible(true);
}
Class<?> fieldType = field.getType();
//the same type
if (fieldType.equals(value.getClass())) {
field.set(object, value);
return;
}
//convert primitives
if (double.class.isAssignableFrom(fieldType)) {
field.setDouble(object, toDoubleValue(value));
return;
}
if (long.class.isAssignableFrom(fieldType)) {
field.setLong(object, toLongValue(value));
return;
}
if (int.class.isAssignableFrom(fieldType)) {
field.setInt(object, toIntValue(value));
return;
}
if (boolean.class.isAssignableFrom(fieldType)) {
field.setBoolean(object, Boolean.valueOf(String.valueOf(value)));
return;
}
if (BigDecimal.class.isAssignableFrom(fieldType)) {
field.set(object, toBigDecimalValue(value));
return;
}
//enum
if (fieldType.isEnum()) {
//noinspection unchecked, rawtypes
field.set(object, Enum.valueOf((Class<Enum>) fieldType, String.valueOf(value)));
return;
}
field.set(object, value);
} catch (ClassCastException | IllegalAccessException e) {
throw new InfluxException(String.format(msg, object.getClass().getName(), field.getName(),
value.getClass().getName(), value));
}
}
private double toDoubleValue(final Object value) {
if (double.class.isAssignableFrom(value.getClass()) || Double.class.isAssignableFrom(value.getClass())) {
return (double) value;
}
return (Double) value;
}
private long toLongValue(final Object value) {
if (long.class.isAssignableFrom(value.getClass()) || Long.class.isAssignableFrom(value.getClass())) {
return (long) value;
}
return ((Double) value).longValue();
}
private int toIntValue(final Object value) {
if (int.class.isAssignableFrom(value.getClass()) || Integer.class.isAssignableFrom(value.getClass())) {
return (int) value;
}
return ((Double) value).intValue();
}
private BigDecimal toBigDecimalValue(final Object value) {
if (String.class.isAssignableFrom(value.getClass())) {
return new BigDecimal((String) value);
}
if (double.class.isAssignableFrom(value.getClass()) || Double.class.isAssignableFrom(value.getClass())) {
return BigDecimal.valueOf((double) value);
}
if (int.class.isAssignableFrom(value.getClass()) || Integer.class.isAssignableFrom(value.getClass())) {
return BigDecimal.valueOf((int) value);
}
if (long.class.isAssignableFrom(value.getClass()) || Long.class.isAssignableFrom(value.getClass())) {
return BigDecimal.valueOf((long) value);
}
String message = String.format("Cannot cast %s [%s] to %s.",
value.getClass().getName(), value, BigDecimal.class);
throw new ClassCastException(message);
}
}
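The start/stop values in queryByAppAndResourceBetween deserve a closer look: absolute epoch-millisecond bounds are converted into Flux relative durations, i.e. offsets from now in whole minutes. A standalone sketch of that conversion:

```java
public class FluxRangeDemo {
    // Mirrors the conversion in queryByAppAndResourceBetween: a timestamp
    // in the past yields a negative offset such as "-30m", which Flux's
    // range(start: ...) interprets relative to now.
    static String toRelativeMinutes(long epochMillis, long nowMillis) {
        return (epochMillis - nowMillis) / 60000 + "m";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(toRelativeMinutes(now - 30 * 60_000L, now)); // prints -30m
    }
}
```

Note that the integer division truncates toward zero, so sub-minute precision is lost; that matches the behavior of the repository code above.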
5、Update the code that references MetricsRepository
Two classes reference MetricsRepository: MetricFetcher and MetricController. In both, add @Qualifier("influxDBMetricsRepository") to specify which bean to inject.
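For example, the injected field can be changed along these lines (a sketch based on the Sentinel dashboard source; the field name may differ slightly across versions):

```java
@Autowired
@Qualifier("influxDBMetricsRepository")
private MetricsRepository<MetricEntity> metricStore;
```

Without the qualifier, Spring would find two MetricsRepository beans (the in-memory one and the InfluxDB one) and fail to start, or pick the wrong one.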
6、Known issues
- The timestamps stored in InfluxDB are off by 8 hours from local time. This can be handled either when writing the data or after querying it; the code above does not handle it, so adjust as needed.
- SentinelMetric has an int field, count, but values read back from InfluxDB always come out as Long. This is handled as a special case in InfluxDBMetricsRepository's toIntValue method, by adding the following check:
if (Long.class.isAssignableFrom(value.getClass())) {
return ((Long) value).intValue();
}
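On the timezone issue: InfluxDB always stores timestamps in UTC, so the 8-hour gap is purely a display concern. One option (a sketch, not something the code above does) is to convert on the query side with java.time:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimezoneDemo {
    // Convert a UTC Instant returned by InfluxDB to China Standard Time (UTC+8).
    static ZonedDateTime toLocal(Instant stored) {
        return stored.atZone(ZoneId.of("Asia/Shanghai"));
    }

    public static void main(String[] args) {
        System.out.println(toLocal(Instant.parse("2023-01-01T00:00:00Z")));
    }
}
```

Converting at display time is generally safer than shifting the stored timestamps, since the raw data stays in UTC and remains comparable across clients in different zones.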
From: https://www.cnblogs.com/zhaodalei/p/17022551.html