用到的工具:
EMQX , mqttx , idea
工具使用都很简单,自己看看就能会。
订阅端config代码:
package com.example.demo.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * @Author: xct * @Date: 2021/7/30 17:06 * @Description: */ @Configuration public class MqttConsumerConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 * @author xct * @param * @return void * @date 2021/7/30 16:48 */ @PostConstruct public void init(){ connect(); } /** * 客户端连接服务端 * @author xct * @param * @return void * @date 2021/7/30 16:01 */ public void connect(){ try { //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接到服务端都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttConsumerCallBack()); client.connect(options); //订阅主题 //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 int[] qos = {1,1}; //主题 String[] topics = {"topic1","topic2"}; //订阅主题 client.subscribe(topics,qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开连接 * @author xct * @param * @return void * @date 2021/8/2 09:30 */ public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 * @author xct * @param topic * @param qos * @return void * @date 2021/7/30 17:12 */ public void subscribe(String topic,int qos){ try { client.subscribe(topic,qos); } catch (MqttException e) { e.printStackTrace(); } } }
订阅端回调代码:
package com.example.demo.config; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * @Author: xct * @Date: 2021/7/30 17:06 * @Description: */ public class MqttConsumerCallBack implements MqttCallback { /** * 客户端断开连接的回调 * @author xct * @param throwable * @return void * @date 2021/7/30 17:14 */ @Override public void connectionLost(Throwable throwable) { System.out.println("与服务器断开连接,可重连"); } /** * 消息到达的回调 * @author xct * @param topic * @param message * @return void * @date 2021/7/30 17:14 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); //TODO 可以将消息持久化到数据库中,然后在进行其他操作。 } /** * 消息发布成功的回调 * @author xct * @param iMqttDeliveryToken * @return void * @date 2021/7/30 17:14 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
测试控制器:
package com.example.demo.controller; import com.example.demo.config.MqttConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; /** * @Author: xct * @Date: 2021/7/30 17:20 * @Description: */ @Controller public class TestController { @Autowired private MqttConsumerConfig client; @Value("${spring.mqtt.client.id}") private String clientId; @RequestMapping("connect") @ResponseBody public String connect(){ client.connect(); return clientId + "连接到服务器"; } @RequestMapping("disConnect") @ResponseBody public String disConnect(){ client.disConnect(); return clientId + "与服务器断开连接"; } }
配置文件:
spring: application: name: consumer #MQTT配置信息 mqtt: #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883 url: tcp://0.0.0.0:1883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: consumer-id #MQTT默认的消息推送主题,实际可在调用接口时指定 default: topic: topic server: port: 8082
启动订阅端代码,将订阅端和mqttx都连接到EMQX
确认主题是否正确 发送即可。
标签:springboot,void,mqtt,client,整合,import,客户端,public,String From: https://www.cnblogs.com/mfy123/p/17589296.html