首页 > 其他分享 >从 InfluxDB 迁移数据到 DolphinDB

从 InfluxDB 迁移数据到 DolphinDB

时间:2023-01-05 10:35:23浏览次数:73  
标签:插件 timeStr name DolphinDB InfluxDB 迁移 数据

InfluxDB 是一款流行的时序数据处理平台,有着丰富的数据导入方式,完善的集数据处理、预警、可视化等功能为一体的操作界面。其主要是面向物联网场景开发的,同时从 2.0 版本开始,使用了基于 Flux 语言的操作模型来处理数据,对于用户来讲有一定的学习成本。

DolphinDB 是一款国产的高性能分布式时序数据库产品。支持 SQL 和使用类 Python 的语法来处理数据,相比 Flux 语言来讲学习成本较低。同时,DolphinDB 提供了 1400 多个函数,对于复杂的数据处理场景有很强的表达能力,极大的降低了用户开发成本。在数据存储压缩率方面,InfluxDB 和 DolphinDB 的相差不多。而数据查询性能方面,DolphinDB 要比 InfluxDB 高很多,甚至有多个数量级的差异。DolphinDB 也有强大的流式增量计算能力,满足用户实时数据的处理需求。

本文旨在为有从 InfluxDB 迁移至 DolphinDB 需求的用户提供一份简洁明了的参考。

1 实现方法

InfluxDB 支持丰富的数据源接入。在将数据导出时,用户可以通过 API 接口读取数据,并根据实际业务需求转换数据并导出,但这种方式会带来开发上的成本。

这种不同数据源之间的数据同步需求,催生了类似于 DataX 这样的同步工具/平台的诞生。这些工具主要通过插件实现各个数据源的读取和写入,用户只需提供一个配置文件,就可以实现不同数据源之间的同步需求。

DolphinDB 支持用户通过 DataX 插件实现数据的读写,但考虑到,InfluxDB1.0 和 2.0 版本均不支持 DataX 插件,我们基于另一个数据同步平台 Addax,开发了 DolphinDB 写入插件,通过配合 InfluxDB 的读写插件,实现将数据从 InfluxDB 迁移到 DolphinDB 的功能。

对于用户来讲,只需要配置 Addax 的任务文件,即配置 InfluxDB 的读插件参数以及 DolphinDB 的写插件参数,就可以完成从 InfluxDB 迁移数据到 DolphinDB 的功能。

2 迁移案例与操作步骤

在物联网监控场景中,数据有来源多、维度高、数据量大的特点。用户常见的需求是,在采集传感器数据时,实时写入、聚合数据,并做异常检测、预警等分析操作。这对数据库系统的吞吐量和实时分析能力提出了很高的要求。

在这方面,DolphinDB 有着丰富的实践案例,可以参考 ​​DolphinDB 物联网范例​​,​​DolphinDB 在工业物联网的应用​​,​​DolphinDB 流计算在物联网的应用:实时检测传感器状态变化​​ 等。

我们以物联网设备传感器的数据为例,来说明如何将 InfluxDB 的数据迁移到 DolphinDB。

2.1 生成测试数据

在实际迁移过程中,我们需要注意数据完成迁移后所使用的数据类型。具体 DolphinDB 支持的数据类型,可以参考 ​​数据类型​

这里以 InfluxDB 官方提供的示例数据里的“设备示例数据”(​​Sample data​​ ) 为例,说明如何把数据导入到 DolphinDB。

在导入之前,我们创建一个 InfluxDB 的 bucket, 并使用 Flux 脚本导入测试数据到测试的 bucket,代码文件 ​​genMockData.flux​​ 内容如下:

import "influxdata/influxdb/sample"

sample.data(set: "machineProduction")
|> to(bucket: "demo-bucket")

2.2 在 DolphinDB 创建表

针对上面的测试数据,我们需要在 DolphinDB 里创建对应的库表,用于存储迁移过来的数据。对于实际的数据,需要综合考虑被迁移数据的字段,类型,数据量,在 DolphinDB 是否需要分区,分区方案,使用 OLAP 还是 TSDB 引擎等情况,去设计建库建表方案。一些数据存储库表设计实践,可以参考 ​​分区数据库​

本例建表文件 ​​createTable.dos​​ 内容如下:

login("admin","123456")

dbName="dfs://demo"
tbName="pt"

colNames=`time`stateionID`grinding_time`oil_temp`pressure`pressure_target`rework_time`state
colTypes=[NANOTIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL]
schemaTb=table(1:0,colNames,colTypes)

db = database(dbName, VALUE,2021.08.01..2022.12.31)
pt = db.createPartitionedTable(schemaTb, "pt", `time)

2.3 部署 Addax

Addax 有2种安装方式,一种是下载已经编译好的二进制包,一种是下载源码,并从源码编译安装。可以参考其官方​​快速使用​​来部署。

2.4 部署 Addax-DolphinDBWriter 插件

下载已编译好的 Addax 的 DolphinDB ​​写插件​​,拷贝到 Addax 的插件目录 plugin/writer 目录下。

或者也可以自行编译插件。

2.5 定义迁移任务

在定义迁移任务,主要是按照 Addax 的​​任务配置​​,配置 InfluxDB 2.0 ​​读插件​​ 和 DolphinDB ​​写插件​​ 参数。对于时间类型的数据,也需要注意一下格式转换的问题。

2.6 时间格式的转换

InfluxDB 2.0 中存储时间类型数据,采用的是带有时区信息的 RFC3339 格式。而 DolphinDB 里存储的是本地时间,没有时区的概念。在使用 InfluxDB2.0 插件读取数据时,此时如果直接写入 DolphinDB 表,会造成转换错误。因此需要自定义函数,去处理和转化数据。这部分转换规则可以在 dolphindb 的写入插件中去定义,通过定义参数 saveFunctionName 和 saveFunctionDef 来指定转换函数(具体转换函数的格式参数,可以参考 ​​datax-writer​​)来实现时间列数据转换(可以参考下面的案例)。

2.7 编写配置文件

编写数据迁移任务的配置文件 ​​influx2ddb.json​​(具体的每个配置参数的含义,可以参考附录说明),文件内容如下:

{
"job": {
"content": {
"reader": {
"name": "influxdb2reader",
"parameter": {
"column": ["*"],
"connection": [
{
"endpoint": "http://183.136.170.168:8086",
"bucket": "demo-bucket",
"table": [
"machinery"
],
"org": "zhiyu"
}
],
"token": "GLiPjQFQIxzVO0-atASJHH4b075sTlyEZGrqW20XURkelUT5pOlfhi_Yuo2fjcSKVZvyuO00kdXunWPrpJd_kg==",
"range": [
"2007-08-09"
]
}
},
"writer": {
"name": "dolphindbwriter",
"parameter": {
"userId": "admin",
"pwd": "123456",
"host": "115.239.209.122",
"port": 3134,
"dbPath": "dfs://demo",
"tableName": "pt",
"batchSize": 1000000,
"saveFunctionName": "transData",
"saveFunctionDef": "def parseRFC3339(timeStr) {if(strlen(timeStr) == 20) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));} else if (strlen(timeStr) == 24) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));} else {return timeStr;}};def transData(dbName, tbName, mutable data) {timeCol = exec time from data; timeCol=each(parseRFC3339, timeCol); writeLog(timeCol);replaceColumn!(data,'time', timeCol); loadTable(dbName,tbName).append!(data); }",
"table": [
{
"type": "DT_STRING",
"name": "time"
},
{
"type": "DT_SYMBOL",
"name": "stationID"
},
{
"type": "DT_DOUBLE",
"name": "grinding_time"
},
{
"type": "DT_DOUBLE",
"name": "oil_temp"
},
{
"type": "DT_DOUBLE",
"name": "pressure"
},
{
"type": "DT_DOUBLE",
"name": "pressure_target"
},
{
"type": "DT_DOUBLE",
"name": "rework_time"
},
{
"type": "DT_SYMBOL",
"name": "state"
}
]
}
}
},
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
}
}
}

其中 saveFunctionDef 的定义为:

def parseRFC3339(timeStr) {
if(strlen(timeStr) == 20) {
return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));
} else if (strlen(timeStr) == 24) {
return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));
} else {
return timeStr;
}
}

def transData(dbName, tbName, mutable data) {
timeCol = exec time from data; writeLog(timeCol);
timeCol=each(parseRFC3339, timeCol);
replaceColumn!(data, 'time', timeCol);
loadTable(dbName,tbName).append!(data);
}

这里主要定义了2个函数,转换函数入口是 transData。在 writer 的定义里,需要把参数 saveFunctionName 设置为转换入口函数"transData"。上面的函数实现里,主要是先对这个字符串类型的时间列做解析,生成需要的时间列,最后调用 ​​append!​​ 去写入分布式表。

在转换时间列时,通过函数 ​​temporalParse​​ 去分析时间字符串,并通过函数 ​​localtime​​ 去将 UTC 时间转换成本地时间并进行存储。如果业务上有需要,也可以不做时区转换,存储 temporalParse 的结果。关于时间信息处理相关的实践例子,可以参考 ​​DolphinDB 中有关时间信息的最佳实践指南​​ 。

2.8 执行 addax 任务

./bin/addax.sh ~/addax/job/influx2ddb.json

(上面路径根据实际 addax 的位置,以及配置文件路径来修改)

运行结果如下:

2022-12-13 21:17:17.870 [job-0] INFO  JobContainer         -
任务启动时刻 : 2022-12-13 21:17:14
任务结束时刻 : 2022-12-13 21:17:17
任务总计耗时 : 3s
任务平均流量 : 345.07KB/s
记录写入速度 : 6568rec/s
读出记录总数 : 19705
读写失败总数 : 0

若需要实现增量同步,可以在 influx2ddb.json 中的 reader 里,修改 range 字段,指定开始时间和结束时间来​​过滤数据​​。

以上就是一个 InfluxDB 2.0 数据迁移到 DolphinDB 的过程。对于 InfluxDB 1.0 的数据迁移任务,只需要修改上面的任务配置文件(influx2ddb.json),将里面的读插件替换为 InfluxDB 1.0 的读插件,然后做对应的配置即可。

除了定义迁移任务配置文件,更重要的是需要考虑 DolphinDB 数据存储方案,包括是否分区,分区方案,使用引擎等。

3 配置文件参数说明

3.1 InfluxDB 2.0 读插件配置项

配置项

是否必须

数据类型

默认值

描述

endpoint


string


token


string


访问数据库的 token

table


list


所选取的需要同步的表名(即指标)

org


string


指定 InfluxDB 的 org 名称

bucket


string


指定 InfluxDB 的 bucket 名称

column


list


所配置的表中需要同步的列名集合,具体参考后面 column 配置详解

range


list


读取数据的时间范围

limit


int


限制获取记录数

3.2 column 配置详解

column 是配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。用户使用 ​​*​​ 代表默认使用所有列配置,例如 ["*"]。

支持列裁剪,即列可以挑选部分列进行导出。

支持列换序,即列可以不按照表 schema 信息进行导出。

支持常量配置,用户需要按照 JSON 格式:

["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
  • ​id​​ 为普通列名
  • ​`table`​​ 为包含保留在的列名
  • ​1​​ 为整形数字常量
  • ​'bazhen.csy'​​ 为字符串常量
  • ​null​​ 为空指针,注意,这里的 ​​null​​ 必须以字符串形式出现,即用双引号引用
  • ​to_char(a + 1)​​ 为表达式
  • ​2.3​​ 为浮点数
  • ​true​​ 为布尔值,同样的,这里的布尔值也必须用双引号引用

3.3 DolphinDB 写插件配置项

配置项

是否必须

数据类型

默认值

描述

host


string


Server Host

port


int


Server Port

userId


string


DolphinDB 用户名导入分布式库时,必须要有权限的用户才能操作,否则会返回

pwd


string


DolphinDB 用户密码

dbPath


string


需要写入的目标分布式库名称,比如 "dfs://MYDB"。

tableName


string


目标数据表名称

batchSize


int

10000000

datax 每次写入 dolphindb 的批次记录数

table


写入表的字段集合,具体参考后续 table 项配置详解

saveFunctionName


string


自定义数据处理函数。若未指定此配置,插件在接收到 reader 的数据后,会将数据提交到 DolphinDB 并通过 tableInsert 函数写入指定库表;如果定义此参数,则会用指定函数替换 tableInsert 函数。

saveFunctionDef


string


数据入库自定义函数。此函数指 用 dolphindb 脚本来实现的数据入库过程。 此函数必须接受三个参数:dfsPath(分布式库路径),tbName(数据表名), data(从 datax 导入的数据,table 格式)

3.4 table 配置详解

table 用于配置写入表的字段集合。内部结构为

{"name": "columnName", "type": "DT_STRING", "isKeyField":true}

请注意此处列定义的顺序,需要与原表提取的列顺序完全一致。

  • name:字段名称
  • isKeyField:是否唯一键值,可以允许组合唯一键。本属性用于数据更新场景,用于确认更新数据的主键,若无更新数据的场景,无需设置
  • type:枚举值以及对应 DolphinDB 数据类型如下

DolphinDB 类型

配置值

DOUBLE

DT_DOUBLE

FLOAT

DT_FLOAT

BOOL

DT_BOOL

DATE

DT_DATE

MONTH

DT_MONTH

DATETIME

DT_DATETIME

TIME

DT_TIME

SECOND

DT_SECOND

TIMESTAMP

DT_TIMESTAMP

NANOTIME

DT_NANOTIME

NANOTIMETAMP

DT_NANOTIMETAMP

INT

DT_INT

LONG

DT_LONG

UUID

DT_UUID

SHORT

DT_SHORT

STRING

DT_STRING

SYMBOL

DT_SYMBOL

标签:插件,timeStr,name,DolphinDB,InfluxDB,迁移,数据
From: https://blog.51cto.com/u_15022783/5989487

相关文章

  • informatica 861 迁移domain和资料库(oracle迁oracle)
    1:备份资料库  登录informatica控制台,选择资料库PC64-->Actions-->backupcontents,填写用户、密码、文件名称、备份说明2:备份服务器文件  安全起见整体备份......
  • elasticdump迁移es数据
    离线安装elasticdump在一台有网络的机器上先安装elasticdump下载nodehttps://nodejs.org/dist/v14.17.5/node-v14.17.5-linux-x64.tar.xz解压安装配置nodetarxvf......
  • 迁移学习(DANN)《Domain-Adversarial Training of Neural Networks》
    论文信息论文标题:Domain-AdversarialTrainingofNeuralNetworks论文作者:YaroslavGanin,EvgeniyaUstinova,HanaAjakan,PascalGermain,HugoLarochelle....论文......
  • 数据泵使用dblink的方式迁移数据
    文档课题:数据泵使用dblink的方式迁移数据.说明:本文内容摘自“OGG配置—oracle11.2.0.4到oracle19.16ddl复制”源端:192.168.133.103数据库oracle11.2.0.464位,实例......
  • Sentinel-dashboard数据持久化到InfluxDB(四)
    之前的有做过sentinel整合的工作,已经完成sentinel客户端集成到普通微服务、集成到gateway,以及sentinel控制台的规则数据持久化到nacos,当时的工作基本做的差不多了,但是还差......
  • InfluxDB介绍
    1、时序数据库介绍时序数据库全称为时间序列数据库。时间序列数据库指主要用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据,带时间标签的数据也称为时间序列数据......
  • 【开源项目】历史数据迁移
    历史数据迁移项目地址:https://gitee.com/xl-echo/dataMigration历史迁移解决方案。微服务的架构为基础,使用多种设计模式,如:单利、桥接、工厂、模板、策略等。其中涉及的......
  • 将一台服务器上的数据库迁移到另一台服务器上
    1、首先需要在目标服务器的mysql服务中创建一个和源数据库完全相同的数据库,使用工具Navicat  注:需要填写的内容从源数据库中查看。 2、使用Navicat自带的传输工具......
  • KVM虚拟机迁移
    说明一下背景:本次迁移主要采取的是冷机迁移,热机迁移太复杂,这个可以根据自己实际业务选择相应的方案,我们可以短暂的宕机,所以以简单和不出错成为本此迁移的主要知道思想。1......
  • clickhouse常用SQL语句,查询、建表、数据复制迁移、删除等
    clickhouse常用SQL语句,查询、建表、数据复制迁移、删除等坚持是一种态度于2022-02-1817:58:35发布1646收藏6分类专栏:数据库及存储技术大数据开发文章标签:sql......