首页 > 其他分享 >kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

时间:2024-04-21 16:22:58浏览次数:14  
标签:选项 转换 步骤 kettle 指定 RabbitMQ MQTT consumer

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

 

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

 

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 

 6、同上一节课,本次不再介绍。

 

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

 

标签:选项,转换,步骤,kettle,指定,RabbitMQ,MQTT,consumer
From: https://www.cnblogs.com/zjBoy/p/18144200

相关文章

  • kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ producer 实战
    1、MQTT介绍MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,设计用于连接低带宽、高延迟或不可靠网络的设备。MQTT是基于发布/订阅模式(Publish/Subscribe)的协议,其中设备可以发布消息到一个主题(Topic),其他设备可以订阅这个主题以接收相关消息。这种模式......
  • MQTT协议
    一、MQTT协议简介MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT协议是为工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议。二......
  • STM32、ESP8266与MQTT连接阿里云物联网的串口通信异常解析
    STM32、ESP8266与MQTT协议连接阿里云物联网平台时常见的串口通信异常介绍在构建物联网应用时,STM32、ESP8266与MQTT协议的结合是实现设备与网络间稳定通信的关键。然而,在连接阿里云物联网平台的过程中,串口通信异常成为了一个常见的挑战。本文将探讨这些异常现象及其可能的原因,并给......
  • 用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本
    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件?为了测试实际项目中的海豚定时调度从Kafka到HDFS的Kettle任务情况,特地提前跑一下海豚定时调度这个任务,看看到底什么......
  • RabbitMQ应急运维文档
    RabbitMQ集群工厂方向架构负载均衡Nginxversion:nginx/1.22.0管控界面:https://rabbitmqlb1.mfg.tp-link.com开放端口:8443-8447集群1:UI端口8443,数据流端口5672,后端节点rabbitmq[1-3]-mfg集群2:UI端口8444,数据流端口5673,后端节点rabbitmq[4-6]-mfg集群3:UI端口8445,数据流端......
  • rabbitmq相关
    工作队列模型workQueue,多个消费者绑定到一个队列,当队列堆积消息时,可使用work模型。多个消费者绑定一个队列,同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量spring:rabbitmq:listener:simple:prefetch:1#每次只能获取......
  • [转]Kafka与RabbitMQ区别
    Kafka和RabbitMQ都是流行的消息传递系统,但它们在设计和用途上有一些重要的区别。以下是它们之间的一些主要区别:1.消息传递模型:Kafka:Kafka是一个分布式流处理平台,主要用于处理实时数据流。它采用发布-订阅模型,消息被持久化保存在日志中,允许多个消费者以不同的速率消费消息。......
  • RabbitMQ重试机制
    RabbitMQ重试机制+死信队列RabbitMQ重试机制RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者trycatch起来没有手动回滚,事务是不会回滚的。if("ACK重试机制".equals(messageBod......
  • 消息中间件RabbitMQ_RabbitMQ集群搭建8
    一、集群搭建概述摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性......
  • 消息中间件RabbitMQ_RabbitMQ应用问题7
    一、RabbitMQ应用问题1、消息可靠性保障消息补偿机制2、消息幂等性保障乐观锁解决方案 二、消息可靠性保障需求: 100%确保消息发送成功 消息补偿:三、消息幂等性保障幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其......