首页 > 其他分享 >es通过时间聚合查询一周中每天的数据平均值

es通过时间聚合查询一周中每天的数据平均值

时间:2022-10-21 16:55:28浏览次数:55  
标签:10 00 聚合 平均值 value aggValues 2022 createTime es

场景回顾:设备上传的数据保存在es中,大屏模块要统计本周的数据折线图(一个设备三分总上传一次,所以拟定每天聚合求个平均值)

kibana查询请求

GET xxxx_2022-10/_search
{
  "size": 0,
  "query": {
    "bool":{
      "must": [
        {"term": {
          "deviceId": {
            "value": "xxx"
          }
        }},
        {
          "term":{
          "property":{
            "value":"temperature"
          }
        }
        }
      ],
      "filter": {
        "range": {
          "createTime": {
            "gte": 1664726400000,	// 2022-10-03 00:00:00 ————本周开始时间
            "lt": 1665331200000		// 2022-10-10 00:00:00 ————下周开始时间	
          }
        }
      }
    }
  },  
    "aggs": {
    "create_date":{
      "date_histogram": {
        "field": "createTime",
        "calendar_interval": "day",
        "time_zone": "Asia/Shanghai", 
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", 
         // 强制返回指定范围内的每一个桶,包括min和max,这时就会将max作为一个key
         // 所以我这里只设置的最小时间,没有设计结束时间
        "extended_bounds": {
          "min": "1664726400000"
        }
      },
        "aggs": {
          "dayNumValAvg": {
            "avg": {
              "field": "numberValue"
            }
          }
        }
    }
  }
}

存在的问题

返回的桶不在extended_bounds设置的范围内:

原本只需要 [10.03日0点,10.10日零点)中每一天的聚合,使用extended_bounds设定min为1664726400000(2022-10-03 00:00:00),但是返回的第一个却为"2022-10-02 00:00:00"

投机取巧的解法,offset设置为“+24h”,后来把数据挨个加了遍,这样求出来的数据没有问题……

使用脚本实现该需求

没有指定时区,可能存在偏差

点击展开代码
POST xxx_2022-10/_search?size=0
{
  "size": 0,
  "query": {
    "bool":{
      "must": [
        {"term": {
          "deviceId": {
            "value": "xxx"
          }
        }},
        {
          "term":{
          "property":{
            "value":"temperature"
          }
        }
        }
      ],
      "filter": {
        "range": {
          "createTime": {
            "gte": 1664726400000,
            "lt": 1665331200000		
          }
        }
      }
    }
  },  
  "aggs": {
    "dayOfWeek": {
        "terms": {
            "script": {
                "lang": "painless",
                "source": "doc['createTime'].value.dayOfWeekEnum.value"
            }
        },
        "aggs": {
          "dayNumValAvg": {
            "avg": {
              "field": "numberValue"
            }
          }
        }
    }
  }
}

对应Java实现

Long[] timeScope = {1664726400000L, 1665331200000L};
String deviceId = "xxxx";
String property = "temperature";
String index = "xxxx";

DateHistogramAggregationBuilder aggCreateTime = AggregationBuilders.dateHistogram("createTimesGroup")
    .field("createTime").timeZone(ZoneId.of("Asia/Shanghai")).calendarInterval(DateHistogramInterval.DAY)
    .offset("+24h").extendedBounds(new ExtendedBounds(timeScope[0], null));
aggCreateTime.subAggregation(AggregationBuilders.avg("aggNumVal").field("numberValue"));
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("deviceId", deviceId))
    .must(QueryBuilders.matchQuery("property", property));
boolQueryBuilder.filter(
    QueryBuilders.rangeQuery("createTime").gte(timeScope[0]).lt(timeScope[1])
);
SearchSourceBuilder thisWeekSearch = new SearchSourceBuilder().size(0).query(boolQueryBuilder).aggregation(aggCreateTime);
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.source(thisWeekSearch);
SearchResponse response = null;
try {
    response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
    e.printStackTrace();
}

// 这里是根据响应一步步拆出来的 如果有更好的方法还望不吝指教
int capacity = 7;
Double[] aggValues = new Double[capacity];
Aggregations aggregations = response.getAggregations();
ParsedDateHistogram dateHistogram = (ParsedDateHistogram) aggregations.asList().get(0);
List<? extends Histogram.Bucket> buckets = dateHistogram.getBuckets();
for (int i = 0; i < capacity; i++) {
    if (i < buckets.size()) {
        Histogram.Bucket bucket = buckets.get(i);
        ParsedSingleValueNumericMetricsAggregation aggValue = bucket.getAggregations().get("aggNumVal");
        aggValues[i] = aggValue.value();
        if(Double.isNaN(aggValues[i]) || Double.isInfinite(aggValues[i])){
            aggValues[i] = 0.0D;
        }
    } else {
        aggValues[i] = 0.0D;
    }
}
System.out.println(Arrays.toString(aggValues));

标签:10,00,聚合,平均值,value,aggValues,2022,createTime,es
From: https://www.cnblogs.com/daydreamer-fs/p/16810836.html

相关文章

  • WordPress 优化 MySQL 数据库慢查询
    搭建WordPress网站会占用大量的内存,还需要一些好些的服务器配置之外,我们如果想更好的给用户有更好的访问体验,对网站优化也是必不可少的。很多时候用WordPress越久,数据......
  • Educational Codeforces Round 138 (Rated for Div. 2) ABC(二分)
    只能说这场的出题人不适合我食用,ABC都读了假题,离谱啊~A.CowardlyRooks题目大意:给定一个棋盘n*n的大小,左下角的顶点在(1,1);给定了棋盘格上的m个棋子坐标。这m个棋子......
  • 抓包:WireShark
    参考https://zhuanlan.zhihu.com/p/431525275(Wireshark的抓包和分析,看这篇就够了!) 下载:https://www.wireshark.org/#download......
  • [转载] MATLAB | RGB image representation
    转载自https://www.geeksforgeeks.org/matlab-rgb-image-representation/MATLAB|RGBimagerepresentationAnRGBimagecanbeviewedasthreedifferentimages(ar......
  • es建mapping转移数据碰到的小乌龙
    项目要上线,本地es存储数据,需要搬到服务器上,于是要创建mapping,reindex迁移数据。一套操作下来后发现后台的查询,排序等功能全部都报400,错误提示中有设置fileds为true的建议......
  • 牛客SQL-employees表(一):195-220
    195.查找employees里最晚入职员工的所有信息SELECT*FROMemployeesWHEREhire_date=(SELECTMAX(hire_date)FROMemployees)/*#如果员工入职的日期都不是同一......
  • ES6迭代器自定义遍历数据
    //声明一个对象constbanji={name:"一班",stus:["xiaobai","xiaohei",......
  • k8s 安装ingress
    https://kubernetes.github.io/ingress-nginx/deploy/    wgethttps://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.3.0/deploy/static......
  • 「题解」Codeforces 1730F Almost Sorted
    给定一个长度为\(n\)的置换\(p\),以及一个正整数\(k\).对于一个置换\(q\),要求对于所有满足\(1\leqi<j\leqn\)的\(i,j\),有以下不等式成立:\[p_{q_i}\leqp_{q_j}+......
  • CH582 CH573 BLE设备地址 DEVICE ADDRESS
    BLE设备,可以使用两种类型的地址(一个BLE设备可同时具备两种地址):PublicDeviceAddress和RandomDeviceAddress。而RandomDeviceAddress又分为StaticDeviceAddress和Pr......