首页 > 数据库 >从 SQL Server 迁移数据到 DolphinDB

从 SQL Server 迁移数据到 DolphinDB

时间:2022-12-14 10:00:48浏览次数:67  
标签:插件 DolphinDB ODBC Server DataX SQL

作为传统的事务型数据库,SQL Server 有着出色的读写性能,但当面对高吞吐量数据写入以及海量的数据分析等场景时,却无法满足需求。即使数据量较小,能满足数据写入的要求,也不能同时响应实时计算的请求。

DolphinDB 是一款国产的高性能分布式时序数据库产品。既有非常高的吞吐量,又有较低的延时;既能够实时处理流数据,又能够处理海量的历史数据;既能满足简单的点查询的要求,又能满足批量数据复杂分析的要求。在金融和物联网领域,DolphinDB 以天然的分布式构架,强大的流式增量计算能力,丰富的函数库,以及易用性等优势吸引了大量海内外用户。

本文旨在为有从 SQL Server 迁移至 DolphinDB 需求的用户提供一份简洁明了的参考。

1. 实现方法

从 SQL Server 迁移数据到 DolphinDB 可以选择以下两种软件

  • ODBC 插件

ODBC(Open Database Connectivity) 插件是 DolphinDB 提供的通过 ODBC 接口访问 SQL Server 的开源产品。插件配合 DolphinDB 脚本使用,与服务器在同一个进程空间内运行,能高效地完成 SQL Server 数据到 DolphinDB 的数据写入。

ODBC 提供如下函数,函数的具体使用请参考 ​​DolphinDB ODBC plugin​

  1. ​odbc::connect(connStr, [dataBaseType])​
  2. ​odbc::close(conn)​
  3. ​odbc::query(connHandle or connStr, querySql, [t], [batchSize], [tranform])​
  4. ​odbc::execute(connHandle or connStr, SQLstatements)​
  5. ​odbc::append(connHandle, tableData, tablename, [createTableIfNotExist], [insertIgnore])​
  • DataX 驱动

DataX 是可扩展的数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。

DolphinDB 提供基于 DataXReader 和 DataXWriter 的开源驱动。DolphinDBWriter 插件实现了向 DolphinDB 写入数据,使用 DataX 的现有 reader 插件结合 DolphinDBWriter 插件,即可实现从不同数据源向 DolphinDB 同步数据。用户可以在 Java 项目中包含 DataX 的驱动包,开发从 SQL Server 数据源到 DolphinDB 的数据迁移软件。

DataX 驱动的开发基于 Java SDK,支持高可用。

实现途径

数据写入效率

高可用

ODBC 插件


不支持

DataX 驱动


支持

2. 应用需求

很多之前使用 SQL Server 的用户不可避免地需要将数据迁移同步至 DolphinDB。DolphinDB 提供了多种灵活的数据同步方法,来帮助用户方便地把海量数据从多个数据源进行全量同步或增量同步。本文中的实践案例基于该需求,提供了将深交所逐笔委托数据从 SQL Server 迁移至 DolphinDB 的高性能解决方案。

现有最近 10 年的逐笔委托数据,存储于 SQL Server。查看​​部分数据示例​​。

为方便后期与其他数据源的逐笔委托数据整合,数据迁移过程中要对表结构进行部分调整,字段对应表如下所示:

SQL Server 字段含义

SQL Server 字段

SQL Server 数据类型

DolphinDB 字段含义

DolphinDB 字段

DolphinDB 数据类型

交易日期

tradedate

date

数据生成时间

OrigTime

time(7)

发送时间

SendTime

time(7)

接收时间

Recvtime

time(7)

入库时间

LocalTime

time(7)

入库时间

LocalTime

TIME

频道代码

ChannelNo

int

频道代码

ChannelNo

INT

行情类别

MDStreamID

int

行情类别

MDStreamID

INT

委托订单号

ApplSeqNum

bigint

委托订单号

ApplSeqNum

LONG

证券代码

SecurityID

varchar(255)

证券代码

SecurityID

SYMBOL

证券代码源

SecurityIDSource

int

证券代码源

SecurityIDSource

INT

委托价格

Price

float

委托价格

Price

DOUBLE

委托数量

OrderQty

int

委托数量

OrderQty

INT

委托时间

TransactTime

datetime

委托时间

TransactTime

TIMESTAMP

买卖方向

Side

varchar(255)

买卖方向

Side

SYMBOL

委托类别

OrderType

varchar(255)

委托类别

OrderType

SYMBOL

定价行情约定号

ConfirmID

varchar(255)

联系人

Contactor

varchar(255)

联系方式

ContactInfo

varchar(255)

期限

ExpirationDays

int

期限类型

ExpirationType

varchar(255)

业务序列号

BizIndex

LONG

数据状态

DataStatus

INT

SeqNo

SeqNo

LONG

交易市场

Market

SYMBOL

3. 迁移步骤

3.1 通过 ODBC 插件迁移

3.1.1 安装 ODBC 驱动

本例中部署 DolphinDB 的服务器操作系统为 ubuntu22.04。

  1. 终端中运行以下命令安装 freeTDS 程序库、unixODBC 库和 SQL Server ODBC 驱动。
# 安装 freeTDS
apt install -y freetds

# 安装 unixODBC 库
apt-get install unixodbc unixodbc-dev

# 安装 SQL Server ODBC 驱动
apt-get install tdsodbc

2. 在 / etc/freetds/freetds.conf 中添加 SQL Server 的 ip 和 port

[sqlserver]
host = 127.0.0.1
port = 1433

3. 在 / etc/odbcinst.ini 中完成以下配置:

[SQLServer]
Description = ODBC for SQLServer
Driver = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
Setup = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
FileUsage = 1

若是 CentOS 操作系统,则通过以下步骤安装:

  1. 终端中运行以下命令安装 freeTDS 程序库和 unixODBC 库。
# 安装 freeTDS
yum install -y freetds

# 安装 unixODBC 库
yum install unixODBC-devel

2. 在 / etc/freetds.conf 中添加 SQL Server 的 ip 和 port

3. 在 / etc/odbcinst.ini 中完成以下配置

[SQLServer]
Description = ODBC for SQLServer
Driver = /usr/lib64/libtdsodbc.so.0.0.0
Setup = /usr/lib64/libtdsodbc.so.0.0.0
FileUsage = 1

3.1.2 同步数据

  1. 运行以下命令加载 ODBC 插件
loadPlugin("/home/Linux64_V2.00.8/server/plugins/odbc/PluginODBC.txt")

2. 运行以下命令建立与 SQL Server 的连接

conn =odbc::connect("Driver={SQLServer};Servername=sqlserver;Uid=sa;Pwd=DolphinDB;database=historyData;;");

3. 运行以下脚本执行同步

def transform(mutable msg){
msg.replaceColumn!(`LocalTime,time(temporalParse(msg.LocalTime,"HH:mm:ss.nnnnnn")))
msg.replaceColumn!(`Price,double(msg.Price))
msg[`SeqNo]=int(NULL)
msg[`DataStatus]=int(NULL)
msg[`BizIndex]=long(NULL)
msg[`Market]="SZ"
msg.reorderColumns!(`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex)
return msg
}

def synsData(conn,dbName,tbName){
odbc::query(conn,"select ChannelNo,ApplSeqNum,MDStreamID,SecurityID,SecurityIDSource,Price,OrderQty,Side,TransactTime,OrderType,LocalTime from data",loadTable(dbName,tbName),100000,transform)
}

submitJob("synsData","synsData",synsData,conn,dbName,tbName)


startTime                      endTime
2022.11.28 11:51:18.092 2022.11.28 11:53:20.198

耗时约 122 秒。

借助 DolphinDB 提供的 scheduleJob 函数,可以实现增量同步。

示例如下,每天 00:05 同步前一天的数据。

def synchronize(){
login(`admin,`123456)
conn =odbc::connect("Driver={SQLServer};Servername=sqlserver;Uid=sa;Pwd=DolphinDB;database=historyData;;")
sqlStatement = "select ChannelNo,ApplSeqNum,MDStreamID,SecurityID,SecurityIDSource,Price,OrderQty,Side,TransactTime,OrderType,LocalTime from data where TradeDate ='" + string(date(now())-1) + "';"
odbc::query(conn,sqlStatement,loadTable("dfs://TSDB_Entrust",`entrust),100000,transform)
}
scheduleJob(jobId=`test, jobDesc="test",jobFunc=synchronize,scheduleTime=00:05m,startDate=2022.11.11, endDate=2023.01.01, frequency='D')

注意:为防止节点重启时定时任务解析失败,预先在配置文件里添加 ​​preloadModules=plugins::odbc​

3.2 通过 DataX 驱动迁移

3.2.1 部署 DataX

从 ​​DataX 下载地址​​ 下载 DataX 压缩包后,解压至自定义目录。

3.2.2 部署 DataX-DolphinDBWriter 插件

将 ​​Github 链接​​ 中源码的 ./dist/dolphindbwriter 目录下所有内容拷贝到 DataX/plugin/writer 目录下,即可使用。

3.2.3 执行 DataX 任务

配置文件 synchronization.json 置于 data/job 目录下:

{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 5242880
}
}
}
},
"job": {
"setting": {
"speed": {
"byte":10485760
}
},
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "sa",
"password": "DolphinDB123",
"column": [
"ChannelNo","ApplSeqNum","MDStreamID","SecurityID","SecurityIDSource","Price","OrderQty","Side","TransactTime","OrderType","LocalTime"
],
"connection": [
{
"table": [
"data"
],
"jdbcUrl": [
"jdbc:sqlserver://127.0.0.1:1433;databasename=historyData"
]
}
]
}
},
"writer": {
"name": "dolphindbwriter",
"parameter": {
"userId": "admin",
"pwd": "123456",
"host": "127.0.0.1",
"port": 8888,
"dbPath": "dfs://TSDB_Entrust",
"tableName": "entrust",
"batchSize": 100000,
"saveFunctionDef": "def customTableInsert(dbName, tbName, mutable data) {data.replaceColumn!(`LocalTime,time(temporalParse(data.LocalTime,\"HH:mm:ss.nnnnnn\")));data.replaceColumn!(`Price,double(data.Price));data[`SeqNo]=int(NULL);data[`DataStatus]=int(NULL);data[`BizIndex]=long(NULL);data[`Market]=`SZ;data.reorderColumns!(`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex);pt = loadTable(dbName,tbName);pt.append!(data)}",
"saveFunctionName" : "customTableInsert",
"table": [
{
"type": "DT_INT",
"name": "ChannelNo"
},
{
"type": "DT_LONG",
"name": "ApplSeqNum"
},
{
"type": "DT_INT",
"name": "MDStreamID"
},
{
"type": "DT_SYMBOL",
"name": "SecurityID"
},
{
"type": "DT_INT",
"name": "SecurityIDSource"
},
{
"type": "DT_DOUBLE",
"name": "Price"
},
{
"type": "DT_INT",
"name": "OrderQty"
},
{
"type": "DT_SYMBOL",
"name": "Side"
},
{
"type": "DT_TIMESTAMP",
"name": "TransactTime"
},
{
"type": "DT_SYMBOL",
"name": "OrderType"
},
{
"type": "DT_STRING",
"name": "LocalTime"
}
]

}
}
}
]
}
}

2. Linux 终端中执行以下命令执行 DataX 任务

cd ./DataX/bin/
python DataX.py ../job/synchronization.json


任务启动时刻                    : 2022-11-28 17:58:52
任务结束时刻 : 2022-11-28 18:02:24
任务总计耗时 : 212s
任务平均流量 : 3.62MB/s
记录写入速度 : 78779rec/s
读出记录总数 : 16622527
读写失败总数 : 0

同样的,若需要增量同步,可在 synchronization.json 中的 "reader" 增加 "where" 条件,增加对交易日期的过滤,每次执行同步任务时只同步 where 条件中过滤的数据,例如这些数据可以是前一天的数据。

以下例子仅供参考:

"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "sa",
"password": "DolphinDB123",
"column": [
"ChannelNo","ApplSeqNum","MDStreamID","SecurityID","SecurityIDSource","Price","OrderQty","Side","TransactTime","OrderType","LocalTime"
],
"connection": [
{
"table": [
"data"
],
"jdbcUrl": [
"jdbc:sqlserver://127.0.0.1:1433;databasename=historyData"
]
}
]
"where":"TradeDate=(select CONVERT ( varchar ( 12) , dateadd(d,-1,getdate()), 102))"
}
}

4. 性能比较

本案例中使用的软件版本为:

  • DolphinDB: DolphinDB_Linux64_V2.00.8.6
  • SQL Server: Microsoft SQL Server 2017 (RTM-CU31) (KB5016884) - 14.0.3456.2 (X64)

分别使用 ODBC 插件和 DataX 驱动进行数据迁移的耗时对比如下表所示:

ODBC 插件

DataX

122s

212s

由此可见,ODBC 插件的性能优于 DataX。当数据量增大,尤其时表的数量增多时,ODBC 插件的速度优势更为明显。而且即使不需要对同步数据进行处理,DataX 也需要对每一个表配置一个 json,而 ODBC 则只需要更改表名即可,实现难度明显低于 DataX。

使用场景

实现难度

ODBC 插件

全量同步,增量同步

可完全在 DolphinDB 端实现

DataX

全量同步,增量同步

需要部署 DataX 并编写 json

标签:插件,DolphinDB,ODBC,Server,DataX,SQL
From: https://blog.51cto.com/u_15022783/5935519

相关文章

  • sqlserver添加修改表备注信息
    添加表说明:EXECsys.sp_addextendedproperty@name=N'MS_Description',@value=N'表说明',@level0type=N'SCHEMA',@level0name=N'dbo',@level1type=N'TABLE',@level1na......
  • mysql 将字段复制到另一个字段(数据运维)
    1.将同一个表中的一个字段的值复制给另一个字段UPDATEt_userSETsigned_time=create_time  create_time  是源数据,signed_time是目标数据,之前是空的2.将同一个......
  • 搭建Eureka Server服务注册中心
    了解了Eureka的作用之后,我们搭建一个EurekaServer注册中心。我们使用IDEA创建一个名为eureka-server的SpringBoot项目,如图9-1所示。然后,在“dependencies”界面中勾选......
  • SQL_8_TCL语句
    TCL指的是事务控制语句。  1、事务概论:指一组要么同时执行成功,要么同时执行失败的SQL语句。是数据库操作的一个执行单元。特性:原子性:它是数据库中最小执行......
  • 查看sqlserver备份历史
    SELECTT1.name ,T3_full.full_backup_start_date ,T3_full.full_backup_finish_date ,T3_full.full_Duration ,t3_full.full_backup_size ,t3_full.full_physical_d......
  • SQL_7_DML语句
    DML指的是,操作行数据的行为(增删改),它依据一个集合构成一个事务逻辑单元。简单的DML语句后需要加comit语句进行事务提交。 1、增(insert)使用insert语句,在指定的表中增加......
  • 2:数据库的基本操作-MySQL
    (目录)2.1数据库的显示讲解information_schema:信息图式,存储服务器管理数据库的信息mysql:存放系统信息,用户名密码等performance_schema:性能图式sys:系统文件showdat......
  • MySQL
    MySQLALTER命令当我们需要修改数据表名或者修改数据表字段时,就需要使用到MySQLALTER命令。开始本章教程前让我们先创建一张表,表名为:testalter_tbl。root@host#mysql-u......
  • sql server 查询所有表名,字段名,字段类型
    SELECT表名=casewhena.colorder=1thend.nameelse''end,表说明=casewhena.colorder=1thenisnull(f.value,'')else''end,字段序......
  • MySQL事务必知必会
    事务必知必会事务由一组数据操纵语句(DML)组成,这组语句要么全部成功,要么全部失败事务操作开启事务starttransaction;设置保存点savepoint保存点名;回退到某个保存......