首页 > 其他分享 >Sentinel-dashboard数据持久化到InfluxDB(四)

Sentinel-dashboard数据持久化到InfluxDB(四)

时间:2023-01-03 18:13:58浏览次数:79  
标签:return String InfluxDB value 化到 private dashboard import public

之前的有做过sentinel整合的工作,已经完成sentinel客户端集成到普通微服务、集成到gateway,以及sentinel控制台的规则数据持久化到nacos,当时的工作基本做的差不多了,但是还差一个监控数据的持久化没有做,这次是监控数据持久化到influxDB这一块补齐,涉及到源码的改造,经过测试已经基本实现了功能。

1、前置准备

  1. 需要安装InfluxDB,安装不多说,详情见Linux安装InfluxDB ,Windows下安装自行百度。
  2. 下载sentinel-dashboard的源码,前面的文章已经说了,这里就不再赘述,详细见 Sentinel-dashboard源码改造

2、依赖引入

这里自行选择合适的版本

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.3.0</version>
</dependency>

3、新建一个文件夹存放Influx相关代码

3.1、参考MetricEntity创建SentinelMetric

其实就是指定time为时间戳,然后app和resource为Tag,其他的字段都是Field,Field需要转换成数值类型存储,例如long、int、double类型,@Measurement用来指定表的名称。

注:网上其他的文章,将所有的字段都标记为Tag其实是不对的,因为tag就是表示数据的归属,这里就是app和resource,这块不太理解的可以参考 时序数据库的数据结构

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;

import java.time.Instant;

/**
 * 参考com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity
 */
@Measurement(name = "sentinel_metric")
public class SentinelMetric {

    /**
     * 时间戳
     */
    @Column(name = "time", timestamp = true)
    private Instant time;

    @Column(name = "gmtCreate")
    private long gmtCreate;

    @Column(name = "gmtModified")
    private long gmtModified;

    @Column(name = "app", tag = true)
    private String app;

    /**
     * 监控信息的时间戳
     */
    @Column(name = "resource", tag = true)
    private String resource;

    @Column(name = "passQps")
    private long passQps;

    @Column(name = "successQps")
    private long successQps;

    @Column(name = "blockQps")
    private long blockQps;

    @Column(name = "exceptionQps")
    private long exceptionQps;

    /**
     * summary rt of all success exit qps.
     */
    @Column(name = "rt")
    private double rt;

    /**
     * 本次聚合的总条数
     */
    @Column(name = "count")
    private int count;

    @Column(name = "resourceCode")
    private int resourceCode;

    public Instant getTime() {
        return time;
    }

    public void setTime(Instant time) {
        this.time = time;
    }

    public long getGmtCreate() {
        return gmtCreate;
    }

    public void setGmtCreate(long gmtCreate) {
        this.gmtCreate = gmtCreate;
    }

    public long getGmtModified() {
        return gmtModified;
    }

    public void setGmtModified(long gmtModified) {
        this.gmtModified = gmtModified;
    }

    public String getApp() {
        return app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public String getResource() {
        return resource;
    }

    public void setResource(String resource) {
        this.resource = resource;
    }

    public long getPassQps() {
        return passQps;
    }

    public void setPassQps(long passQps) {
        this.passQps = passQps;
    }

    public long getSuccessQps() {
        return successQps;
    }

    public void setSuccessQps(long successQps) {
        this.successQps = successQps;
    }

    public long getBlockQps() {
        return blockQps;
    }

    public void setBlockQps(long blockQps) {
        this.blockQps = blockQps;
    }

    public long getExceptionQps() {
        return exceptionQps;
    }

    public void setExceptionQps(long exceptionQps) {
        this.exceptionQps = exceptionQps;
    }

    public double getRt() {
        return rt;
    }

    public void setRt(double rt) {
        this.rt = rt;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public int getResourceCode() {
        return resourceCode;
    }

    public void setResourceCode(int resourceCode) {
        this.resourceCode = resourceCode;
    }
}

3.2、Influx相关配置(Bean的注入)

为了防止每次调用Influx的API接口都去new 一个客户端,我们可以将其注入到Spring容器,初始化这些客户端时,需要用到一些参数,包含数据库的url、访问API接口时用到的Token(Token的申请见InfluxDB基本使用的 1.4 API Tokens章节),还需要用到账号注册时填写的组织,这些参数都可以放到配置文件中,代码如下:

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxDBConfig {

    @Value("${influx.token}")
    private String token;

    @Value("${influx.url}")
    private String url;

    @Value("${influx.org:InfluxData}")
    private String org;

    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create(url, token.toCharArray(), org);
    }

    @Bean
    public QueryApi queryApi() {
        return influxDBClient().getQueryApi();
    }

    @Bean
    public WriteApiBlocking writeApiBlocking() {
        return influxDBClient().getWriteApiBlocking();
    }
}

3.3、配置文件

这里是在application.properties中增加如下参数,根据实际情况填写。

influx.url=http://localhost:8086
influx.token=sMiW3c4w0wBZO8WwZK_MrseZUYht9Fmm7HOUQDZF_kj6-Icwg0_6tEt_iyBSdTFAvRywHlOsuDMScypi93ejPQ==
influx.bucket=sentinel_db
influx.org=InfluxData

4、实现MetricsRepository<MetricEntity>接口

这里可以参考sentinel原有的实现,其原有的实现是将数据的存取放到内存,具体实现见InMemoryMetricsRepository,这里我们命名为InfluxDBMetricsRepository,存放位置如下图:

主要就是实现其保存监控数据到InfluxDB和从InfluxDB查询数据的逻辑,代码如下:

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.dashboard.influxdb.SentinelMetric;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.influxdb.annotations.Column;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

@Component
public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {

    @Autowired
    private QueryApi queryApi;

    @Autowired
    private WriteApiBlocking writeApiBlocking;

    /**
     * InfluxDB界面上新建一个bucket
     */
    @Value("${influx.bucket:sentinel_db}")
    private String bucket;

    /**
     * 首次登录Influx界面时填写的组织
     */
    @Value("${influx.org:InfluxData}")
    private String org;

    /**
     * SentinelMetric上的@Measurement(name = "sentinel_metric")注解的name
     */
    @Value("${influx.measurement:sentinel_metric}")
    private String measurement;

    private static final String PASS_QPS = "passQps";

    private static final String BLOCK_QPS = "blockQps";

    private static final String RESOURCE = "resource";

    @Override
    public void save(MetricEntity metric) {
        writeApiBlocking.writeMeasurement(bucket, org, WritePrecision.NS, transferToSentinelMetric(metric));
    }

    @Override
    public void saveAll(Iterable<MetricEntity> metrics) {
        Iterator<MetricEntity> iterator = metrics.iterator();
        List<SentinelMetric> list = new ArrayList<>();
        while (iterator.hasNext()) {
            list.add(transferToSentinelMetric(iterator.next()));
        }
        if (!CollectionUtils.isEmpty(list)) {
            writeApiBlocking.writeMeasurements(bucket, org, WritePrecision.NS, list);
        }
    }

    @Override
    public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
        List<MetricEntity> results = new ArrayList<>();
        if (StringUtil.isBlank(app)) {
            return results;
        }

        if (StringUtil.isBlank(resource)) {
            return results;
        }

        long nowTime = System.currentTimeMillis();
        String start = (startTime - nowTime) / 60000 + "m";
        String stop = (endTime - nowTime) / 60000 + "m";

        String query = String.format("from(bucket: \"%s\")\n" +
                        "  |> range(start: %s , stop: %s)\n" +
                        "  |> filter(fn: (r) => (r[\"_measurement\"] == \"%s\"" +
                        " and r[\"app\"] ==\"%s\")" + "and r[\"resource\"] == \"%s\")",
                bucket, start, stop, measurement, app, resource);
        final List<FluxTable> tableList = queryApi.query(query, org);

        //获取总的数据条数
        List<FluxRecord> records = tableList.get(0).getRecords();
        int size = records.size();

        List<SentinelMetric> sentinelMetricList = new ArrayList<>();
        Field[] fields = SentinelMetric.class.getDeclaredFields();
        Map<String, Field> fieldMap = new HashMap<>(fields.length);
        for (Field field : fields) {
            Column anno = field.getAnnotation(Column.class);
            if (anno != null && !anno.name().isEmpty()) {
                fieldMap.put(anno.name(), field);
            }
        }

        for (int i = 0; i < size; i++) {
            SentinelMetric sentinelMetric = new SentinelMetric();
            sentinelMetric.setApp(app);
            sentinelMetric.setResource(resource);
            for (FluxTable table : tableList) {
                FluxRecord record = table.getRecords().get(i);
                sentinelMetric.setTime((Instant) record.getValueByKey("_time"));
                String fieldName = record.getField();
                if (fieldMap.containsKey(fieldName)) {
                    setFieldValue(sentinelMetric, fieldMap.get(fieldName), record.getValue());
                }
            }
            sentinelMetricList.add(sentinelMetric);
        }

        for (SentinelMetric metric : sentinelMetricList) {
            results.add(transferToMetricEntity(metric));
        }

        return results;
    }

    @Override
    public List<String> listResourcesOfApp(String app) {
        List<String> results = new ArrayList<>();
        if (StringUtil.isBlank(app)) {
            return results;
        }

        //只查询passQps和blockQps数据,因为下面的排序逻辑只用到了这两个参数
        String query = String.format("from(bucket: \"%s\")\n" +
                        "  |> range(start: %s)\n" +
                        "  |> filter(fn: (r) => r[\"_measurement\"] == \"%s\"" + " and r[\"app\"] ==\"%s\")\n" +
                        "  |> filter(fn: (r) => r[\"_field\"] == \"passQps\" or r[\"_field\"] == \"blockQps\")",
                bucket, "-1m", measurement, app);
        List<FluxTable> tableList = queryApi.query(query, org);

        if (CollectionUtils.isEmpty(tableList)) {
            return results;
        }

        //返回resource名称列表,按block_qps降序排列,block_qps相同时,按pass_qps降序排列
        Map<String, MetricEntity> resourceCount = new HashMap<>(32);
        for (FluxTable table : tableList) {
            final List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                String fieldName = record.getField();
                if (BLOCK_QPS.equals(fieldName) || PASS_QPS.equals(record.getField())) {
                    String resource = (String) record.getValueByKey(RESOURCE);
                    final Object o = record.getValue();
                    if (o == null) {
                        continue;
                    }
                    Long value = (Long) o;
                    if (resourceCount.containsKey(resource)) {
                        final MetricEntity metricEntity = resourceCount.get(resource);
                        if (BLOCK_QPS.equals(fieldName)) {
                            metricEntity.addBlockQps(value);
                        } else {
                            metricEntity.addPassQps(value);
                        }
                    } else {
                        final MetricEntity metricEntity = new MetricEntity();
                        metricEntity.setPassQps(0L);
                        metricEntity.setBlockQps(0L);
                        if (BLOCK_QPS.equals(fieldName)) {
                            metricEntity.setBlockQps(value);
                        } else {
                            metricEntity.setPassQps(value);
                        }
                        resourceCount.put(resource, metricEntity);
                    }
                }
            }
        }

        // 排序返回
        return resourceCount.entrySet()
                .stream()
                .sorted((o1, o2) -> {
                    MetricEntity e1 = o1.getValue();
                    MetricEntity e2 = o2.getValue();
                    int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                    if (t != 0) {
                        return t;
                    }
                    return e2.getPassQps().compareTo(e1.getPassQps());
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    private SentinelMetric transferToSentinelMetric(MetricEntity metric) {
        SentinelMetric sentinelMetric = new SentinelMetric();
        BeanUtils.copyProperties(metric, sentinelMetric);
        sentinelMetric.setTime(metric.getTimestamp().toInstant());
        return sentinelMetric;
    }

    private MetricEntity transferToMetricEntity(SentinelMetric sentinelMetric) {
        MetricEntity metric = new MetricEntity();
        BeanUtils.copyProperties(sentinelMetric, metric);
        metric.setTimestamp(Date.from(sentinelMetric.getTime()));
        return metric;
    }

    private void setFieldValue(@Nonnull final Object object,
                               @Nullable final Field field,
                               @Nullable final Object value) {

        if (field == null || value == null) {
            return;
        }
        String msg =
                "Class '%s' field '%s' was defined with a different field type and caused a ClassCastException. "
                        + "The correct type is '%s' (current field value: '%s').";

        try {
            if (!field.isAccessible()) {
                field.setAccessible(true);
            }
            Class<?> fieldType = field.getType();

            //the same type
            if (fieldType.equals(value.getClass())) {
                field.set(object, value);
                return;
            }

            //convert primitives
            if (double.class.isAssignableFrom(fieldType)) {
                field.setDouble(object, toDoubleValue(value));
                return;
            }
            if (long.class.isAssignableFrom(fieldType)) {
                field.setLong(object, toLongValue(value));
                return;
            }
            if (int.class.isAssignableFrom(fieldType)) {
                field.setInt(object, toIntValue(value));
                return;
            }
            if (boolean.class.isAssignableFrom(fieldType)) {
                field.setBoolean(object, Boolean.valueOf(String.valueOf(value)));
                return;
            }
            if (BigDecimal.class.isAssignableFrom(fieldType)) {
                field.set(object, toBigDecimalValue(value));
                return;
            }

            //enum
            if (fieldType.isEnum()) {
                //noinspection unchecked, rawtypes
                field.set(object, Enum.valueOf((Class<Enum>) fieldType, String.valueOf(value)));
                return;
            }

            field.set(object, value);

        } catch (ClassCastException | IllegalAccessException e) {

            throw new InfluxException(String.format(msg, object.getClass().getName(), field.getName(),
                    value.getClass().getName(), value));
        }
    }

    private double toDoubleValue(final Object value) {

        if (double.class.isAssignableFrom(value.getClass()) || Double.class.isAssignableFrom(value.getClass())) {
            return (double) value;
        }

        return (Double) value;
    }

    private long toLongValue(final Object value) {

        if (long.class.isAssignableFrom(value.getClass()) || Long.class.isAssignableFrom(value.getClass())) {
            return (long) value;
        }

        return ((Double) value).longValue();
    }

    private int toIntValue(final Object value) {

        if (int.class.isAssignableFrom(value.getClass()) || Integer.class.isAssignableFrom(value.getClass())) {
            return (int) value;
        }

        return ((Double) value).intValue();
    }

    private BigDecimal toBigDecimalValue(final Object value) {
        if (String.class.isAssignableFrom(value.getClass())) {
            return new BigDecimal((String) value);
        }

        if (double.class.isAssignableFrom(value.getClass()) || Double.class.isAssignableFrom(value.getClass())) {
            return BigDecimal.valueOf((double) value);
        }

        if (int.class.isAssignableFrom(value.getClass()) || Integer.class.isAssignableFrom(value.getClass())) {
            return BigDecimal.valueOf((int) value);
        }

        if (long.class.isAssignableFrom(value.getClass()) || Long.class.isAssignableFrom(value.getClass())) {
            return BigDecimal.valueOf((long) value);
        }

        String message = String.format("Cannot cast %s [%s] to %s.",
                value.getClass().getName(), value, BigDecimal.class);

        throw new ClassCastException(message);
    }
}

 5、修改引用了MetricsRepository的地方

涉及到引用MetricsRepository的类有两个,分别是MetricFetcher和MetricController,将其采用@Qualifier("influxDBMetricsRepository")的方式,指定注入的Bean

6、存在的问题

  1. 存储在InfluxDB的数据时区存在问题,和实际的差了8个小时,这里可以在存数据时处理,也可以将数据查询出来后处理,代码里面没有处理,需要自行修改。
  2. SentinelMetric中有一个int参数count,从InfluxDB取出的值始终会被识别成Long型,这里是在代码里面做了特殊处理,在InfluxDBMetricsRepository的toIntValue方法中,增加了如下的判断。
if (Long.class.isAssignableFrom(value.getClass())) {
	return ((Long) value).intValue();
}

标签:return,String,InfluxDB,value,化到,private,dashboard,import,public
From: https://www.cnblogs.com/zhaodalei/p/17022551.html

相关文章

  • InfluxDB介绍
    1、时序数据库介绍时序数据库全称为时间序列数据库。时间序列数据库指主要用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据,带时间标签的数据也称为时间序列数据......
  • 部署 K8S可视化工具dashboard
    vimdashboard.yamlapiVersion:v1kind:Namespacemetadata:name:kubernetes-dashboard---apiVersion:v1kind:ServiceAccountmetadata:labels:k8s......
  • 初探 InfluxDB 篇(六)InfluxDB 修改数据存放路径
    初探InfluxDB篇(六)InfluxDB修改数据存放路径 1、创建数据存放目录mkdir-p/home/data/influxdb说明:目录可以根据实际情况进行修改 2、设置目录访问权限sud......
  • InfluxDB基本使用
    最近有项目使用到influxDB,因此记录一下InfluxDB的基本使用,版本:InfluxDB2.61、LoadData1.1Source1.1.1数据上传这里主要包含了数据的导入,目前支持如下几种,如图:1.1......
  • Linux安装InfluxDB
    1、InfluxDB官方资料InfluxDB的官网地址:https://www.influxdata.com/InfluxDB的官方文档地址:https://docs.influxdata.com/influxdb/v2.6/install/?t=Linux2、选择安装......
  • golang写入influxdb2,共3种方式,小心有坑!
    ==事件背景==项目中需要接收设备上报的时序数据,写入到Influxdb中,在压力测试过程中发现服务器CPU占用的非常高,使用pprof解析了CPU占用的详细信息之后,发现Influxdb的AddFie......
  • centos 安装 influxdb2
       http://www.manongjc.com/detail/27-asqkkdxungikkls.html https://github.com/influxdata/influxdb/releases下载influxdb2-2.6.0.x86_64.rpm安装yumloc......
  • 在k8s集群中Kubernetes仪表板dashboard使用RABC机制限制指定用户针对指定名称空间中的
    公众号关注「WeiyiGeek」本章目录:Dashboard-利用rbac机制限制指定用户针对指定名称空间中的资源进行UI管理(2)Dashboard-利用rbac机制限制指定用户针对指定名称空间中的资......
  • 如何配置Kubernetes仪表板dashboard支持http方式并使用ingress-nginx代理访问实践
    公众号关注「WeiyiGeek」本章目录:配置Kubernetes-dashboard以支持http方式访问1.配置Kubernetes-dashboard以支持http方式访问描述:当前默认安装配置的Kubernetes......
  • 仪表板工具Stimulsoft Dashboards仪表板中的数据筛选元素介绍
    StimulsoftUltimate是用于创建报告和仪表盘的通用工具集,包含九种产品,可以为任何受支持的平台创建报告和仪表板,适用于WinForms,ASP.NET,.NETCore,JavaScript,WPF,PHP,Java等环境......