首页 > 其他分享 >TS MQTT封装

TS MQTT封装

时间:2023-12-29 09:11:58浏览次数:28  
标签:... 封装 string TS topic MQTT client && ._

TS MQTT封装

导入相关包

npm i mqtt
npm i lodash

  • guid 随机生成就行,具体可以参考百度或者随便生成一个随机数*

代码封装

import mqtt from 'mqtt'
import type { MqttClient, OnMessageCallback, IClientOptions, IClientPublishOptions, IPublishPacket } from 'mqtt'
import { getGuid } from '@/common/basic'
import { without, uniq } from 'lodash'

export type TPublishFormat = {
  topic: string
  payload: string | Buffer
  opts?: IClientPublishOptions
}
export type TMessageCallback<T> = (topic: string, payload: T) => void

export interface IMqClientOptions extends IClientOptions {
  connectCb?: () => void
  errorCb?: (e: Event) => void
  reconnectCb?: () => void
}

export default class MQTT {
  private _type: string
  private _url: string
  private _opt: IMqClientOptions // mqtt配置
  public client!: MqttClient
  public topicArr: Array<string> = []

  constructor(url: string, opt?: IMqClientOptions, type: string = 'Web') {
    this._type = type
    this._url = url
    this._opt = {
      clean: true,
      clientId: this._type + '_' + getGuid(), // 客户端分类唯一
      connectTimeout: 3000, // 超时时间
      reconnectPeriod: 1000, //重连超时
      ...(opt && opt),
    }
    this._init()
  }

  private _init() {
    this.destroy()
    this.client = mqtt.connect(this._url, this._opt)
    this.client.on('connect', () => {
      this._opt.connectCb && this._opt.connectCb()
      console.log(this._url + '连接成功...')
    })
    this.client.on('error', (error: any) => {
      this._opt.errorCb && this._opt.errorCb(error)
      console.log(this._url + '异常中断...')
    })
    this.client.on('reconnect', () => {
      this._opt.reconnectCb && this._opt.reconnectCb()
      console.log(this._url + '重新连接...')
    })
  }

  /**
   * 函数“unSubscribe”是一个 TypeScript 函数,用于取消订阅一个或多个主题,并返回一个 Promise,该 Promise 解析为一个布尔值,指示取消订阅是否成功。
   * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表客户端想要取消订阅的主题。
   * @returns 正在返回 Promise。
   */
  public unSubscribe(topic: string | string[]) {
    return new Promise((resolve: (isOk: boolean) => void) => {
      this.client &&
        !this.client.disconnected &&
        this.client
          .unsubscribeAsync(topic)
          .then((result) => {
            if (typeof topic === 'string') {
              topic = [topic]
            }
            //去重
            this.topicArr = without(this.topicArr, ...topic)
            console.log(topic, this.topicArr, '取消订阅成功...')
            resolve(true)
          })
          .catch((err) => {
            console.log(topic, '取消订阅失败...')
            resolve(false)
          })
    })
  }

  /**
   * 函数“onSubscribe”是一个 TypeScript 函数,它订阅一个或多个主题并返回一个 Promise,该 Promise 解析为一个布尔值,指示订阅是否成功。
   * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表您要订阅的主题。
   * @returns 一个 Promise,解析为布尔值,指示订阅是否成功。
   */
  public onSubscribe(topic: string | string[]) {
    if (typeof topic === 'string') {
      topic = [topic]
    }
    const topicOk: Array<string> = without(topic, ...this.topicArr)
    return new Promise((resolve: (isOk: boolean) => void) => {
      this.client &&
        !this.client.disconnected &&
        topicOk.length > 0 &&
        this.client
          .subscribeAsync(topic)
          .then((result) => {
            this.topicArr.push(...topicOk)
            this.topicArr = uniq(this.topicArr)
            console.log(topicOk, this.topicArr, '订阅成功...')
            resolve(true)
          })
          .catch((err) => {
            console.log(topicOk, '订阅失败...')
            resolve(false)
          })
    })
  }

  /**
   * 函数“onPublish”使用客户端向主题发布消息,并返回一个解析为布尔值的 Promise,指示发布是否成功。
   * @param {TPublishFormat} format - format参数的类型为TPublishFormat,它是一个包含两个属性的对象:topic和message。 topic
   * 属性表示消息将发布到的主题,message 属性表示将发布的实际消息。
   * @returns 正在返回 Promise。
   */
  public onPublish(format: TPublishFormat) {
    return new Promise((resolve: (isOk: boolean) => void) => {
      this.client &&
        !this.client.disconnected &&
        this.client
          .publishAsync(format.topic, format.payload, format.opts)
          .then((result) => {
            console.log('发布消息成功...')
            resolve(true)
          })
          .catch((err) => {
            console.log('发布消息失败...')
            resolve(false)
          })
    })
  }

  //收到的消息
  public onMessage<T = any>(callback: TMessageCallback<T>) {
    this.client &&
      !this.client.disconnected &&
      this.client.on('message', (topic: string, payload: Buffer) => {
        try {
          callback && callback(topic, JSON.parse(payload.toString()))
        } catch (err) {
          console.log('无法执行JSON.parse...')
          callback && callback(topic, payload.toString() as T)
        }
      })
  }

  //销毁
  public destroy() {
    console.log('销毁...')
    this.client && this.client.end()
    this.topicArr = []
  }
}

使用

//通过开源公共服务器测试,切换成自家服务器就行了
const mqtt = new MQTT('mqtt://broker.emqx.io:8083/mqtt', { username: 'emqx_test', password: 'emqx_test' })
mqtt.onSubscribe('/test/ss')
mqtt.onMessage((topic, message) => {
  console.log(topic, message)
})

setTimeout(() => {
  mqtt.onPublish({ topic: '/test/ss', payload: '测试1111' })
}, 3000);

标签:...,封装,string,TS,topic,MQTT,client,&&,._
From: https://www.cnblogs.com/ybxj/p/17933998.html

相关文章

  • 海康萤石C6C摄像头RTSP连接方式
    海康萤石C6C摄像头RTSP连接方式1.概述通过RTSP获取海康萤石C6C摄像头的码流。测试型号为:萤石C6C2K+星光增强版400万极清2.开启RTSP连接萤石摄像头默认是没有激活RTSP连接的,需要手动开启,开启步骤如下:1.打开萤石官方的App-"萤石云视频",在底部的选项卡中点击“我的......
  • vue3+ts打开echarts的正确方式
    实例项目使用vite5+vue3+ts,项目地址vite-vue3-charts,预览地址https://weizwz.com/vite-vue3-charts准备工作1.注册为百度地图开发者官网地址,然后在应用管理->我的应用里,创建应用,创建好后复制AK2.在根目录的index.html里引入百度地图<head><metacharse......
  • 包&封装&继承总结
    总结包package概念概念:简单的理解包就是一个文件夹。包作用①方便管理项目中的类等文件。②可以避免类名冲突的问题。使用包定义包包命名规范:一般是公司域名反写.项目名.模块名字.子模块名;要求:包名是全英文小写。packagecn.itsource.packagedemo;//声明包导入包......
  • flink中的setStreamTimeCharacteristic 指定为EventTime的source需要自己定义event ti
    flink中的setStreamTimeCharacteristicTimeCharacteristic   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 此处可以取以下三类值:EventTime事件时间,事件(Event)本身的时间,即数据流中事件实际发生的时间,通常使用事件发生时的时间戳来描述,这些......
  • PC9094超小体积封装可编程过流过压保护IC
    概述:PC9094过电压和过电流保护该器件具有低80mΩ(TYP)导通电阻集成MOSFET,主动保护低电压系统的电压供应故障高达+29V直流电。输入电压超过过电压阈值将导致内部MOSFET关闭,防止损坏下游的过大电压设备。过电压保护阈值默认为6V。2.3V/3.6V/11V/16V/23V还有其他版本OVP和无OVP。PC9094......
  • 通达OA 任意文件上传+文件包含 getshell
    漏洞影响版本通达OAV11版<=11.320200103通达OA2017版<=10.1920190522通达OA2016版<=9.1320170710通达OA2015版<=8.1520160722通达OA2013增强版<=7.2520141211通达OA2013版<=6.2020141017漏洞分析根据网上的文章可以知道任意文件上传的漏洞点在is......
  • PTS 3.0:可观测加持的下一代性能测试服务
    作者:肖长军(穹谷)大家好,我是来自阿里云云原生应用平台的肖长军,花名穹谷,我此次分享的主题是《可观测加持的下一代性能测试服务》。提到性能测试大家并不陌生,性能测试已成为评估系统能力、识别系统弱点、进行系统调优,验证系统稳定性等的重要手段。我们一般进行性能测试的大概流程就是构......
  • HighCharts 地图画航线,以及在城市点画圈
    需求:生成一个世界地图,在城市点处画一个响应式的圈,及在城市点间画一条指示性的航线分析:生成一幅世界地图需导入相关地图js文件与获取json文件,在城市点画一个响应式的圈和一条指示性的航线,需要生成序列,并指定类型,航线类型(flowmap),响应式的圆圈用render进行画圆圈,具体请看下图解决:源......
  • 使用hook封装表格常用功能(react)
    实现内容配置分页:usePagination生成过滤项:useFilter获取表格选择配置:useSelect生成批量删除按钮:useDelete生成模态框:useModal示例render部分:<React.Fragment><Formlayout="inline">{DeleteEle}{FilterEles}</Form><Table{...{......
  • TSINGSEE青犀智能分析网关V4在智慧小区场景中的应用
    一、方案背景随着物联网、AI、大数据、5G、边缘计算、移动互联网等新兴技术的不断成熟和应用,社区作为汇聚科技社会人、房、车三大物联网时代最核心的要素,其价值将不言而喻。建设智慧小区需要充分发挥信息技术在社区管理中的作用,提高居民生活的便利性和安全性,例如建设和利用视频监......