1、步骤
(1)dependence
(2)application.properties
用户名(这里为空)
mqtt.username=iot
密码(这里为空)
mqtt.password=iot
推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
mqtt.url=tcp://ccc.lmuiot.cn:1883
客户端ID(这里使用随机数)
mqtt.clientId=$
默认的推送主题,实际可在调用接口时指定
mqtt.sender.defaultTopic=test
默认的接收主题,实际可在调用接口时指定
mqtt.receiver.defaultTopic=testaa/#,testbb
(3)Configuration
@Configuration
public class MqttConfig {
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.sender.defaultTopic}")
private String defaultTopic;
@Value("${mqtt.receiver.defaultTopic}")
private String defaultreceiverTopic;
/**
* MQTT连接器选项
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions(){
MqttConnectOptions options=new MqttConnectOptions();
// 设置连接的用户名
System.out.println(username);
if(!username.trim().equals("")){
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(new String[]{url});
// 设置超时时间 单位为秒
options.setConnectionTimeout(100);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生产者)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(生产者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
/**
* MQTT信息通道(消费者)
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息订阅绑定(消费者)
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId, senderMqttClientFactory(),
StringUtils.split(defaultreceiverTopic, ","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MqttCaseServiceImpl();
}
}
(4)MqttCaseServiceImpl
public class MqttCaseServiceImpl implements MessageHandler {
@Autowired
Gson gson;
@Autowired
Sensor sensor;
@Override
public void handleMessage(Message<?> arg0) throws MessagingException {
// TODO Auto-generated method stub
System.out.println("ok 00");
String topic = (String) arg0.getHeaders().get("mqtt_receivedTopic");
String payload = (String) arg0.getPayload();
System.out.println("headers:"+topic+" "+payload);
if(topic.equals("testaa/sensor"))
{
System.out.println("testaa/sensor"+" "+payload);
try {
sensor=gson.fromJson(payload, Sensor.class);
System.out.println("sensor:"+sensor.toString());
} catch (JsonSyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
(5)IMqttSender
/**
-
MQTT生产者消息发送接口
/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
/*- 发送信息到MQTT服务器
- @param data 发送的文本
/
void sendToMqtt(String data);
/* - 发送信息到MQTT服务器
- @param topic 主题
- @param payload 消息主体
/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/* - 发送信息到MQTT服务器
- @param topic 主题
- @param qos 对消息处理的几种机制。
- 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- 2 多了一次去重的动作,确保订阅者收到的消息有一次。
- @param payload 消息主体
/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
(6)发送Controller
@RestController
public class MQTTController {
/* - 注入发送MQTT的Bean
*/
@Autowired
private IMqttSender iMqttSender;
// 发送自定义消息内容(使用默认主题)
@RequestMapping("/test1/{data}")
public void test1(@PathVariable("data") String data) {
System.out.println(data);
iMqttSender.sendToMqtt(data);
}// 发送自定义消息内容,且指定主题
@RequestMapping("/test2/{topic}/{data}")
public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
iMqttSender.sendToMqtt(topic, data);
}
}
2、测试
(1)发送消息
(2)接收消息
标签:String,mqtt,topic,MQTT,整合,data,public From: https://www.cnblogs.com/lwq6/p/17099309.html