InfluxDB
-
influxdb 时序性数据库
时序数据是描述一个实体在不同时间所处的不同状态。一般用于指标监控场景。 -
influxdb连接
@Component @ConfigurationProperties(prefix = "influxdb") public class InfluxDBInfoStatic { private static String url; private static String port; private static String database; private static String measurement; private static String user; private static String password; private static String retentionPolicy="hour"; //okhttpclient连接池最大空闲数量 private static Integer idleConnectionCount=100; //okhttpclient连接池最大空闲保持时间 private static Integer keepAliveDuration=10; } @Slf4j public class InfluxDBConnectionUtil { private static final InfluxDB INFLUXDB; private final static ConnectionPool connectionPool=new ConnectionPool(InfluxDBInfoStatic.getIdleConnectionCount(),InfluxDBInfoStatic.getKeepAliveDuration(), TimeUnit.SECONDS); static{ OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .writeTimeout(30, TimeUnit.SECONDS) .retryOnConnectionFailure(true) .readTimeout(30, TimeUnit.SECONDS); okHttpClientBuilder.connectionPool(connectionPool); INFLUXDB = InfluxDBFactory.connect( "http://" + InfluxDBInfoStatic.getUrl() + ":" + InfluxDBInfoStatic.getPort() , InfluxDBInfoStatic.getUser(), InfluxDBInfoStatic.getPassword(),okHttpClientBuilder); } /** * 获取对象句柄 */ public static InfluxDBConnectionUtil getInstance() { return SingletonInstance.INSTANCE; } /** * 创建自定义保留策略 policyName-策略名;days-保存天数;replication-保存副本数量;isDefault-是否设为默认保留策略 */ public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName, dataBaseName, days, replication); if (isDefault) { sql = sql + " DEFAULT"; } query(sql); } /** * 创建默认的保留策略 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略 * */ public void createDefaultRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", InfluxDBInfoStatic.getDatabase(), "30d", 1); this.query(command); } /** * 查询 */ public QueryResult query(String command) { return INFLUXDB.query(new Query(command, InfluxDBInfoStatic.getDatabase())); } /** * 插入 measurement-表;tags-标签;fields-字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } INFLUXDB.write(InfluxDBInfoStatic.getDatabase(), InfluxDBInfoStatic.getRetentionPolicy() == null || InfluxDBInfoStatic.getRetentionPolicy().equals("") ? "autogen" : InfluxDBInfoStatic.getRetentionPolicy(), builder.build()); } public static void main(String[] args) { QueryResult results = InfluxDBConnectionUtil.getInstance() .query("select time, application, avg, count, countError, max, min, \"pct90.0\", \"pct95.0\", \"pct99.0\", rb, sb, transaction from jmeter where application = '16'"); QueryResult.Result oneResult = results.getResults().get(0); if (oneResult.getSeries() != null) { List<List<Object>> valueList = oneResult.getSeries().stream().map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0); if (valueList != null && valueList.size() > 0) { System.out.println("valueList ===> " + JSONUtil.toJsonStr(valueList)); for (List<Object> value : valueList) { System.out.println("value ===> " + JSONUtil.toJsonStr(value)); } } } } }