1 //引入pom 2 <!--mqtt--> 3 <dependency> 4 <groupId>org.springframework.boot</groupId> 5 <artifactId>spring-boot-starter-integration</artifactId> 6 </dependency> 7 <dependency> 8 <groupId>org.springframework.integration</groupId> 9 <artifactId>spring-integration-stream</artifactId> 10 </dependency> 11 <dependency> 12 <groupId>org.springframework.integration</groupId> 13 <artifactId>spring-integration-mqtt</artifactId> 14 </dependency> 15 //在工具类中建包mqtts 16 // 存放mqtt连接程序文件 17 //文件1:MqttConfig 18 package com.mushu.common.util.mqtts; 19 20 import com.mushu.common.core.utils.StringUtils; 21 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.boot.context.properties.ConfigurationProperties; 23 import org.springframework.context.annotation.Bean; 24 import org.springframework.context.annotation.Lazy; 25 import org.springframework.stereotype.Component; 26 27 @Component 28 @ConfigurationProperties("spring.mqtt") 29 public class MqttConfig { 30 @Autowired 31 @Lazy 32 private MqttPushClient mqttPushClient; 33 34 /** 35 * 用户名 36 */ 37 private String username; 38 /** 39 * 密码 40 */ 41 private String password; 42 /** 43 * 连接地址 44 */ 45 private String hostUrl; 46 /** 47 * 客户Id 48 */ 49 private String clientId; 50 /** 51 * 默认连接话题 52 */ 53 private String defaultTopic; 54 /** 55 * 超时时间 56 */ 57 private int timeout; 58 /** 59 * 保持连接数 60 */ 61 private int keepalive; 62 /** 63 * mqtt功能使能 64 */ 65 private boolean enabled; 66 67 public String getUsername() { 68 return username; 69 } 70 71 public void setUsername(String username) { 72 this.username = username; 73 } 74 75 public String getPassword() { 76 return password; 77 } 78 79 public void setPassword(String password) { 80 this.password = password; 81 } 82 83 public String getHostUrl() { 84 return hostUrl; 85 } 86 87 public void setHostUrl(String hostUrl) { 88 this.hostUrl = hostUrl; 89 } 90 91 public String getClientId() { 92 return clientId; 93 } 94 95 public void setClientId(String clientId) { 96 
this.clientId = clientId; 97 } 98 99 public String getDefaultTopic() { 100 return defaultTopic; 101 } 102 103 public void setDefaultTopic(String defaultTopic) { 104 this.defaultTopic = defaultTopic; 105 } 106 107 public int getTimeout() { 108 return timeout; 109 } 110 111 public void setTimeout(int timeout) { 112 this.timeout = timeout; 113 } 114 115 public int getKeepalive() { 116 return keepalive; 117 } 118 119 public void setKeepalive(int keepalive) { 120 this.keepalive = keepalive; 121 } 122 123 public boolean isEnabled() { 124 return enabled; 125 } 126 127 public void setEnabled(boolean enabled) { 128 this.enabled = enabled; 129 } 130 131 @Bean 132 public MqttPushClient getMqttPushClient() { 133 if(enabled == true){ 134 String mqtt_topic[] = StringUtils.split(defaultTopic, ","); 135 mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接 136 for(int i=0; i<mqtt_topic.length; i++){ 137 mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题 138 } 139 } 140 return mqttPushClient; 141 } 142 } 143 //文件2:MqttPushClient 144 package com.mushu.common.util.mqtts; 145 146 147 import com.mushu.common.core.web.domain.AjaxResult; 148 import org.eclipse.paho.client.mqttv3.*; 149 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 150 import org.slf4j.Logger; 151 import org.slf4j.LoggerFactory; 152 import org.springframework.beans.factory.annotation.Autowired; 153 import org.springframework.context.annotation.Lazy; 154 import org.springframework.stereotype.Component; 155 156 import static com.mushu.common.core.web.domain.AjaxResult.error; 157 import static com.mushu.common.core.web.domain.AjaxResult.success; 158 159 160 @Component 161 public class MqttPushClient { 162 private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); 163 164 @Autowired 165 @Lazy 166 private PushCallback pushCallback; 167 168 private static MqttClient client; 169 170 private static MqttClient getClient() { 171 return client; 172 } 173 174 
private static void setClient(MqttClient client) { 175 MqttPushClient.client = client; 176 } 177 178 /** 179 * 客户端连接 180 * 181 * @param host ip+端口 182 * @param clientID 客户端Id 183 * @param username 用户名 184 * @param password 密码 185 * @param timeout 超时时间 186 * @param keepalive 保留数 187 */ 188 public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) { 189 MqttClient client; 190 try { 191 client = new MqttClient(host, clientID, new MemoryPersistence()); 192 MqttConnectOptions options = new MqttConnectOptions(); 193 options.setCleanSession(true); 194 options.setUserName(username); 195 options.setPassword(password.toCharArray()); 196 options.setConnectionTimeout(timeout); 197 options.setKeepAliveInterval(keepalive); 198 MqttPushClient.setClient(client); 199 try { 200 client.setCallback(pushCallback); 201 client.connect(options); 202 } catch (Exception e) { 203 e.printStackTrace(); 204 } 205 } catch (Exception e) { 206 e.printStackTrace(); 207 } 208 } 209 210 /** 211 * 发布 212 * 213 * @param qos 连接方式 214 * @param retained 是否保留 215 * @param topic 主题 216 * @param pushMessage 消息体 217 */ 218 public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) { 219 MqttMessage message = new MqttMessage(); 220 message.setQos(qos); 221 message.setRetained(retained); 222 message.setPayload(pushMessage.getBytes()); 223 MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); 224 if (null == mTopic) { 225 logger.error("topic not exist"); 226 } 227 MqttDeliveryToken token; 228 try { 229 token = mTopic.publish(message); 230 token.waitForCompletion(); 231 return success(); 232 } catch (MqttPersistenceException e) { 233 e.printStackTrace(); 234 return error(); 235 } catch (MqttException e) { 236 e.printStackTrace(); 237 return error(); 238 } 239 } 240 241 /** 242 * 订阅某个主题 243 * 244 * @param topic 主题 245 * @param qos 连接方式 246 */ 247 public void subscribe(String topic, int qos) { 248 logger.info("开始订阅主题" + 
topic); 249 try { 250 MqttPushClient.getClient().subscribe(topic, qos); 251 } catch (MqttException e) { 252 e.printStackTrace(); 253 } 254 } 255 256 } 257 //文件3:PushCallback 258 259 package com.mushu.common.util.mqtts; 260 261 import com.alibaba.fastjson.JSONObject; 262 import com.mushu.ccm.service.MqttDatasService; 263 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 264 import org.eclipse.paho.client.mqttv3.MqttCallback; 265 import org.eclipse.paho.client.mqttv3.MqttClient; 266 import org.eclipse.paho.client.mqttv3.MqttMessage; 267 import org.slf4j.Logger; 268 import org.slf4j.LoggerFactory; 269 import org.springframework.beans.factory.annotation.Autowired; 270 import org.springframework.context.annotation.Lazy; 271 import org.springframework.stereotype.Component; 272 273 @Component 274 public class PushCallback implements MqttCallback { 275 private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); 276 277 @Autowired 278 @Lazy 279 private MqttConfig mqttConfig; 280 281 private static MqttClient client; 282 283 @Autowired 284 //private ICcmdbCarBoxStartService iCcmdbCarBoxStartService; 285 //private MqttDatasService mqttDatasService; 286 287 private static String _topic; 288 private static String _qos; 289 private static String _msg; 290 291 @Override 292 public void connectionLost(Throwable throwable) { 293 294 // 连接丢失后,一般在这里面进行重连 295 logger.info("连接断开,可以做重连"); 296 if (client == null || !client.isConnected()) { 297 mqttConfig.getMqttPushClient(); 298 } 299 } 300 301 @Override 302 public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { 303 // subscribe后得到的消息会执行到这里面 304 logger.info("接收消息主题 : " + topic); 305 logger.info("接收消息Qos : " + mqttMessage.getQos()); 306 logger.info("接收消息内容 : " + new String(mqttMessage.getPayload())); 307 System.out.println("mqttMessage = " + mqttMessage); 308 309 // 先调用Redis再调用数据库 310 //mqttDatasService.saveRedis(new String(mqttMessage.getPayload())); 311 
//mqttDatasService.saveMysql(new String(mqttMessage.getPayload())); 312 _topic = topic; 313 _qos = mqttMessage.getQos()+""; 314 _msg = new String(mqttMessage.getPayload()); 315 } 316 317 @Override 318 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 319 logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); 320 } 321 322 //别的Controller层会调用这个方法来 获取 接收到的硬件数据 323 public String receive() { 324 JSONObject jsonObject = new JSONObject(); 325 jsonObject.put("topic", _topic); 326 jsonObject.put("qos", _qos); 327 jsonObject.put("msg", _msg); 328 return jsonObject.toString(); 329 } 330 331 }
标签: return, String, void, private, mqtt, springframework, 模块, public, SpringBoot
From: https://www.cnblogs.com/188221creat/p/18150590