首页 > 其他分享 >mqtt入门(四):客户端sdk

mqtt入门(四):客户端sdk

时间:2022-10-01 13:06:49浏览次数:73  
标签:String void param mqtt org import sdk public 客户端

前言

起步

  • 导入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
  • 配置yml
mqtt:
  broker-url: tcp://124.xxx.xxx.xxx:1883
  client-id: emq-client
  username: user
  password: 123456
  • 获取配置
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Connection settings for the MQTT broker, bound from configuration entries under the
 * {@code mqtt} prefix.
 *
 * <p>The object is a plain mutable bean: each field has a getter and a setter so the
 * framework can populate it.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /** Broker address, for example {@code tcp://host:1883}. */
    private String brokerUrl;

    /** Client identifier presented to the broker. */
    private String clientId;

    /** User name used when connecting. */
    private String username;

    /** Password used when connecting; never printed by {@link #toString()}. */
    private String password;

    /** @return the configured broker address */
    public String getBrokerUrl() {
        return brokerUrl;
    }

    /** @param brokerUrl the broker address to use */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    /** @return the configured client identifier */
    public String getClientId() {
        return clientId;
    }

    /** @param clientId the client identifier to use */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the configured user name */
    public String getUsername() {
        return username;
    }

    /** @param username the user name to use */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the configured password */
    public String getPassword() {
        return password;
    }

    /** @param password the password to use */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders the settings for diagnostics.
     *
     * @return a description of this object in which a non-null password is replaced by a
     *     fixed mask
     */
    @Override
    public String toString() {
        // Mask the credential so printing or logging this bean cannot leak it.
        String maskedPassword = (password == null) ? null : "******";
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + maskedPassword + '\'' +
                '}';
    }
}
  • 编写客户端
import com.itheima.mqtt.enums.QosEnum;
import com.itheima.mqtt.properties.MqttProperties;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * Wrapper around a Paho {@code IMqttClient} that exposes connect, disconnect, reconnect,
 * publish, subscribe and unsubscribe operations.
 *
 * <p>Every operation logs failures instead of propagating them. If the underlying client
 * could not be created in {@link #init()}, each operation logs an error and returns
 * without doing anything.
 */
@Component
public class EmqClient {

    private static final Logger log = LoggerFactory.getLogger(EmqClient.class);

    /** Charset used to turn message text into payload bytes, independent of the platform default. */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** Underlying client; stays {@code null} when construction in {@link #init()} failed. */
    private IMqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttCallback mqttCallback;

    /**
     * Creates the underlying client from the configured broker URL and client id, using
     * in-memory persistence. A construction failure is logged and leaves the client unset.
     */
    @PostConstruct
    public void init() {
        MqttClientPersistence persistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(
                    mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), persistence);
        } catch (MqttException e) {
            // The trailing exception argument keeps the stack trace in the log.
            log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}",
                    e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), e);
        }
    }

    /**
     * Connects to the broker with automatic reconnect and a clean session enabled.
     *
     * <p>Does nothing when the client is already connected, so repeated calls are harmless.
     *
     * @param username user name sent to the broker
     * @param password password sent to the broker; when {@code null} no password is set
     */
    public void connect(String username, String password) {
        if (clientUnavailable("connect")) {
            return;
        }
        if (mqttClient.isConnected()) {
            // Connecting an already connected client is rejected by the library.
            log.debug("mqtt客户端已连接,忽略重复的连接请求");
            return;
        }

        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setCleanSession(true);

        mqttClient.setCallback(mqttCallback);

        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage(), e);
        }
    }

    /**
     * Disconnects from the broker. Also invoked on bean destruction.
     *
     * <p>A client that is already disconnected is not treated as an error.
     */
    @PreDestroy
    public void disConnect() {
        if (mqttClient == null) {
            // Nothing was created, so there is nothing to disconnect.
            return;
        }
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            if (e.getReasonCode() == MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED) {
                log.debug("mqtt客户端已处于断开状态");
                return;
            }
            log.error("断开连接产生异常,异常信息{}", e.getMessage(), e);
        }
    }

    /**
     * Asks the underlying client to reconnect.
     */
    public void reConnect() {
        if (clientUnavailable("reConnect")) {
            return;
        }
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            log.error("重连失败,失败原因{}", e.getMessage(), e);
        }
    }

    /**
     * Publishes a text message.
     *
     * @param topic  topic to publish to
     * @param msg    message text; encoded as UTF-8 bytes
     * @param qos    quality-of-service level for the message
     * @param retain whether the message is flagged as retained
     */
    public void publish(String topic, String msg, QosEnum qos, boolean retain) {
        if (clientUnavailable("publish")) {
            return;
        }

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes(PAYLOAD_CHARSET));
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        try {
            mqttClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}",
                    e.getMessage(), topic, msg, qos.value(), retain, e);
        }
    }

    /**
     * Subscribes to a topic filter.
     *
     * @param topicFilter topic filter to subscribe to
     * @param qos         quality-of-service level requested for the subscription
     */
    public void subscribe(String topicFilter, QosEnum qos) {
        if (clientUnavailable("subscribe")) {
            return;
        }
        try {
            mqttClient.subscribe(topicFilter, qos.value());
        } catch (MqttException e) {
            log.error("订阅主题失败,errormsg={},topicFilter={},qos={}",
                    e.getMessage(), topicFilter, qos.value(), e);
        }
    }

    /**
     * Cancels a subscription.
     *
     * @param topicFilter topic filter to unsubscribe from
     */
    public void unSubscribe(String topicFilter) {
        if (clientUnavailable("unSubscribe")) {
            return;
        }
        try {
            mqttClient.unsubscribe(topicFilter);
        } catch (MqttException e) {
            log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter, e);
        }
    }

    /**
     * Reports whether the underlying client is missing, logging an error when it is.
     *
     * @param action name of the operation that needed the client, used in the log line
     * @return {@code true} when no client exists and the caller should give up
     */
    private boolean clientUnavailable(String action) {
        if (mqttClient == null) {
            log.error("mqttClient未初始化,无法执行操作:{}", action);
            return true;
        }
        return false;
    }

}
  • 编写枚举类
/**
 * Quality-of-service levels, each paired with the numeric code handed to the MQTT client.
 */
public enum QosEnum {

    /** Level with numeric code 0. */
    QoS0(0),

    /** Level with numeric code 1. */
    QoS1(1),

    /** Level with numeric code 2. */
    QoS2(2);

    /** Numeric code carried by this constant. */
    private final int level;

    QosEnum(int level) {
        this.level = level;
    }

    /**
     * Returns the numeric code of this level.
     *
     * @return the integer given to the constant at declaration
     */
    public int value() {
        return level;
    }

}
  • 编写回调类
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * MQTT callback that logs connection loss, incoming messages and completed deliveries.
 */
@Component
public class MessageCallback implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);

    /** Charset used to decode payload bytes for logging, independent of the platform default. */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Called when the connection to the server is lost.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Hook for resource cleanup or reconnect handling; for now the cause is only logged
        // so the reason for the drop is not lost.
        log.info("丢失了对服务端的连接", cause);
    }

    /**
     * Called when a message arrives; logs its topic, id, QoS and payload text.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String payloadText = new String(message.getPayload(), PAYLOAD_CHARSET);
        log.info("订阅者订阅到了消息,topic={},messageid={},qos={},payload={}",
                topic,
                message.getId(),
                message.getQos(),
                payloadText);
    }

    /**
     * Called when delivery of a published message has completed; logs the message id and
     * the topics reported by the token.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发布完成,messageid={},topics={}", messageId, topics);
    }

}
  • 编写测试类
import com.itheima.mqtt.client.EmqClient;
import com.itheima.mqtt.enums.QosEnum;
import com.itheima.mqtt.properties.MqttProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Demo endpoint that connects to the broker and publishes one timestamped message.
 */
@RestController
public class TestController {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties properties;

    /**
     * Connects with the configured credentials, then publishes a message to
     * {@code testtopic/123} at QoS 2 without the retain flag.
     *
     * @return the literal text {@code success}
     */
    @RequestMapping("/test1")
    @ResponseBody
    public String test1() {
        // Connect to the server using the configured credentials.
        String user = properties.getUsername();
        String secret = properties.getPassword();
        emqClient.connect(user, secret);

        // Publish a message whose text carries the current local date-time.
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
        String payload = " publish msg :" + timestamp;
        emqClient.publish("testtopic/123", payload, QosEnum.QoS2, false);

        // To subscribe to a topic instead:
        // emqClient.subscribe("testtopic/#", QosEnum.QoS2);
        return "success";
    }

}
  • 启动项目测试
  • 向服务端发送一条消息
  • mqtt入门(四):客户端sdk_服务端

  • MQTTX订阅消息
  • mqtt入门(四):客户端sdk_MQTT_02

  • ​vue + mqtt​​​​参考​



标签:String,void,param,mqtt,org,import,sdk,public,客户端
From: https://blog.51cto.com/chniny/5728229

相关文章