首页 > 编程语言 >java把数据批量插入iotdb

java把数据批量插入iotdb

时间:2023-11-23 14:55:06浏览次数:27  
标签:java 批量 get kks records iotdb org import

package com.xlkh.kafka;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;


@Slf4j
@Component("com.xlkh.kafka.DataLoaderKafkaConsumer")
public class DataLoaderKafkaConsumer {


    @Autowired
    private SessionPool sessionPool;

    /**
     * 保存已经创建的时间序列
     */
    private final static Set<String> STATIC_PATHS = Sets.newConcurrentHashSet();

    /**
     * fluent_data,批量消费
     */
    @KafkaListener(topics = "fluent_data", groupId = "fluent_data_demo", containerFactory = "batchFactory")
    @SneakyThrows
    public void listenBatchByFluent(List<ConsumerRecord<String, String>> records) {
        log.error("从kafka消费fluent数据" + records.size() + "条,当前偏移量:" + records.get(0).offset());

        //创建时间序列,如果序列已经存在,不再重新创建
        createTimeseriesIfNotExist(records);

        log.info("开始把数据放到iotdb-----------------------");

        insertIotdbByKafka(records);


    }

    private void insertIotdbByKafka(List<ConsumerRecord<String, String>> records) throws ParseException, IoTDBConnectionException, StatementExecutionException {

        //key为kks的路径,value是时间戳集合
        Map<String, List<Long>> timeStampMap = new HashMap<>();

        //key为kks路径,value是具体的数据
        Map<String, List<Float>> values = new HashMap<>();

        //保存kks编码
        Set<String> kksSet = new HashSet<>();

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (ConsumerRecord<String, String> record : records) {
            String valueData = record.value();
            JSONArray jsonArray = JSON.parseArray(valueData);
            for (int i = 0; i < jsonArray.size(); i++) {
                Map map = JSON.parseObject(String.valueOf(jsonArray.get(i)), Map.class);
                JSONArray jsonArray1 = JSON.parseArray(String.valueOf(map.get("msg")));
                for (int j = 0; j < jsonArray1.size(); j++) {
                    Map map1 = JSON.parseObject(String.valueOf(jsonArray1.get(j)), Map.class);
                    if (map1.containsKey("big_v")){
                        String kks = map1.get("kks").toString();
                        Date date = dateFormat.parse(map1.get("time").toString());
                        Float val = Float.parseFloat(String.valueOf(map1.get("big_v")));
                        if(!kksSet.contains(kks)){
                            timeStampMap.put(kks, new ArrayList<>());
                            values.put(kks, new ArrayList<>());
                            kksSet.add(kks);
                        }
                        timeStampMap.get(kks).add(date.getTime());
                        values.get(kks).add(val);
                    }
                }
            }
        }

        //遍历批量插入每个设备的数据
        for (String kks : kksSet) {
            List<Long> longs = timeStampMap.get(kks);

            //声明Tablet对象设备属性
            List<MeasurementSchema> schemas = new ArrayList<>();
            schemas.add(new MeasurementSchema(kks,TSDataType.FLOAT));
            Tablet tablet = new Tablet("root.param.demo", schemas, longs.size());
            for (int row = 0; row < longs.size(); row++) {
                int rowIndex = tablet.rowSize++;
                //设备时间戳值
                tablet.addTimestamp(rowIndex, longs.get(row));
                //设置对应的值
                tablet.addValue(schemas.get(0).getMeasurementId(), rowIndex, values.get(kks).get(row));
            }
            //批量插入数据
            sessionPool.insertTablet(tablet);
        }
        log.info("数据成功插入到iotdb-----------------------"+"插入的数据量大小为:"+records.size());

    }


    /**
     * 创建时间序列,如果序列已经存在,不再重新创建
     *
     * @param records 批量数据
     */
    private void createTimeseriesIfNotExist(List<ConsumerRecord<String, String>> records) {
        try {
            List<String> data = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());
            HashSet<String> paths = Sets.newHashSetWithExpectedSize(250);
            for (String msg : data) {

                JSONArray jsonArray = JSON.parseArray(msg);

                for (int i = 0; i <jsonArray.size() ; i++) {
                    Map map = JSON.parseObject(String.valueOf(jsonArray.get(i)), Map.class);
                    JSONArray jsonArray1 = JSON.parseArray(String.valueOf(map.get("msg")));
                    for (int j = 0; j < jsonArray1.size(); j++){
                        Map map1 = JSON.parseObject(String.valueOf(jsonArray1.get(j)), Map.class);
                        String kks = map1.get("kks").toString();
                        String path = "root.param.demo." + kks;
                        paths.add(path);
                    }
                }
            }

            List<String> notExistPaths = Lists.newArrayList();
            List<TSDataType> tsDataTypes = Lists.newArrayList();
            List<TSEncoding> tsEncodings = Lists.newArrayList();
            List<CompressionType> compressionTypes = Lists.newArrayList();
//            List<Map<String, String>> propsList = Lists.newArrayList();
            for (String path : paths) {
                if (!STATIC_PATHS.contains(path)) {
                    if (sessionPool.checkTimeseriesExists(path)) {
                        STATIC_PATHS.add(path);
                    } else {
                        notExistPaths.add(path);
                        tsDataTypes.add(TSDataType.FLOAT);
                        tsEncodings.add(TSEncoding.RLE);
                        compressionTypes.add(CompressionType.SNAPPY);
                    }
                }
            }
            if (CollectionUtil.isNotEmpty(notExistPaths)) {
                //批量创建时间序列
                sessionPool.createMultiTimeseries(notExistPaths, tsDataTypes, tsEncodings, compressionTypes, null, null, null, null);
                //缓存时间序列
                STATIC_PATHS.addAll(notExistPaths);
            }
        } catch (IoTDBConnectionException | StatementExecutionException e) {
            log.error(e.getMessage(), e);
        }
    }


}

 切记:对于iotdb来说,节点的第一层一直到倒数第二层,都属于设备id,只有最后一层才是你的属性

标签:java,批量,get,kks,records,iotdb,org,import
From: https://www.cnblogs.com/dabu/p/17851543.html

相关文章

  • Java8函数式接口, 方法引用, 构造器引用, 数组引用
    函数式(Functional)接口只包含一个抽象方法的接口,称为函数式接口。你可以通过Lambda表达式来创建该接口的对象。(若Lambda表达式抛出一个受检异常(即:非运行时异常),那么该异常需要在目标接口的抽象方法上进行声明我们可以在一个接口上使用@Functionallnterface注解,这样做可以检查......
  • 秦疆的Java课程笔记:35 流程控制 顺序结构
    Java的基本结构就是顺序结构,除非特别指明,否则就按照顺序一句一句执行。顺序结构是最简单的算法结构。publicclassShunXuDemo{publicstaticvoidmain(String[]args){System.out.println("hello1");System.out.println("hello2");......
  • 基于java+springboot的酒店预定网站、酒店客房管理系统
    该系统是基于Java的酒店客房预订系统设计与实现。是给师弟开发的毕业设计。现将源代码开放出来,感兴趣的同学可以下载。演示地址前台地址:http://hotel.gitapp.cn后台地址:http://hotel.gitapp.cn/admin后台管理帐号:用户名:admin123密码:admin123功能介绍平台采用B/S结构,后端采用主......
  • Java商城网站系统设计与实现(带源码)
    基于Java的商城网站系统设计与实现功能介绍平台采用B/S结构,后端采用主流的Springboot框架进行开发,前端采用主流的Vue.js进行开发。整个平台包括前台和后台两个部分。前台功能包括:首页、商品详情页、订单、用户中心模块。后台功能包括:总览、订单管理、商品管理、分类管理、标签管理......
  • JAVA循环结构 | JAVA
    Java中有三种主要的循环结构:while 循环do…while 循环for 循环(还有一种增强的for循环) ......
  • 小练习简单的JAVAEE框架
    简单的JAVAEE框架注意:本次框架练习是为了了解tomcat的框架底层代码一、解析web.xml文件packagecn.servlet;abstractclassLoadConfig{//缺省不允许外包访问,抽象不允许实例化不能被继承privatestaticMap<String,String>config;privateLoadConfig(){......
  • java中LocalDate、Calendar、Date类型进行加减
    java三种类型的加减,LocalDate、Calendar、Date@目录1.LocalDate类型加减:2.Calendar加减:3.Date类型加减1.LocalDate类型加减:以下是LocalDate类进行日期加减:importjava.time.LocalDate;importjava.time.temporal.ChronoUnit;publicclassDateDemo{publicstatic......
  • 需要注意的运算符 | JAVA
    算术运算符需要注意的是a++和++a的区别。intd=25;//查看d++与++d的不同System.out.println("d++="+(d++));System.out.println("++d="+(++d));其中d++=25,++d=27;相当于遇到print的时候,d++有一个顺序,从左到右,也就......
  • 前端学习-JavaScript学习-JavaScript高级程序设计-第3章笔记
    第3章基础概念数据类型Number<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge"><metaname="viewport"......
  • Java时间截和日期格式相互转换的方法。
    1.将时间戳转换为日期格式: 2.将日期格式转换为时间戳: ......