首页 > 其他分享 >EMQX+HStreamDB 实现物联网流数据高效持久化

EMQX+HStreamDB 实现物联网流数据高效持久化

时间:2023-03-17 18:32:39浏览次数:52  
标签:持久 stream -- HStreamDB EMQX 数据 hstream

EMQX+HStreamDB 实现物联网流数据高效持久化_规则引擎

在 IoT 场景中,通常面临设备数量庞大、数据产生速率高、累积数据量巨大等挑战。因此,如何接入、存储和处理这些海量设备数据就成为了一个关键的问题。

EMQX 作为一款强大的​​物联网 MQTT 消息服务器​​,单个集群可处理上亿设备连接,同时提供了丰富的数据集成功能。​​HStreamDB​​ 作为一款分布式流数据库,不仅可以高效存储来自 EMQX 的海量设备数据,而且提供实时处理分析能力。EMQX 与 HStreamDB 都具备高可扩展性和可靠性,两者结合不仅能够满足大规模 IoT 应用的性能和稳定性需求,同时能够提升应用的实时性。

EMQX+HStreamDB 实现物联网流数据高效持久化_IoT_02

近期 ​​EMQX Enterprise 4.4.15​​ 发布,更新了对 HStreamDB 最新版本的支持,本文将具体介绍如何通过 EMQX 规则引擎将数据持久化到 HStreamDB,实现 MQTT 数据流的存储与实时处理。

:本文介绍的集成步骤基于 EMQX 4.4.15 和 HStreamDB 0.14.0 以上版本。

连接到 HStreamDB 集群

在下面的教程中,我们假设有一个正在运行的 EMQX Enterprise 集群和正在运行的 HStreamDB 集群。如需部署 EMQX Enterprise 集群,请参考 ​​EMQX Enterprise docs​​。如需部署 HStreamDB 集群,请参考 ​​HStreamDB docs​​,其中包含关于如何用 Docker 快速部署的说明。

我们可以通过 Docker 来部署 HStreamDB 客户端并连接到 HStreamDB 集群:

# 获取帮助信息
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help

我们在此使用 ​​hstream stream​​ 命令创建一个 stream,供接下来的示例使用:

# 使用 hstream stream 命令创建 streams
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))

接下来,连接到 HStreamDB 集群,启动交互式 HStream SQL shell:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# 如果要使用安全连接,还需要填写 --tls-ca, --tls-key, --tls-cert 参数

如果连接成功,将会出现

__  _________________  _________    __  ___
/ / / / ___/_ __/ __ \/ ____/ | / |/ /
/ /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
/ __ /___/ // / / _, _/ /___/ ___ |/ / / /
/_/ /_//____//_/ /_/ |_/_____/_/ |_/_/ /_/

Command
:h To show these help info
:q To exit command line interface
:help [sql_operation] To show full usage of sql statement

SQL STATEMENTS:
To create a simplest stream:
CREATE STREAM stream_name;
To create a query select all fields from a stream:
SELECT * FROM stream_name EMIT CHANGES;
To insert values to a stream:
INSERT INTO stream_name (field1, field2) VALUES (1, 2);

可以使用 ​​show streams;​​ 来查看已经创建的 streams 的信息:

> show streams;
+-------------------------------------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------------------------------------+---------+----------------+-------------+
| basic_condition_info_0 | 3 | 604800 seconds | 1 |
+-------------------------------------------+---------+----------------+-------------+

创建 HStreamDB 资源

在利用 EMQX 规则引擎将数据持久化到 HStreamDB 之前,需要创建一个 HStreamDB 资源。

为此,请访问 EMQX Dashboard,单击 ​​规则引擎​​ -> ​​资源​​ → ​​创建​​ ,选择 ​​HStreamDB 资源​​,输入 HStreamDB 地址并填写必要的选项。可用选项如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化_SQL_03

在选择开启 SSL 时,会出现额外的 SSL 配置界面,可以粘贴所需配置内容或上传文件。

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_04

EMQX+HStreamDB 实现物联网流数据高效持久化_IoT_05

创建数据持久化到 HStreamDB 的规则

点击 ​​规则引擎​​ -> ​​规则​​ -> ​​创建​​。

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_06

编辑 SQL 规则并添加操作,您可以在字符串模板中使用 SQL 变量。

请注意,本文档中介绍的 SQL 规则仅供演示,实际的 SQL 应根据业务设计进行编写。

单击 ​​添加操作​​,选择「数据持久化」以将数据保存到 HStreamDB 中。选择上一步创建的资源并输入参数。可用参数如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_07

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_08

点击 ​​确定​​ 来确认添加行为。

EMQX+HStreamDB 实现物联网流数据高效持久化_IoT_09

在 HStream SQL Shell 中获取实时的数据更新

从 EMQX 规则引擎持久化到 HStreamDB 的数据可以使用 HStream SQL Shell 实时读出新写入 stream 的内容。现在,数据已经被写入 HStreamDB,可以使用任何消费方式来消费消息。文档使用了一个简单的消费方法:使用 HStream SQL shell 进行查询。此外,读者可以自由选择使用自己​​喜欢的编程语言 SDK​​ 编写消费端。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;

当前的 ​​select​​ 查询没有结果可供打印出,这是因为还没有数据通过 EMQX 的规则引擎向 HStreamDB 写入。一旦有数据写入,便可以在 HStream SQL shell 观察到数据的即时更新。目前在 HStreamDB 使用 SQL 对 streams 做查询,只会打印出创建查询后的结果。如果在 EMQX 停止向 HStreamDB 写入后创建查询,可能观察不到产生的结果。

向 EMQX 写入消息测试规则引擎

可以使用跨平台的桌面客户端 ​​MQTT X​​ 来连接到 EMQX 并发送消息:

EMQX+HStreamDB 实现物联网流数据高效持久化_物联网_10

从 EMQX Dashboard 获取规则引擎的运行数据指标

访问对应的规则引擎界面:

EMQX+HStreamDB 实现物联网流数据高效持久化_SQL_11

如果规则引擎运行数据指标正常,则代表 EMQX 会将数据持久化到 HStreamDB。一旦写入成功,便可以在前面步骤启动的 HStream SQL Shell 中看到实时的数据更新。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}

结语

至此,我们就完成了通过 EMQX 规则引擎将数据持久化到 HStreamDB 的主要流程。

将 EMQX 采集到的数据存储到 HStreamDB 后,可以对这些数据进行实时处理与分析,为上层 AI、大数据等应用提供支撑,进一步发掘和利用数据价值。作为首个专为流数据设计的云原生流数据库,HStreamDB 与 EMQX 结合可以实现一站式存储和实时处理海量物联网数据,精简物联网应用数据栈,加速企业的物联网应用开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:​​https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb​

标签:持久,stream,--,HStreamDB,EMQX,数据,hstream
From: https://blog.51cto.com/u_15204296/6127173

相关文章

  • RabbitMQ - 生产者消息确认、消息持久化、消费者消息确认、消费失败重试
    1.生产者消息确认RabbitMQ提供了publisherconfirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,......
  • K8s之MySQL实现数据持久化
    转载自:https://blog.51cto.com/u_14154700/2450932=============== 这个是一个只写配置及验证的博文。。。。。。博文大纲:1、搭建nfs存储2、创建PV3、创建PVC4、......
  • redis持久化以及高可用
    1.发布订阅模式例子:比如说你有一个收音机,你收听了xxxx频道,那么只要你你打开这个频道,你就能听到这个频道的所有内容你的收音机----接收方(订阅方)频道的内容发送方---内......
  • IdentityServer4: 配置项持久化
    目录IdentityServer配置项持久化创建IdentityServer项目添加依赖包添加QuickstartUI数据库迁移ConfigurationDbContextPersistedGrantDbContext生成初始化数据严重BU......
  • 可持久化数据结构
    现阶段常用的可持久化数据结构大概有以下三类:可持久化线段树、并查集、Trie树。因此本文将围绕这三个大类来讲。1.可持久化线段树/可持久化数组可持久化线段树本来有一......
  • 持久化技术Mybatis知识精讲【形成知识体系之路】
    环境要求JDK1.8及以上版本MySQL数据库ApacheMaven3.6.1构建工具IDEA/VSCode/Eclipse开发工具任选其一思维导图:XmindZEN技术要求熟悉Java语言熟悉数......
  • 7、Redis持久化存储的两种方式
    1.Redis持久化存储的两种方式RDB方式RDB存储是Redis实现的一种存储机制(默认开启)AOF方式AOF存储方式,直接把操作的命令记录下来,保存到一个文件里,类似mysql的......
  • 04-Redis系列之-持久化(RDB,AOF)
    持久化的作用什么是持久化redis的所有数据保存在内存中,对数据的更新将异步的保存到硬盘上持久化的实现方式快照:某时某刻数据的一个完整备份(mysql的Dump,redis的RDB)......
  • Sentinel安装&配置使用&规则持久化
    目录Sentinel是什么安装部署1.下载2.运行3.访问项目连接Sentinel新建一个项目POMYML启动类Controller启动项目Sentinel规则配置流控规则a.直接-快速失败b.关联-快速失败c.......
  • TiDB Operator恢复持久卷上的备份文件
    上篇文章介绍了通过BR对tidb数据库备份到NFS共享存储上,本文将结束将NFS共享存储上的备份数据恢复到K8S集群环境上。本文介绍的恢复方法基于TiDBOperator的CustomResourc......