首页 > 其他分享 >springboot接入influxdb

springboot接入influxdb

时间:2023-06-03 11:32:55浏览次数:32  
标签:String 接入 fields param influxDB influxdb public springboot


1.添加maven依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>2.14</version>
        </dependency>

2.spring boot 的application.yaml配置文件中添加influxdb的配置

influxdb:
  url: http://127.0.0.1:8086
  database: influx_db
  username: influx_db_user
  password: influx_db_pwd

  8086 为influxdb默认的端口

3.influxdb配置应用与service方法编写

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Service
public class InfluxDBService {


    @Value("${influxdb.username}")
    public String influxdbUserName;

    @Value("${influxdb.password}")
    public String influxdbPassword;

    @Value("${influxdb.url}")
    public String influxdbUrl;

    //数据库
    @Value("${influxdb.database}")
    public String influxdbDatabase;

    @PostConstruct
    public void initInfluxDb() {
        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
        this.influxDB = influxDbBuild();
    }
    //保留策略
    private String retentionPolicy;

    private InfluxDB influxDB;

    /**
     * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
     * 表示 设为默认的策略
     */
    public void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "defalut", influxdbDatabase, "30d", 1);
        this.query(command);
    }

    /**
     * 连接时序数据库;获得InfluxDB
     **/
    private InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUserName, influxdbPassword);
            influxDB.setDatabase(influxdbDatabase);
        }
        return influxDB;
    }

    /**
     * 插入
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxdbDatabase, "", builder.build());
    }

    /**
     * @desc 插入,带时间time
     * @date 2021/3/27
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insert(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxdbDatabase, "", builder.build());
    }

    /**
     * @desc influxDB开启UDP功能,默认端口:8089,默认数据库:udp,没提供代码传数据库功能接口
     * @date 2021/3/13
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insertUDP(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        int udpPort = 8089;
        influxDB.write(udpPort,  builder.build());
    }

    /**
     * 查询
     * @param command 查询语句
     * @return
     */
    public QueryResult query(String command) {
        influxDbBuild();
        return influxDB.query(new Query(command, influxdbDatabase));
    }

    /**
     * @desc 查询结果处理
     * @date 2021/5/12
     *@param queryResult
     */
    public List<Map<String, Object>> queryResultProcess(QueryResult queryResult) {
        List<Map<String, Object>> mapList = new ArrayList<>();
        List<QueryResult.Result> resultList =  queryResult.getResults();
        //把查询出的结果集转换成对应的实体对象,聚合成list
        for(QueryResult.Result query : resultList){
            List<QueryResult.Series> seriesList = query.getSeries();
            if(seriesList != null && seriesList.size() != 0) {
                for(QueryResult.Series series : seriesList){
                    List<String> columns = series.getColumns();
                    String[] keys =  columns.toArray(new String[columns.size()]);
                    List<List<Object>> values = series.getValues();
                    if(values != null && values.size() != 0) {
                        for(List<Object> value : values){
                            Map<String, Object> map = new HashMap(keys.length);
                            for (int i = 0; i < keys.length; i++) {
                                map.put(keys[i], value.get(i));
                            }
                            mapList.add(map);
                        }
                    }
                }
            }
        }
        return mapList;
    }

    /**
     * @desc InfluxDB 查询 count总条数
     * @date 2021/4/8
     */
    public long countResultProcess(QueryResult queryResult) {
        long count = 0;
        List<Map<String, Object>> list = queryResultProcess(queryResult);
        if(list != null && list.size() != 0) {
            Map<String, Object> map = list.get(0);
            double num = (Double)map.get("count");
            count = new Double(num).longValue();
        }
        return count;
    }

    /**
     * 查询
     * @param dbName 创建数据库
     * @return
     */
    public void createDB(String dbName) {
        influxDbBuild();
        influxDB.createDatabase(dbName);
    }

    /**
     * 批量写入测点
     *
     * @param batchPoints
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDbBuild();
        influxDB.write(batchPoints);
    }

    /**
     * 批量写入数据
     *
     * @param database  数据库
     * @param retentionPolicy 保存策略
     * @param consistency   一致性
     * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
     */
    public void batchInsert(final String database, final String retentionPolicy,
                            final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(database, retentionPolicy, consistency, records);
    }

    /**
     * @desc 批量写入数据
     * @date 2021/3/19
     *@param consistency
     *@param records
     */
    public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(influxdbDatabase, "", consistency, records);
    }

}

4.进行单元测试验证

@SpringBootTest
public class InfluxDbTest {

    @Autowired
    private InfluxDBService influxDBService;

    @Test
    void contextLoads() {
    }

    @Test
    public void testSave(){
        String measurement = "host_cpu_usage_total";

        Map<String,String> tags = new HashMap<>();
        tags.put("host_name","host2");
        tags.put("cpu_core","core0");

        Map<String, Object> fields = new HashMap<>();
        fields.put("cpu_usage",0.22);
        fields.put("cpu_idle",0.56);
        influxDBService.insert(measurement, tags, fields);
    }
}

  influxdb 在执行新增测试用例的时候,如果influxdb的数据库中对应的表不存在,会自动创建数据库表

5.查询示例

springboot接入influxdb_influxdb

 

 

 

 

 

  

标签:String,接入,fields,param,influxDB,influxdb,public,springboot
From: https://blog.51cto.com/u_15535797/6407637

相关文章

  • Springboot项目启动脚本
    #!/bin/bashSpringBoot=$2if["$1"=""];thenecho-e"\033[0;31m未输入操作名\033[0m\033[0;34m{start|stop|restart|status}\033[0m"exit1fiif["$SpringBoot"=""];thenecho-e&qu......
  • 基于JAVA的springboot篮球论坛系统,附源码+数据库+论文+PPT
    1、项目介绍考虑到实际生活中在篮球论坛方面的需要以及对该系统认真的分析,将系统权限按管理员和用户这两类涉及用户划分。(a)管理员;管理员使用本系统涉到的功能主要有:首页、个人中心、用户管理、篮球论坛、系统管理等功能。管理员用例图如图3-1所示。(b)用户;用户使用本系统......
  • 关于开发- springBoot 的中间件
    数据库中间件:主要用于存储和管理应用程序的数据。消息队列中间件:主要用于异步处理任务、削峰填谷、分布式解耦等场景。缓存中间件:主要用于提供快速的数据访问和响应能力,降低系统负载。搜索引擎中间件:主要用于实现全文搜索、分析数据、大规模数据聚合等场景。消息......
  • springboot - 项目启动初始化数据
    1、redis配置依赖<!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>redisconfi......
  • SpringBoot大文件分片上传/多线程上传
    ​ 这里只写后端的代码,基本的思想就是,前端将文件分片,然后每次访问上传接口的时候,向后端传入参数:当前为第几块文件,和分片总数下面直接贴代码吧,一些难懂的我大部分都加上注释了:上传文件实体类:看得出来,实体类中已经有很多我们需要的功能了,还有实用的属性。如MD5秒传的信息。pub......
  • EasyCVR使用SDK接入,设备全部离线,但是SDK DEMO接入正常是什么原因?
    EasyCVR视频融合平台基于云边端一体化架构,具有强大的数据接入、处理及分发能力,平台支持多协议、多类型的设备接入,包括主流标准协议国标GB28181、RTSP/Onvif、RTMP等,以及厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。有用户反馈,EasyCVR平台中,使用SDK接入的设备显示......
  • AI视频融合平台EasyCVR接入国标GB28181设备,视频无法播放是什么原因?
    EasyCVR可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等,能对外分发RTSP、RTMP、FLV、HLS、WebRTC等格式的视频流。有用户反馈,将设备通过国标GB28181协议接入EasyCVR......
  • EasyCVR使用SDK接入,设备全部离线,但是SDK DEMO接入正常是什么原因?
    EasyCVR视频融合平台基于云边端一体化架构,具有强大的数据接入、处理及分发能力,平台支持多协议、多类型的设备接入,包括主流标准协议国标GB28181、RTSP/Onvif、RTMP等,以及厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。有用户反馈,EasyCVR平台中,使用SDK接入的设备显示全......
  • springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出No method found
    转:springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出Nomethodfoundforclass[B     ......
  • linux sh脚本启动springboot
    1、restart.sh#!/bin/bashAPP_NAME=xxxxx.jar#定义JAVA程序名LOG_FILE="$APP_NAME.log"#定义日志文件名称#查询进程并终止PID=`ps-ef|grep$APP_NAME|grep-vgrep|awk'{print$2}'`kill-9$PIDecho"$APP_NAME的进程$PID已经终止"#启动jar包,指......