首页 > 其他分享 >kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ producer 实战

kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ producer 实战

时间:2024-04-21 16:22:38浏览次数:23  
标签:producer 步骤 kettle 指定 RabbitMQ MQTT 服务器 客户端

1、MQTT介绍

MQTT (Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,设计用于连接低带宽、高延迟或不可靠网络的设备。

MQTT 是基于发布/订阅模式(Publish/Subscribe)的协议,其中设备可以发布消息到一个主题(Topic),其他设备可以订阅这个主题以接收相关消息。这种模式使得设备之间可以进行灵活的通信,而不需要直接连接到彼此。

常见的支持支持MQTT的中间件有RabbitMQ和ActiveMQ。

2、今天我们基于RabbitMQ来进行学习下MQTT 生产者,如下图所示:

3、通过步骤【生成记录】生成10条记录,记录中有一个message字段,类型为String字符串,message的值为我是java小金刚,如下图所示:

 

4、通过步骤【MQTT producer】将message值推送到RabbitMQ中供其他应用消费。

Step name:自定义步骤名称

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Specify topic:选择“指定topic”以输入特定的主题名称,静态指定。

Get data from field:动态指定topic。

Topic name:在主题名称字段中,输入您希望发布流数据(消息)的 MQTT 主题的名称。每个 MQTT 生产者步骤将启动一个单独的线程进行发布。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

Message field:设置消息字段,来源于前置步骤,下拉选择需要的字段。

 

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的秘密。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

Keep Alive Interval:指定在 PDI 客户端完成传输一个控制数据包和开始传输下一个数据包之间允许经过的最大间隔秒数。

Max Inflight:指定在任何给定时间点处理中的最大消息数量。

Connection Timeout:指定在未收到消息时断开连接的时间,以秒为单位。

Clean Session:指定broker是否会存储或清除会话的消息。请选择以下之一。

True
当设置为 True 时,经纪人不会存储客户端的任何信息。所有来自先前持久会话的信息都将被清除。
False
当设置为 False 时,经纪人将存储客户端的所有订阅。当 QoS(服务质量)参数设置为 1 或 2 时,所有未接收的消息将被存储。

Storage Level:消息是存储在内存中还是在磁盘上的。

默认(留空)是内存。
对于磁盘,请输入有效路径。

Server URIs:指定 MQTT 服务器的统一资源标识符(URI),如mqtt://example.com:1883

MQTT Version:请指定此步骤连接到的 MQTT 协议版本,如MQTT 3.1.1

Automatic Reconnect:客户端在与服务器断开连接时尝试自动重新连接。请选择 True 或 False:

True
是,尝试重新连接到服务器。
False
否,不尝试重新连接到服务器。

 5、RabbitMQ MQTT协议配置,

1)需要先安装然RabbitMQ。

2)启用MQTT插件,通过命令rabbitmq-plugins enable rabbitmq_mqtt进行启动。

3)开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

 6、spoon中运行转换文件,此时发现RabbitMQ中已经成功写入数据,如下图所示:

 

 

标签:producer,步骤,kettle,指定,RabbitMQ,MQTT,服务器,客户端
From: https://www.cnblogs.com/zjBoy/p/18141503

相关文章

  • 用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本
    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件?为了测试实际项目中的海豚定时调度从Kafka到HDFS的Kettle任务情况,特地提前跑一下海豚定时调度这个任务,看看到底什么......
  • RabbitMQ应急运维文档
    RabbitMQ集群工厂方向架构负载均衡Nginxversion:nginx/1.22.0管控界面:https://rabbitmqlb1.mfg.tp-link.com开放端口:8443-8447集群1:UI端口8443,数据流端口5672,后端节点rabbitmq[1-3]-mfg集群2:UI端口8444,数据流端口5673,后端节点rabbitmq[4-6]-mfg集群3:UI端口8445,数据流端......
  • rabbitmq相关
    工作队列模型workQueue,多个消费者绑定到一个队列,当队列堆积消息时,可使用work模型。多个消费者绑定一个队列,同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量spring:rabbitmq:listener:simple:prefetch:1#每次只能获取......
  • [转]Kafka与RabbitMQ区别
    Kafka和RabbitMQ都是流行的消息传递系统,但它们在设计和用途上有一些重要的区别。以下是它们之间的一些主要区别:1.消息传递模型:Kafka:Kafka是一个分布式流处理平台,主要用于处理实时数据流。它采用发布-订阅模型,消息被持久化保存在日志中,允许多个消费者以不同的速率消费消息。......
  • RabbitMQ重试机制
    RabbitMQ重试机制+死信队列RabbitMQ重试机制RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者trycatch起来没有手动回滚,事务是不会回滚的。if("ACK重试机制".equals(messageBod......
  • 消息中间件RabbitMQ_RabbitMQ集群搭建8
    一、集群搭建概述摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性......
  • 消息中间件RabbitMQ_RabbitMQ应用问题7
    一、RabbitMQ应用问题1、消息可靠性保障消息补偿机制2、消息幂等性保障乐观锁解决方案 二、消息可靠性保障需求: 100%确保消息发送成功 消息补偿:三、消息幂等性保障幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其......
  • 消息中间件RabbitMQ_RabbitMQ高级特性6
    一、RabbitMQ高级特性消息可靠性投递ConsumerACK消费端限流TTL死信队列延迟队列日志与监控消息可靠性分析与追踪管理二、消息的可靠投递1、模式在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式......
  • 用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本
    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件?为了测试实际项目中的海豚定时调度从Kafka到HDFS的Kettle任务情况,特地提前跑一下海豚定时调度这个任务,看看到底什么......
  • docker安装rabbitmq
    //查找镜像dockersearchrabbitmq//默认拉取官方最新版本dockerpullrabbitmq//创建容器,也可直接执行该命令,没有镜像会去先拉取镜像dockerrun-d--namemyrabbitmq-p5672:5672-p15672:15672rabbitmq//进入镜像dockerexec-it容器id/bin/bash//安装UI插件rabb......