首页 > 编程语言 >使用java 实现mqtt两种方式

使用java 实现mqtt两种方式

时间:2022-12-14 18:31:21浏览次数:49  
标签:两种 java String System param topic mqtt 消息 public


前言

在开发MQTT时有两种方式一种是使用Paho Java 原生库来完成,一种是使用spring boot 来完成。

Paho Java 库实现

Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API

  • 通过 Maven 安装 Paho Java
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
  • Paho Java 使用示例
    Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:
package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();

try {
MqttClient client = new MqttClient(broker, clientId, persistence);

// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);

// 设置回调
client.setCallback(new PushCallback());

// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);

System.out.println("Connected");
System.out.println("Publishing message: " + content);

// 订阅
client.subscribe(subTopic);

// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");

client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}

回调消息处理类 OnMessageCallback.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

好的上述就实现了简单的 MQTT的连接和消息收发。

spring boot集成mqtt

  • spring boot 环境
spring-boot 版本 2.2.2
spring-integration的版本为:5.4.3
Spring Integration提供了入站适配器和出站适配器以支持MQTT协议。

Maven 依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.3</version>
</dependency>

配置文件 application.yml:

spring:
mqtt:
username:
password:
url: tcp://ip:port
clientId: clientId
topic: default
completionTimeout: 2000

核心代码

  • 配置类
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {

private String username;
private String password;
private String url;
private String clientId;
private String topic = "TOPIC_DEFAULT";
private Integer completionTimeout = 2000;

/**
* 注册MQTT客户端工厂
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
//如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持:
// 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。
// 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。
options.setCleanSession(true);
//该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。
// 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。
options.setConnectionTimeout(0);
//此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0
options.setKeepAliveInterval(90);
//自动重新连接
options.setAutomaticReconnect(true);
options.setUserName(this.getUsername());
options.setPassword(this.getPassword().toCharArray());
options.setServerURIs(new String[]{this.getUrl()});

factory.setConnectionOptions(options);
return factory;
}
}
@Slf4j
@AllArgsConstructor
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

private MqttConfiguration mqttConfig;
private MqttPahoClientFactory factory;
private MqttMessageReceiver mqttMessageReceiver;

/**
* 此处可以使用其他消息通道
* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。
*
* @return
*/
@Bean
public MessageChannel mqttInBoundChannel() {
return new DirectChannel();
}

/**
* 适配器, 两个topic共用一个adapter
* 客户端作为消费者,订阅主题,消费消息
*
* @param
* @param
* @return
*/
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic());

adapter.setCompletionTimeout(60000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setRecoveryInterval(10000);
adapter.setQos(0);
adapter.setOutputChannel(mqttInBoundChannel());
return adapter;
}

/**
* mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInBoundChannel")
public MessageHandler mqttMessageHandler() {
return this.mqttMessageReceiver;
}
}
  • 数据接收
@Slf4j
@AllArgsConstructor
@Component
public class MqttMessageReceiver implements MessageHandler {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {

MessageHeaders headers = message.getHeaders();
//获取消息Topic
String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
log.info("[获取到的消息的topic :]{} ", receivedTopic);
//获取消息体
String payload = (String) message.getPayload();
log.info("[获取到的消息的payload :]{} ", payload);
//todo ....
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Slf4j
@AllArgsConstructor
@Configuration
public class MqttOutboundConfiguration {

private MqttConfiguration mqttConfig;
private MqttPahoClientFactory factory;



@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory);

messageHandler.setDefaultQos(0);
//开启异步
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getTopic());
return messageHandler;
}

}
  • 发送者
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

/**
* 发送mqtt消息
* @param topic 主题
* @param payload 内容
* @return void
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

/**
* 发送包含qos的消息
* @param topic 主题
* @param qos 对消息处理的几种机制。
* * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息体
* @return void
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

/**
* 发送包含qos的消息
* @param topic 主题
* @param qos 对消息处理的几种机制。
* * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息体
* @return void
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
@Component
@AllArgsConstructor
public class MqttMessageSender {

private MqttGateway mqttGateway;

/**
* 发送mqtt消息
* @param topic 主题
* @param message 内容
* @return void
*/
public void send(String topic, String message) {
mqttGateway.sendToMqtt(topic, message);
}

/**
* 发送包含qos的消息
* @param topic 主题
* @param qos 质量
* @param messageBody 消息体
* @return void
*/
public void send(String topic, int qos, JSONObject messageBody){
mqttGateway.sendToMqtt(topic, qos, messageBody.toString());
}

/**
* 发送包含qos的消息
* @param topic 主题
* @param qos 质量
* @param message 消息体
* @return void
*/
public void send(String topic, int qos, byte[] message){
mqttGateway.sendToMqtt(topic, qos, message);
}
}

总结

综上所述上面就是我们经常用的到两种方式,希望对你有所帮助


标签:两种,java,String,System,param,topic,mqtt,消息,public
From: https://blog.51cto.com/u_15461374/5938182

相关文章

  • 16 Java内存模型与线程_Java内存模型
    目录1Java内存模型背景2主内存与工作内存3内存间交互及约束4volatile变量特性5原子性、可见性、有序性5.1原子性5.2可见性5.3有序性6先行发生原则7总结特别说明......
  • java Date和Timestamp类型的相互转换
    重要的概念:日期类和时间戳类都是用一个时间数值——日期相对于基准日期(1970年1月1日GMT时间(格林尼治时间)0时0分0秒)的时间间隔(以毫秒为单位)long类型来构造的通过Date对象......
  • 【Java】Spring Cache 缓存
    SpringCache一、Spring缓存抽象Spring从3.1开始定义了org.springframework.cache.Cache和org.springframework.cache.CacheManager接口来统一不同的缓存技术;并支......
  • Java: 在Excel中插入和提取图片
    在编辑Excel文档时,为了丰富文档内容或者更好地说明文档内容,有时我们会在单元格中插入图片。此外,整理文档内容时,也可以通过编程的方式将图片从Excel中提取出来。接下来我就将......
  • Java继承构造方法的注意点
    父类:注意(x,y是private,不能继承到子类中)classPar{privateintx;privateinty;inta;publicPar(){}publicPar(intx,inty){t......
  • JAVA中子类继承时构造方法注意事项
    JAVA中子类继承时构造方法注意事项类的继承不容易理解的一种情况:父类只存在有参构造,子类的构造方法必需要做相应的处理,比如说也创建有参构造,为什么?1、任何类,如果无显......
  • JAVA多态(超详细讲解)
    JAVA多态(超详细讲解)坠水于 2021-10-2914:29:56 发布31330收藏144版权声明:本文为博主原创文章,遵循CC4.0BY-SA版权协议,转载请附上原文出处链接和本声明。......
  • Java 继承解决了编程中的什么问题
    Java继承解决了编程中的什么问题?Java继承产生的背景?继承产生的原因:在一个项目中多个类具有很多相同的代码段,代码多了不仅仅容易出错,在需要系统升级的时候各个相同的代......
  • 狂神说 javaweb
    javaweb1、基本概念web开发:web静态web动态web淘宝,不断变化技术栈:Servlet/JSPASPPHP1.2、web应用程序web应用程序:可以提供浏览器访问的程序能访问的任何页面都存在于世......
  • Java: 在Excel中插入和提取图片
    在编辑Excel文档时,为了丰富文档内容或者更好地说明文档内容,有时我们会在单元格中插入图片。此外,整理文档内容时,也可以通过编程的方式将图片从Excel中提取出来。接下来我就......