首页 > 其他分享 >SpringBoot模块集成mqtt代码实现

SpringBoot模块集成mqtt代码实现

时间:2024-04-22 14:48:50浏览次数:27  
标签:return String void private mqtt springframework 模块 public SpringBoot

  1 //引入pom
  2 <!--mqtt-->
  3         <dependency>
  4             <groupId>org.springframework.boot</groupId>
  5             <artifactId>spring-boot-starter-integration</artifactId>
  6         </dependency>
  7         <dependency>
  8             <groupId>org.springframework.integration</groupId>
  9             <artifactId>spring-integration-stream</artifactId>
 10         </dependency>
 11         <dependency>
 12             <groupId>org.springframework.integration</groupId>
 13             <artifactId>spring-integration-mqtt</artifactId>
 14         </dependency>
 15 //在工具类中建包mqtts
 16 // 存放mqtt连接程序文件
 17 //文件1:MqttConfig
 18 package com.mushu.common.util.mqtts;
 19 
 20 import com.mushu.common.core.utils.StringUtils;
 21 import org.springframework.beans.factory.annotation.Autowired;
 22 import org.springframework.boot.context.properties.ConfigurationProperties;
 23 import org.springframework.context.annotation.Bean;
 24 import org.springframework.context.annotation.Lazy;
 25 import org.springframework.stereotype.Component;
 26 
 27 @Component
 28 @ConfigurationProperties("spring.mqtt")
 29 public class MqttConfig {
 30     @Autowired
 31     @Lazy
 32     private MqttPushClient mqttPushClient;
 33 
 34     /**
 35      * 用户名
 36      */
 37     private String username;
 38     /**
 39      * 密码
 40      */
 41     private String password;
 42     /**
 43      * 连接地址
 44      */
 45     private String hostUrl;
 46     /**
 47      * 客户Id
 48      */
 49     private String clientId;
 50     /**
 51      * 默认连接话题
 52      */
 53     private String defaultTopic;
 54     /**
 55      * 超时时间
 56      */
 57     private int timeout;
 58     /**
 59      * 保持连接数
 60      */
 61     private int keepalive;
 62     /**
 63      * mqtt功能使能
 64      */
 65     private boolean enabled;
 66 
 67     public String getUsername() {
 68         return username;
 69     }
 70 
 71     public void setUsername(String username) {
 72         this.username = username;
 73     }
 74 
 75     public String getPassword() {
 76         return password;
 77     }
 78 
 79     public void setPassword(String password) {
 80         this.password = password;
 81     }
 82 
 83     public String getHostUrl() {
 84         return hostUrl;
 85     }
 86 
 87     public void setHostUrl(String hostUrl) {
 88         this.hostUrl = hostUrl;
 89     }
 90 
 91     public String getClientId() {
 92         return clientId;
 93     }
 94 
 95     public void setClientId(String clientId) {
 96         this.clientId = clientId;
 97     }
 98 
 99     public String getDefaultTopic() {
100         return defaultTopic;
101     }
102 
103     public void setDefaultTopic(String defaultTopic) {
104         this.defaultTopic = defaultTopic;
105     }
106 
107     public int getTimeout() {
108         return timeout;
109     }
110 
111     public void setTimeout(int timeout) {
112         this.timeout = timeout;
113     }
114 
115     public int getKeepalive() {
116         return keepalive;
117     }
118 
119     public void setKeepalive(int keepalive) {
120         this.keepalive = keepalive;
121     }
122 
123     public boolean isEnabled() {
124         return enabled;
125     }
126 
127     public void setEnabled(boolean enabled) {
128         this.enabled = enabled;
129     }
130 
131     @Bean
132     public MqttPushClient getMqttPushClient() {
133         if(enabled == true){
134             String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
135             mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接
136             for(int i=0; i<mqtt_topic.length; i++){
137                 mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
138             }
139         }
140         return mqttPushClient;
141     }
142 }
143 //文件2:MqttPushClient
144 package com.mushu.common.util.mqtts;
145 
146 
147 import com.mushu.common.core.web.domain.AjaxResult;
148 import org.eclipse.paho.client.mqttv3.*;
149 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
150 import org.slf4j.Logger;
151 import org.slf4j.LoggerFactory;
152 import org.springframework.beans.factory.annotation.Autowired;
153 import org.springframework.context.annotation.Lazy;
154 import org.springframework.stereotype.Component;
155 
156 import static com.mushu.common.core.web.domain.AjaxResult.error;
157 import static com.mushu.common.core.web.domain.AjaxResult.success;
158 
159 
160 @Component
161 public class MqttPushClient {
162     private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
163 
164     @Autowired
165     @Lazy
166     private PushCallback pushCallback;
167 
168     private static MqttClient client;
169 
170     private static MqttClient getClient() {
171         return client;
172     }
173 
174     private static void setClient(MqttClient client) {
175         MqttPushClient.client = client;
176     }
177 
178     /**
179      * 客户端连接
180      *
181      * @param host      ip+端口
182      * @param clientID  客户端Id
183      * @param username  用户名
184      * @param password  密码
185      * @param timeout   超时时间
186      * @param keepalive 保留数
187      */
188     public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
189         MqttClient client;
190         try {
191             client = new MqttClient(host, clientID, new MemoryPersistence());
192             MqttConnectOptions options = new MqttConnectOptions();
193             options.setCleanSession(true);
194             options.setUserName(username);
195             options.setPassword(password.toCharArray());
196             options.setConnectionTimeout(timeout);
197             options.setKeepAliveInterval(keepalive);
198             MqttPushClient.setClient(client);
199             try {
200                 client.setCallback(pushCallback);
201                 client.connect(options);
202             } catch (Exception e) {
203                 e.printStackTrace();
204             }
205         } catch (Exception e) {
206             e.printStackTrace();
207         }
208     }
209 
210     /**
211      * 发布
212      *
213      * @param qos         连接方式
214      * @param retained    是否保留
215      * @param topic       主题
216      * @param pushMessage 消息体
217      */
218     public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
219         MqttMessage message = new MqttMessage();
220         message.setQos(qos);
221         message.setRetained(retained);
222         message.setPayload(pushMessage.getBytes());
223         MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
224         if (null == mTopic) {
225             logger.error("topic not exist");
226         }
227         MqttDeliveryToken token;
228         try {
229             token = mTopic.publish(message);
230             token.waitForCompletion();
231             return success();
232         } catch (MqttPersistenceException e) {
233             e.printStackTrace();
234             return error();
235         } catch (MqttException e) {
236             e.printStackTrace();
237             return error();
238         }
239     }
240 
241     /**
242      * 订阅某个主题
243      *
244      * @param topic 主题
245      * @param qos   连接方式
246      */
247     public void subscribe(String topic, int qos) {
248         logger.info("开始订阅主题" + topic);
249         try {
250             MqttPushClient.getClient().subscribe(topic, qos);
251         } catch (MqttException e) {
252             e.printStackTrace();
253         }
254     }
255 
256 }
257 //文件3:PushCallback
258 
259 package com.mushu.common.util.mqtts;
260 
261 import com.alibaba.fastjson.JSONObject;
262 import com.mushu.ccm.service.MqttDatasService;
263 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
264 import org.eclipse.paho.client.mqttv3.MqttCallback;
265 import org.eclipse.paho.client.mqttv3.MqttClient;
266 import org.eclipse.paho.client.mqttv3.MqttMessage;
267 import org.slf4j.Logger;
268 import org.slf4j.LoggerFactory;
269 import org.springframework.beans.factory.annotation.Autowired;
270 import org.springframework.context.annotation.Lazy;
271 import org.springframework.stereotype.Component;
272 
273 @Component
274 public class PushCallback implements MqttCallback {
275     private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
276 
277     @Autowired
278     @Lazy
279     private MqttConfig mqttConfig;
280 
281     private static MqttClient client;
282 
283     @Autowired
284     //private ICcmdbCarBoxStartService iCcmdbCarBoxStartService;
285     //private MqttDatasService mqttDatasService;
286 
287     private static String _topic;
288     private static String _qos;
289     private static String _msg;
290 
291     @Override
292     public void connectionLost(Throwable throwable) {
293 
294         // 连接丢失后,一般在这里面进行重连
295         logger.info("连接断开,可以做重连");
296         if (client == null || !client.isConnected()) {
297             mqttConfig.getMqttPushClient();
298         }
299     }
300 
301     @Override
302     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
303         // subscribe后得到的消息会执行到这里面
304         logger.info("接收消息主题 : " + topic);
305         logger.info("接收消息Qos : " + mqttMessage.getQos());
306         logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
307         System.out.println("mqttMessage = " + mqttMessage);
308 
309         // 先调用Redis再调用数据库
310         //mqttDatasService.saveRedis(new String(mqttMessage.getPayload()));
311         //mqttDatasService.saveMysql(new String(mqttMessage.getPayload()));
312         _topic = topic;
313         _qos = mqttMessage.getQos()+"";
314         _msg = new String(mqttMessage.getPayload());
315     }
316 
317     @Override
318     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
319         logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
320     }
321 
322     //别的Controller层会调用这个方法来  获取  接收到的硬件数据
323     public String receive() {
324         JSONObject jsonObject = new JSONObject();
325         jsonObject.put("topic", _topic);
326         jsonObject.put("qos", _qos);
327         jsonObject.put("msg", _msg);
328         return jsonObject.toString();
329     }
330 
331 }

 

标签:return,String,void,private,mqtt,springframework,模块,public,SpringBoot
From: https://www.cnblogs.com/188221creat/p/18150590

相关文章

  • springboot 嵌入式的web容器的的选择
    springboot默认内置tomcat可以替换undertow、jetty、nettytomcattomcat默认200最大线程完整实现了JEE容器和serlet规范tomcat6以后支持Jdk1.4的NIO用于完整支持了javaee因此比较笨重和重量级很多高并发会替换成undertowundertow这个是红帽2012开源出来的一个......
  • 【PLM踩坑记】新建SpringBoot项目,无法使用Java8
    概述今天开始学SpringBoot,需要使用IDEA新建SpringBoot项目。公司使用的Java版本为jdk1.8,这里我选择了这个版本的jdk之后,下面的Java选项不提供Java8。解决方法如下:首先将jdk版本选择为较新的jdk22,然后下面的Java版本随便选择一个。在正式进入项目之后,修改IDEA中的项目设置。点......
  • 常用模块
    20240419模块os模块操作本地路径、操作文件夹创建、删除执行本地cmd命令拼接路径文件路径相关操作--file--表示当前文件所在的文件夹路径importosfile_name=os.path.dirname(__file__)获取当前文件所在的文件路径file_path=os.path.abspath(__f......
  • hashlib模块
    摘要算法:只能加密不能解密加密算法:用方法加密加密后的字符串可以解密【一】什么是摘要算法Python的hashlib提供了常见的摘要算法如MD5SHA1等等。摘要算法又称哈希算法、散列算法。它通过一个函数,把任意长度的数据转换为一个长度固定的数据串(通常用16进制的字符串表......
  • logging模块
    logging模块记录log记录日志的模块importloggingimportlogging.configimportosimportsystry:#想要给日志上色就安装这个模块#pipinstallcoloredlogs:::>>>给日志上个色importcoloredlogsexceptExceptionase:ifstr(e)=="Nomodulenamed'coloredlo......
  • json模块
    【一】什么是序列化将Python中的字典、列表、元组...转换成字符串类型如果使用str强制转换数据类型,造成的后果就是转换后的字符串无法转回Python对象【二】什么是反序列化将字符串类型的数据转换成Python对象(列表、字典、元组...)能将python对象转为字符串-->字符串......
  • C#的基于.net framework的Dll模块编程(一) - 编程手把手系列文章
    从此博文开始分几篇介绍C#的开发。这次讲讲C#的.netframework的Dll文件类库模块的编程方法。对于Windows来说,要运行应用程序要基于Dll类库和Exe执行文件。对于笔者来说,模块化的编程方式,就是将一些函数等封装到Dll类库文件中,将这些类库集中和分模块进行编写和管理。就是......
  • springboot接口接收xml
    对xml文件的操作也可以借助hutool的XmlUtil.1.xml格式```xml<root> <deviceStatInfoonlineCount="64"offlineCount="2"errorCount="0"/> <data> <recordid="0"name="5号楼10层流量计"status="预警2/正常1......
  • 【SpringBoot】【一】初识数据源连接池
    1 前言上节我们看了看,SpringBoot启动后都有哪些线程,看到有一部分是关于数据源连接池的,那么这节我们就看看数据源连接池都是如何工作的。我们本节就从这两个问题看起:(1)连接池是如何创建的,也就是什么时候创建的呢?(2)连接是什么时候放进连接池的?是创建完就初始化了一批新的连接,还......
  • dist资源包放入springboot运行遇到的问题
    1、在vue、react使用npmrunbuild打包将dist包放入resources下2、通过浏览器访问本地在访问路径会出现404,打开dist包下的index.html发现打包后的指向样式不对,更改指向后,发现还是404把拦截器修改为排除所有路径后,页面不再404,说明资源没有显示。修改资源静态映射指向dis......