首页 > 其他分享 >整合MQTT

整合MQTT

时间:2023-02-07 17:58:50浏览次数:40  
标签:String mqtt topic MQTT 整合 data public

1、步骤
(1)dependence

com.google.code.gson
gson


org.springframework.integration
spring-integration-stream


org.springframework.integration
spring-integration-mqtt

(2)application.properties

用户名(这里为空)

mqtt.username=iot

密码(这里为空)

mqtt.password=iot

推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.60.133:1883

mqtt.url=tcp://ccc.lmuiot.cn:1883

客户端ID(这里使用随机数)

mqtt.clientId=$

默认的推送主题,实际可在调用接口时指定

mqtt.sender.defaultTopic=test

默认的接收主题,实际可在调用接口时指定

mqtt.receiver.defaultTopic=testaa/#,testbb
(3)Configuration
@Configuration
public class MqttConfig {
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}

@Value("${mqtt.username}")
private String username;

@Value("${mqtt.password}")
private String password;

@Value("${mqtt.url}")
private String url;

@Value("${mqtt.clientId}")
private String clientId;

@Value("${mqtt.sender.defaultTopic}")
private String defaultTopic;

@Value("${mqtt.receiver.defaultTopic}")
private String defaultreceiverTopic;

/**
 * MQTT连接器选项
 */
@Bean
public MqttConnectOptions getSenderMqttConnectOptions(){
    MqttConnectOptions options=new MqttConnectOptions();
    // 设置连接的用户名
    System.out.println(username);
    if(!username.trim().equals("")){
        options.setUserName(username);
    }
    // 设置连接的密码
    options.setPassword(password.toCharArray());
    // 设置连接的地址
    options.setServerURIs(new String[]{url});
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(100);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
    // 但这个方法并没有重连的机制
    options.setKeepAliveInterval(30);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
}

/**
 * MQTT客户端
 */
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getSenderMqttConnectOptions());
    return factory;
}

/**
 * MQTT信息通道(生产者)
 */
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

/**
 * MQTT消息处理器(生产者)
 */
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, senderMqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(defaultTopic);
    return messageHandler;
}

/**
 * MQTT信息通道(消费者)
 */
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
}


/**
 * MQTT消息订阅绑定(消费者)
 */
@Bean
public MessageProducer inbound() {
    // 可以同时消费(订阅)多个Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(
                    clientId, senderMqttClientFactory(),
                    StringUtils.split(defaultreceiverTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
}

/**
 * MQTT消息处理器(消费者)
 */
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
    return new MqttCaseServiceImpl();
}

}
(4)MqttCaseServiceImpl
public class MqttCaseServiceImpl implements MessageHandler {
@Autowired
Gson gson;
@Autowired
Sensor sensor;

@Override
public void handleMessage(Message<?> arg0) throws MessagingException {
	// TODO Auto-generated method stub
	System.out.println("ok 00");
    String topic = (String) arg0.getHeaders().get("mqtt_receivedTopic");
    String payload = (String) arg0.getPayload();
    System.out.println("headers:"+topic+"   "+payload);
    if(topic.equals("testaa/sensor"))
    {
    	 System.out.println("testaa/sensor"+"   "+payload);
    	 try {
    		 sensor=gson.fromJson(payload, Sensor.class);
		 System.out.println("sensor:"+sensor.toString());
			
		} catch (JsonSyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
}

}
(5)IMqttSender
/**

  • MQTT生产者消息发送接口
    /
    @Component
    @MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
    public interface IMqttSender {
    /
    *

    • 发送信息到MQTT服务器
    • @param data 发送的文本
      /
      void sendToMqtt(String data);
      /
      *
    • 发送信息到MQTT服务器
    • @param topic 主题
    • @param payload 消息主体
      /
      void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);
      /
      *
    • 发送信息到MQTT服务器
    • @param topic 主题
    • @param qos 对消息处理的几种机制。
    • 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
    • 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
    • 2 多了一次去重的动作,确保订阅者收到的消息有一次。
    • @param payload 消息主体
      /
      void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
      }
      (6)发送Controller
      @RestController
      public class MQTTController {
      /
      *
    • 注入发送MQTT的Bean
      */
      @Autowired
      private IMqttSender iMqttSender;

    // 发送自定义消息内容(使用默认主题)
    @RequestMapping("/test1/{data}")
    public void test1(@PathVariable("data") String data) {
    System.out.println(data);
    iMqttSender.sendToMqtt(data);
    }

    // 发送自定义消息内容,且指定主题
    @RequestMapping("/test2/{topic}/{data}")
    public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
    iMqttSender.sendToMqtt(topic, data);
    }
    }
    2、测试
    (1)发送消息

(2)接收消息

标签:String,mqtt,topic,MQTT,整合,data,public
From: https://www.cnblogs.com/lwq6/p/17099309.html

相关文章

  • MQTT协议详解
    MQTT协议详解 MQTT是基于Publish/Subscribe(发布订阅)模式的物联网通信协议特点:简单易实现支持Qos(服务质量)报文小MQTT协议构建于TCP/IP协议之上发布订阅模式:......
  • PLC利用函数块连接MQTT订阅消息(一)
    在亿佰特介绍了西门子PLC如何通过函数块连接MQTT服务器和发布消息,本文为大家介绍如何通过函数与函数块实现MQTT云消息的订阅,直接切入重点。一、飞燕物联网平台配置这里......
  • PLC利用函数块连接MQTT订阅消息(一)
    在亿佰特介绍了西门子PLC如何通过函数块连接MQTT服务器和发布消息,本文为大家介绍如何通过函数与函数块实现MQTT云消息的订阅,直接切入重点。一、飞燕物联网平台配置这里的配......
  • SpringBoot整合Activiti7工作流引擎
    在Idea中安装bpnm可视化插件引入activiti依赖<!--引入Activiti7--><dependency><groupId>org.activiti</groupId><artifactId>activiti-spring-boot-start......
  • Spring整合MyBatis及Junit4.11报错:No tests found matching [{ExactMatcher:fDisplayN
    发生缘由复习Spring整合MyBatis及Junit运行环境VSCode版本:1.72.0(usersetup)jdk版本:jdk-8电脑系统:win10spring-context:5.2.10.RELEASEjunit:4.11spring-test:5.......
  • 《区块链基础知识25讲》-第二十一讲-将所有知识点整合
    区块链中的技术概念及其作用和类比......
  • Spring整合Mybatis
    首先导入依赖1<properties>2<!--版本锁定-->3<spring.version>5.0.2.RELEASE</spring.version>4<log4j.version>1.2.17</log4j.version>......
  • 论文推荐:ACMix整合self-Attention和Convolution (ACMix)的优点的混合模型
    混合模型ACmix将自注意与卷积的整合,同时具有自注意和卷积的优点。这是清华大学、华为和北京人工智能研究院共同发布在2022年CVPR中的论文卷积分解与自注意力卷积分解......
  • SpringBoot和Vue整合ECharts
    一、Vue安装EChartsnpmiecharts-S二、Vue整合ECharts其实这个很简单首先在vue中引入ECharts 2.然后我们直接去ECharts官网使用一些图形的代码,放到Home.vue中,所......
  • SpringBoot整合JDBC详解
    SpringBoot整合JDBC@​​TOC​​前言对于数据访问层,无论是关系型数据库(SQL)还是NOSQL(非关系型数据库),SpringBoot的底层都是采用SpringData的方式来进行统一处理。SpringData其......