SinoDB 时序引擎的使用 黄铎彦
这个作业属于的课程 | 作业要求 |
作业目标 | 撰写技术总结博客 |
技术概述
SinoDB 时间序列引擎 TimeSeries 是星瑞格 基于 BizWrapper 业务封装技术构建的一套专用于时序数据处理的模块集。涉及与非时序类数据交互的业务场景时, 这一引擎避免了跨时序专用数据库和传统关系型数据库带来的一系列问题。为了参加海峡信息赛而学习这个知识。 除了对文档的理解外基本没难点。
技术详述
首先需要安装SinoDB。
创建数据库
以sinodbms
用户身份登录系统。使用onstat -
命令确认数据库处于OnLine
状态后, 使用dbaccess - -
命令登录数据库,执行CREATE DATABASE stock WITH LOG;
。 执行成功后,按 Ctrl+D 退出 dbaccess,回到 shell。
确认时间序列引擎版本
在 shell 中执行:
cd $SINODBMSDIR/extend
ls
找到TimeSeries
开头的文件夹名称,例如TimeSeries.6.00.FC7
。记下这个名称。
注册时间序列引擎到数据库
在 shell 中执行blademgr
命令,然后在 > 提示符后执行register TimeSeries.6.00.FC7 stock
。 根据实际情况将TimeSeries.6.00.FC7
替换为你上一步中记下的名称。输入 y 并按回车确认后,等待提示符再次出现时, 按 Ctrl+D 退出 blademgr,回到 shell。
表结构设计
首先,在 shell 中执行dbaccess stock -
连接到数据库。
以存储股票价格数据为例。传统 SQL 表设计如下(不用执行):
create table realtime_trad(
sym varchar(10), /* 股票代码 */
t datetime year to fraction(5), /* 时刻 */
p money, /* 价格 */
n integer /* 交易量 */
);
我们需要将sym
提取出来。对剩下的字段,先执行:
create row type realtime_t(
t datetime year to fraction(5), /* 这个精度是固定的 */
p money,
n int
);
后面我们再把sym
和realtime_t
类型的列组合起来,构成时间序列表。
为时间序列(类型)分配空间
选择一个充足的表空间,或者使用onspaces
命令新建一个表空间。这里我使用安装教程 里面创建的datadbs1
。执行:
execute procedure tsContainerCreate('realtime_cont', 'datadbs1', 'realtime_t', 0, 0);
组装时间序列表
create table realtime(sym varchar(10) primary key, data timeseries(realtime_t));
确定起始时刻和最小时间间隔
假设要存储秒级的交易数据。那么“时刻”字段的最小间隔应该为 1 秒。为此,我们必须确保calendarTable
表中存在这一时间间隔。并且假设存储2024 年 9 月 1 日之后的股票数据。
执行select * from calendarTable;
,观察输出中c_calendar
列中,pattern
后面括号里面的模式, 发现没有second
相关的。于是执行:
insert into calendarTable (c_name, c_calendar)
values('ts_1sec', 'startdate(2024-9-1 0:0:0.0), pattern({1 on}, second)');
pattern
中的{1 on}
表示按秒连续计时,中间不间断。此时我们创建了名为ts_1sec
的日历,后面我们会要求数据库按这个日历来组织数据。
准备股票名
在这个页面 依次展开The Dataset
,滚动到Ingest the dataset
,点击按钮下载real_time_stock_data.zip
。
解压后,对其中的tutorial_sample_company.csv
做修改得到 unl 文件(点击下载我整理好的), 注意到 unl 文件与时间序列表的字段对应,类似下面这样:
AAPL|origin(2024-9-1 0:0:0.0), calendar(ts_1sec), container(realtime_cont), threshold(0), irregular, []|
MSFT|origin(2024-9-1 0:0:0.0), calendar(ts_1sec), container(realtime_cont), threshold(0), irregular, []|
AMZN|origin(2024-9-1 0:0:0.0), calendar(ts_1sec), container(realtime_cont), threshold(0), irregular, []|
其中,ts_1sec
就是我们刚创建的日历,而realtime_cont
就是我们给时序数据分配的空间。 irregular
意味着并不是每一秒都有交易,也就是时间序列是不规则的,有些秒是没数据的。
细心的读者会发现,我们明明已经在日历里面指定了起始时刻,这里还要为每只股票在origin
后面的括号里指定起始时刻。 这是因此允许有的股票在更晚的时间,比如 9 月 2 日,才开始记录,从而灵活地节约空间。
将 unl 文件上传到服务器后,继续在 dbaccess 终端中执行:
load from 'realtime.unl' delimiter '|' insert into realtime;
视情况将'realtime.unl'
替换成实际的绝对路径或相对路径。
问题和解决换行符部分。
将时间序列表映射到标准 SQL 表
execute procedure tsCreateVirtualTab('realtime_v', 'realtime',
'origin(2024-9-1 0:0:0.0), calendar(ts_1sec), container(realtime_cont), threshold(0), irregular');
之后我们就可以在标准的 SQL 语句中引用realtime_v
这个映射后的表名了。比如:
select * from realtime_v where sym = 'AAPL';
载入数据
在准备股票名
那步解压后的文件中,将tutorial_sample_tick.csv
整理成像下面这样的格式, 这里命名为realtime_v.unl
。由于文件很大,只展示一部分。
2024-09-11 20:00:05|AMZN|184.52
2024-09-11 20:00:05|CMCSA|38.68
2024-09-11 20:00:05|AMZN|184.52
由于数据提供方Twelve Data没有提供交易量信息, 因此n
这一列暂时没用到。执行:
load from 'realtime_v.unl' insert into realtime_v (t, sym, p);
这里在之前坑的基础上又有坑,参见下面问题和解决长事务部分。
动态插入实时数据
首先参照官网,配置 ODBC 数据源。 假设配置完成后,名称为stock
。接着登录Twelve Data官网, 获取 API 密钥。编写下面的 Python 代码。
"""
依赖包:
twelvedata
websocket-client
pyodbc
"""
from datetime import datetime;
from twelvedata import TDClient;
import pyodbc;
from pyodbc import Connection, Cursor;
STOCKS = ['AAPL', 'MSFT', 'AMZN', 'GOOG'] # 按需要添加或删除自选股
conn : Connection = pyodbc.connect("DSN=stock", autocommit=True); # 注意,这个和 C 语言不一样,C 语言默认是自动提交的,参见微软 ODBC 文档。
def on_event(data:dict):
SQL = "insert into realtime_v values (?, ?, ?, null)"; # TODO TwelveData 没提供交易量数据,暂时填空
if (data["event"] == "price"):
cursor : Cursor = conn.cursor();
cursor.execute(SQL, (data["symbol"], datetime.now(), data["price"]));
cursor.close();
else:
print(data);
return;
td = TDClient("xxx"); # TODO 你的 API 密钥
ws = td.websocket(on_event=on_event);
ws.subscribe(STOCKS);
ws.connect();
while (True):
pass
为生成 K 线和移动平均线做准备
/* K 线类型 */
create row type ohlc_t(
t datetime year to fraction(5),
o money, h money, l money, c money,
n int
);
/* 移动平均线类型 */
create row type price_t(t datetime year to fraction(5), p money);
/* 分配空间 */
execute procedure tsContainerCreate('ohlc_cont', 'datadbs1', 'ohlc_t', 0, 0);
execute procedure tsContainerCreate('price_cont', 'datadbs1', 'price_t', 0, 0);
读取数据的 SQL 示例
实时历史数据查询
select t, p from realtime_v where sym = 'AAPL'
and t between '2024-9-24 0:0:0.0' and '2024-9-25 0:0:0.0';
生成分钟级 K 线。这个使用传统的 SQL 难以做到,因为不同的数据库厂商可能会对子查询做限制。 这里采用 SinoDB 的时序聚合 API,并将结果映射到临时的虚拟表中,便于应用程序读取。
/* 9 月 11 日 16 时至 17 时,苹果的分钟级 K 线。*/
begin;
create table k_1min(sym varchar(10), data timeseries(ohlc_t));
call tsCreateVirtualTab('k_1min_v', 'k_1min',
'origin(2024-09-01 00:00:00.00000), calendar(ts_1min), container(ohlc_cont), threshold(0)');
insert into k_1min
select sym, data
from (
select sym, aggregateBy('first($p), max($p), min($p), last($p), last($n)', 'ts_1min', data, 0, '2024-9-11 16:0:0.0', '2024-9-11 17:0:0.0')::timeseries(ohlc_t) data
from realtime
where sym = 'AAPL'
);
select * from k_1min_v;
-- 应用程序读取数据后
rollback;
生成分钟级移动平均线。这个使用传统的 SQL 也难以做到。
/* 9 月 11 日 16 时至 17 时,苹果的分钟级移动平均线。*/
begin;
create table ma5_1min(sym varchar(10), data timeseries(price_t));
call tsCreateVirtualTab('ma5_1min_v', 'ma5_1min',
'origin(2024-09-01 00:00:00.00000), calendar(ts_1min), container(price_cont), threshold(0)');
insert into ma5_1min
select 'AAPL', apply('tsRunningAvg($p, 5)', data)
from (
select aggregateBy('last($p)', 'ts_1min', data, 0, '2024-9-11 16:0:0.0', '2024-9-11 17:0:0.0')::timeseries(price_t) data
from realtime
where sym = 'AAPL'
);
select * from ma5_1min_v;
-- 应用程序读取数据后
rollback;
上面的两个聚合示例不能直接使用。因为在事务中给realtime
表加了读锁,可能导致 动态插入实时数据的操作获取写锁超时!相信后续能和星瑞格一道想出更合理的方案。 目前规避的措施是,读出原始数据,用pandas
这一 Python 库进行聚合(降采样)。
问题和解决
unl 文件的换行符问题
如果在 Windows 下处理 csv 文件,默认是 CRLF,上传到服务器后,会导致 dbaccess 解析出错。 需要在服务器上执行dos2unix
命令将行尾转为LF。
载入数据时的长事务
股票实时交易数据频率非常高,而 Twelve Data 一般提供一个月的实时数据,因此数据量非常大。如果按照 SinoDB 默认的 超时时间参数执行load
命令,一段时间后事务会被系统判断为超时,自动终止,等了半天,结果操作被撤销。
一种方法就是修改超时时间参数,不过我还没研究过。另一种最粗暴的的方法就是拆分 unl 文件,一次导入一小段。
总结
我初步探索了 SinoDB 时间序列引擎的使用,后续考虑进一步学习时间序列相关知识,用好 SinoDB 这一利器!
推荐关注时序人
公众号。
参考文献和博客
- 面向物联网的时间序列引擎(星瑞格论坛)
- TimescaleDB 股票数据库搭建教程 股票数据下载
- Python WebSocket 连接 TwelveData(TimescaleDB官网)
- SinoDB 时间序列用户手册