首页 > 其他分享 >TDengine中订阅的用途和用法

TDengine中订阅的用途和用法

时间:2023-10-12 10:36:49浏览次数:117  
标签:订阅 TDengine 查询 subscribe 用法 数据 taos

TDengine中订阅的用途和用法 - TDengine | 涛思数据 (taosdata.com)

本文将介绍TDengine Database订阅功能的使用场景、使用方法和一些限制,并与InfluxDB的订阅功能进行简单的对比。本文的预期读者是基于TDengine开发各种应用的软件开发人员。

什么是订阅?

订阅,是一种数据查询方式,其特点为:客户端执行一个查询语句后,可以增量形式,不断收到新到达服务端的、符合查询条件的数据。订阅的实现模型有两种,一种是“推”,即服务器主动将数据发到客户端;另一种是“拉”,即客户端主动向服务器请求数据。两种方式各有优缺点,这里不做详细的对比,只是说明一下,TDengine Database使用的是“拉”模型。

什么时候需要使用订阅?

为了便于用户程序消费TDengine Database中的数据,TDengine实现了基于SQL的数据查询语法,并提供了丰富的聚合函数,这种方式的优势已在多个实际案例中得到了体现。但由于时序数据的特点,单纯的直接数据查询并不能满足用户程序的需求,比如:我们管理着一批温度测量设备,希望当某个设备检测到的温度超过限制(比如80°C)后能得到通知并进行一些处理时,肯定会先为所有的设备建立一张超级表:

  1. create database test;
  2. use test;
  3. create table devices (ts timestamp, temperature float) tags(id int);

并为每个设备创建一张子表:

  1. create table device1 using devices tags(1);
  2. create table device2 using devices tags(2);
  3. ...

这种设计满足了设备管理的需求,但如何满足温度监测的需求呢?如果仅使用普通的查询,有两种方法:一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:

  1. select * from device1 where ts > last_timestamp1 and temperature > 80;
  2. select * from device2 where ts > last_timestamp2 and temperature > 80;
  3. ...

这确实可行,但随着设备数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当设备数增长到一定的程度,系统就无法承受了。

另一种方法是对超级表进行查询。这样,无论有多少设备,都只需一次查询:

  1. select * from devices where ts > last_timestamp and temperature > 80;

但是,如何选择 last_timestamp 就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同设备的数据到达TDengine的时间也会有差异。所以,如果我们在查询中使用最慢的那台设备的数据的时间戳作为 last_timestamp ,就可能重复读入其它设备的数据;如果使用最快的设备的时间戳,其它设备的数据就可能被漏掉。

TDengine的订阅功能为上面这个问题提供了一个彻底的解决方案。

如何使用TDengine中的订阅功能?

TDengine的API中,与订阅相关的主要有以下三个:

  • taos_subscribe
  • taos_consume
  • taos_unsubscribe

这三个API的具体说明请见《C/C++数据订阅接口》,下面结合一个示例,介绍下其使用方法,完整的示例代码可以在这里找到。

首先是创建订阅:

  1. TAOS_SUB* tsub = NULL;
  2. if (async) {
  3.   // create an asynchronized subscription, the callback function will be called every 1s
  4.   tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
  5. } else {
  6.   // create an synchronized subscription, need to call 'taos_consume' manually
  7.   tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
  8. }

TDengine中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数async的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用 taos_consume来拉取数据,而异步则由API在内部的另一个线程中调用taos_consume,然后把拉取到的数据交给回调函数 subscribe_callback去处理。

参数taos是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。

参数sql是查询语句,可以在其中使用where子句指定过滤条件。回到开头的例子,如果我们只想订阅设备温度超过 80°C 时的数据,可以这样写:

  1. select * from devices where temperature > 80;

注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:

  1. select * from devices where ts > now - 1d and temperature > 80;

订阅的topic实际上是它的名字,因为订阅功能是在客户端API中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。

如果名topic的订阅不存在,参数restart没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个topic时,restart就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果restarttrue(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且restartfalse(0),用户程序就不会读到之前已经读取的数据了。

taos_subscribe的最后一个参数是以毫秒为单位的轮询周期。在同步模式下,如过前后两次调用taos_consume的时间间隔小于此时间,taos_consume会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。

taos_subscribe的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。

订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:

  1. if (async) {
  2.   getchar();
  3. } else while(1) {
  4.   TAOS_RES* res = taos_consume(tsub);
  5.   if (res == NULL) {
  6.     printf("failed to consume data.");
  7.     break;
  8.   } else {
  9.     print_result(res, blockFetch);
  10.     getchar();
  11.   }
  12. }

这里是一个while循环,用户每按一次回车键就调用一次taos_consume,而taos_consume的返回值是查询到的结果集,与taos_use_result完全相同,例子中使用这个结果集的代码是函数print_result

  1. void print_result(TAOS_RES* res, int blockFetch) {
  2.   TAOS_ROW row = NULL;
  3.   int num_fields = taos_num_fields(res);
  4.   TAOS_FIELD* fields = taos_fetch_fields(res);
  5.   int nRows = 0;
  6.   if (blockFetch) {
  7.     nRows = taos_fetch_block(res, &row);
  8.     for (int i = 0; i < nRows; i++) {
  9.       char temp[256];
  10.       taos_print_row(temp, row + i, fields, num_fields);
  11.       puts(temp);
  12.     }
  13.   } else {
  14.     while ((row = taos_fetch_row(res))) {
  15.       char temp[256];
  16.       taos_print_row(temp, row, fields, num_fields);puts(temp);
  17.       nRows++;
  18.     }
  19.   }
  20.   printf("%d rows consumed.\n", nRows);
  21. }

其中的 taos_print_row 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。而异步模式下,消费订阅到的数据则显得更为简单:

  1. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  2.   print_result(res, *(int*)param);
  3. }

当要结束一次数据订阅时,需要调用taos_unsubscribe:

  1. taos_unsubscribe(tsub, keep);

其第二个参数,用于决定是否在客户端保留订阅的进度信息,如果大家还记得前面说过“订阅功能是在客户端API中实现的”的话,应该可以猜到,如果这个参数是false(0),那无论下次调用taos_subscribe的时的restart参数是什么,订阅都只能重新开始了。另外,进度信息的保存位置是{DataDir}/subscribe/,这个目录下,每个订阅有一个与其topic同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。

代码介绍完毕,我们来看一下实际的运行效果。假设:

  • 示例代码已经下载到本地
  • TDengine 也已经在同一台机器上安装好
  • 已经按照本文开头的脚本创建数据库、超级表和一些子表

则可以在示例代码所在目录执行以下命令来编译并启动示例程序:

  1. $ make
  2. $ ./subscribe -sql='select * from devices where temperature > 80;'

示例程序启动后,打开另一个终端窗口,启动 TDengine 的 shell 向 device1 插入一条温度为 90 °C 的数据:

  1. $ taos
  2. > use test;
  3. > insert into device1 values(0, 90);

这时,因为温度超过了 80 °C ,您应该可以看到示例程序将它输出到了屏幕上。您可以继续插入一些数据观察示例程序的输出。

用作消息队列

本文开头的例子,是用订阅实现了一个报警监控的功能,但其实订阅也可以用在其它场景中,比如:消息队列。

应用程序可以订阅数据库某些表的内容,同一个表也可以被多个应用订阅,一旦表有新的记录,应用将立即得到通知。这样,再把数据插入看做Publish操作,用户完全可以把TDengine作为一个消息队列中间件来使用。

所以,当下次面对需要使用Kafka的场景时,不妨先考虑下TDengine,因为TDengine除了安装包超小、运维超简单的优点外,还有一个Kafka不具备的功能——数据过滤:可以在查询语句中指定过滤条件,保证读到的数据都是有用的,不用再在代码中手写过滤逻辑了。

与InfluxDB的对比

概念上说,InfluxDB的订阅和TDengine的订阅区别很大,我们可以认为订阅在InfluxDB中更像一种数据同步机制,而TDengine中的订阅则是一种数据查询机制:

  • InfluxDB将收到的数据实时推送给其它节点,TDengine通过轮询的方式拉取数据,InfluxDB具有更好的实时性。
  • InfluxDB中只能订阅全部数据,TDengine中可以指定数据过滤条件。
  • InfluxDB中只能订阅当前时间之后的数据,TDengine中可以在订阅中读到历史数据。

所以,两相对比,InfluxDB的优势是实时性,而TDengine则以稍微牺牲实时性为代价提供了更强大的功能。

限制条件

下面是一些TDengine订阅功能的局限,大家需要在使用中注意。

  • 订阅的查询语句只能是 select 语句,只能查询原始数据(不支持聚合函数),只能按时间正序查询数据。
  • 在满足应用需求的情况下,请尽量将轮询周期设置的大一些,否则会对系统性能造成影响。
  • 暂不支持乱序数据,用户程序可能读不到使用import方式插入的数据。
  • 如果用户程序异常退出或没有正确调用taos_unsubscribe,进度信息可能会有错误,这时,后续的同名订阅可能读到之前已经读过的数据。

标签:订阅,TDengine,查询,subscribe,用法,数据,taos
From: https://www.cnblogs.com/81/p/17758884.html

相关文章

  • django model 条件过滤 queryset.filter详细用法
    条件选取querySet的时候,filter表示=,exclude表示!=。querySet.distinct()去重复__exact精确等于like'aaa'__iexact精确等于忽略大小写ilike'aaa'__contains包含like'%aaa%'__icontains包含忽略大小写ilike'%aaa%',但是对于sqlite来说,contains的作用效果等同......
  • 软件测试|深入理解SQL RIGHT JOIN:语法、用法及示例解析
    引言在SQL中,JOIN是一种重要的操作,用于将两个或多个表中的数据关联在一起。SQL提供了多种JOIN类型,其中之一是RIGHTJOIN。RIGHTJOIN用于从右表中选择所有记录,并将其与左表中匹配的记录组合在一起。本文将深入探讨SQLRIGHTJOIN的语法、用法以及通过实例解析来说明其作用。RIGH......
  • 软件测试|深入理解SQL FULL JOIN:语法、用法及示例解析
    简介在SQL中,JOIN是一个强大的操作,它允许将两个或多个表中的数据进行关联。SQL提供了多种JOIN类型,其中之一是FULLJOIN。FULLJOIN允许从左表和右表中选择所有记录,并将它们组合在一起。本文将深入探讨SQLFULLJOIN的语法、用法,并通过实例解析来说明其作用。FULLJOIN基本语法......
  • TDengine - Windows
    1、下载安装包,官网下载地址:TDengine发布历史及下载链接|TDengine文档|涛思数据(taosdata.com)2、在创建文件夹创建TDengine文件夹,在TDengine文件夹下创建log文件夹和data文件夹3、双击TDengine-server-3.0.7.1-Windows-x64.exe一直回车即可4、配置数据库变量打开C:\T......
  • 小程序订阅消息(服务通知)实现 wx.requestSubscribeMessage
     第一步:根据官方文来,先在微信公众平台登录小程序后台配置模板,获取模板id:,这块的模版可以在公共模版库里选。也可以新申请,但是需要3-7天才能出审核结果。  第二步,获取下发权限在获取下发权限之前,需要先获取小程序code和订阅消息的模板id给服务端,以便后台人员进行服务端配......
  • logger.add() 方法的所有参数及其用法说明:
    Loguru是一个强大而易于使用的日志记录库,logger.add()方法用于向Logurulogger添加处理程序。下面是logger.add()方法的所有参数及其用法说明:logger.add(sink,*,level=None,format=None,filter=None,colorize=None,backtrace=None,diagnose=None,serialize=False,......
  • 无涯教程-Meteor - 发布和订阅
    正如"Collections集合"一章中已经讨论的那样,无涯教程所有的数据都可以在客户端获得,这是一个安全问题,可以使用发布和订阅方法来处理。删除自动发布在此示例中,无涯教程将使用PlayersCollection集合以及以下数据,在能够专心于本章本身之前,无涯教程已经准备好了此系列。如果不确......
  • Google Guava 库用法整理
    参考:(2,3,4)http://blog.publicobject.com更多用法参考http://ajoo.iteye.com/category/119082以前这么用:Java代码Map<String,Map<Long,List<String>>>map=newHashMap<String,Map<Long,List<String>>>();现在这么用(JDK7将实现该功能......
  • uniCloud-云对象基本用法
    在项目的uniCloud---cloudfunctions文件夹上右键,新建云函数/云对象单选框选择"云对象",命名,创建云对象代码constdb=uniCloud.database()module.exports={ _before:function(){}, //定义云对象异步函数 asyncget(num){ returnawaitdb.collection("user......
  • 【转】SVN branch和tag相关用法
    svn分支(branch)和标签(tag)管理https://blog.csdn.net/caohongxing/article/details/129311157版本控制的一大功能是可以隔离变化在某个开发线上,这个开发线就是分支(branch)。分支通常用于开发新功能,而不会影响主干的开发。也就是说分支上的代码的编译错误、bug不会对主干(trunk......