首页 > 其他分享 >Apache IoTDB开发系统整合之MapReduce TsFile

Apache IoTDB开发系统整合之MapReduce TsFile

时间:2023-09-04 20:31:33浏览次数:45  
标签:get Text sum MapReduce value IoTDB Apache new class

TsFile-Hadoop-Connector User Guide

关于 TsFile-Hadoop-Connector

TsFile-Hadoop-Connector 实现了 Hadoop 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Hadoop读取,写入和查询Tsfile。

使用此连接器,咱们就可以:

  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Hadoop 中
  • 将特定目录中的所有文件(从本地文件系统或HDFS加载到Hadoop中)
  • 将数据从 Hadoop 写入 TsFile

系统要求

Hadoop Version

Java Version

TsFile Version

2.7.3

1.8

0.10.0

数据类型对应

TsFile data type

Hadoop writable

BOOLEAN

BooleanWritable

INT32

IntWritable

INT64

LongWritable

FLOAT

FloatWritable

DOUBLE

DoubleWritable

TEXT

Text

TSFInput格式说明

TSFInputFormat 从 tsfile 中提取数据,并将其格式化为 .MapWritable

假设我们要提取名为该设备的数据,该设备具有三个名为 、、 的传感器。d1s1s2s3

s1的类型为 ,的类型为 ,的类型为 。BOOLEANs2DOUBLEs3TEXT

结构将如下所示:MapWritable

  1. {
  2. "time_stamp": 10000000,
  3. "device_id": d1,
  4. "s1": true,
  5. "s2": 3.14,
  6. "s3": "middle"
  7. }

在 Hadoop 的 Map 作业中,你可以按键获取任何你想要的值,如下所示:

mapwritable.get(new Text("s1"))

注意:中的所有键的类型均为 。MapWritableText

阅读示例:计算总和

首先,我们应该告诉 InputFormat 我们想要从 tsfile 获得什么样的数据。

  1. // configure reading time enable
  2. TSFInputFormat.setReadTime(job, true);
  3. // configure reading deviceId enable
  4. TSFInputFormat.setReadDeviceId(job, true);
  5. // configure reading which deltaObjectIds
  6. String[] deviceIds = {"device_1"};
  7. TSFInputFormat.setReadDeviceIds(job, deltaObjectIds);
  8. // configure reading which measurementIds
  9. String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
  10. TSFInputFormat.setReadMeasurementIds(job, measurementIds);

然后,应指定映射器和化简器的输出键和值

  1. // set inputformat and outputformat
  2. job.setInputFormatClass(TSFInputFormat.class);
  3. // set mapper output key and value
  4. job.setMapOutputKeyClass(Text.class);
  5. job.setMapOutputValueClass(DoubleWritable.class);
  6. // set reducer output key and value
  7. job.setOutputKeyClass(Text.class);
  8. job.setOutputValueClass(DoubleWritable.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
  8. }
  9. }

  10. public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

  11. @Override
  12. protected void reduce(Text key, Iterable<DoubleWritable> values,
  13. Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
  14. throws IOException, InterruptedException {

  15. double sum = 0;
  16. for (DoubleWritable value : values) {
  17. sum = sum + value.get();
  18. }
  19. context.write(key, new DoubleWritable(sum));
  20. }
  21. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSFMRReadExample.java

写入示例:将平均值写入 Tsfile

除了 ,Hadoop-map-reduce 作业的其余配置代码几乎与上面相同。OutputFormatClass

  1. job.setOutputFormatClass(TSFOutputFormat.class);
  2. // set reducer output key and value
  3. job.setOutputKeyClass(NullWritable.class);
  4. job.setOutputValueClass(HDFSTSRecord.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {
  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. long timestamp = ((LongWritable)value.get(new Text("timestamp"))).get();
  8. if (timestamp % 100000 == 0) {
  9. context.write(deltaObjectId, new MapWritable(value));
  10. }
  11. }
  12. }

  13. /**
  14. * This reducer calculate the average value.
  15. */
  16. public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

  17. @Override
  18. protected void reduce(Text key, Iterable<MapWritable> values,
  19. Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context) throws IOException, InterruptedException {
  20. long sensor1_value_sum = 0;
  21. long sensor2_value_sum = 0;
  22. double sensor3_value_sum = 0;
  23. long num = 0;
  24. for (MapWritable value : values) {
  25. num++;
  26. sensor1_value_sum += ((LongWritable)value.get(new Text("sensor_1"))).get();
  27. sensor2_value_sum += ((LongWritable)value.get(new Text("sensor_2"))).get();
  28. sensor3_value_sum += ((DoubleWritable)value.get(new Text("sensor_3"))).get();
  29. }
  30. HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
  31. DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
  32. DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
  33. DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
  34. tsRecord.addTuple(dPoint1);
  35. tsRecord.addTuple(dPoint2);
  36. tsRecord.addTuple(dPoint3);
  37. context.write(NullWritable.get(), tsRecord);
  38. }
  39. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSMRWriteExample.java



标签:get,Text,sum,MapReduce,value,IoTDB,Apache,new,class
From: https://blog.51cto.com/u_15123639/7355147

相关文章

  • Apache IoTDB开发之Load External TsFile工具
    LoadExternalTsFile工具简介加载外部tsfile工具允许用户从正在运行的ApacheIoTDB实例加载tsfiles、删除tsfile或将tsfile移动到目标目录。用法用户通过Cli工具或JDBC向ApacheIoTDB系统发送指定的命令以使用该工具。加载TS文件加载tsfiles的命令是。load"<p......
  • CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
    ApacheHudi的DeltaStreamer是一种以近实时方式摄取数据并写入Hudi表的工具类,它简化了流式数据入湖并存储为Hudi表的操作,自0.10.0版开始,Hudi又在DeltaStreamer的基础上增加了基于Debezium的CDC数据处理能力,这使得其可以直接将Debezium采集的CDC数据落地成Hudi表,这一功能极大地简......
  • Apache IoTDB开发之日志可视化工具
    工具简介与其他软件系统一样,IoTDB在运行时生成各种日志。调试和跟踪日志可以帮助开发人员跟踪IoTDB的状态,并挖掘出潜在或不清楚的错误信息日志可以告诉系统的健康程度,并指出数据库参数优化的方向。警告和错误日志指示系统处于危险状态或发生意外情况,并帮助数据库管理员在系统崩溃......
  • idea 创建maven项目出现 Cannot resolve plugin org.apache.maven.plugins:maven-jar-
    idea创建maven项目出现Cannotresolvepluginorg.apache.maven.plugins:maven-jar-plugin:3.3.0如下图所示:遇到这个问题很多次了,也看了很多别人的解决方案,大致分为两种问题:1、maven路径问题:在idea中找到File-Settings通过搜索框找到maven,查看maven的安装路径和本地仓库......
  • Apache Drill 教程
    ApacheDrillhttps://github.com/apache/drill前言这里更偏向于应用,而不是将官方文档翻译给你。ApacheDrill是阿帕奇的顶级项目,但是它的中文文档很少,这篇文档将帮你了解如何使用ApacheDrill。并且会在文末整理一些专业的文档供你参考学习。当然如果你是官网爱好者,那么你可......
  • Vue项目打包,部署到apache服务器
    初学veu,实战项目上线服务器,查遍全网和问遍身边大佬,终于经过我不断地探索,上线成功啦,现在我就为大家梳理一下思路。首先,我们先看一下官网链接:VueCLI部署.,参数配置:publicPath.,VueRouter:HTML5History模式1、步骤1、项目配置2、打包项目,命令:npmrunbuild3、将生成的dist文件......
  • Apache IoTDB开发之Watermark工具
    Watermark工具该工具有两个功能:IoTDB查询结果的Watermark嵌入可疑数据的Watermark检测1.Watermark嵌入1.1配置默认情况下,IoTDB中的Watermark处于禁用状态。要启用Watermark嵌入,首先要修改配置文件中的以下字段:iotdb-engine.properties名字例解释watermark_module_opened假true启......
  • MapReduce 中的两表 join 几种方案简介
    1.概述在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法。2.常见的join方法介......
  • 制作Ubuntu64位的apache+asp+mssqlserver运行环境压缩包
    上一篇我在Ubuntu15.04下成功搭建了apache+asp+mssqlserver运行环境,但今天我又在Ubuntu20.04下重复了一次,一切顺利。但启动apache后,运行asp时页面停止了响应,查看日志发现apache的进程崩溃了。我这两个系统都是64位的,比较了apache2/bin下的问题,发现两个apache文件大小并不一样,Ubun......
  • 使用Apache IoTDB进行IoT相关开发的架构设计与功能实现(12)
    现在到了使用ApacheIoTDB进行IoT相关开发的架构设计与功能实现的最后一个环境,在本文中我将向大家介绍IoTDB的查询语言。IoTDB为咱们广大开发者提供了类似SQL的查询语言,用于与IoTDB进行交互,查询语言可以分为4个主要部分:架构语句:本节中列出了有关架构管理的语句。数据管理语句:本节中......