首页 > 编程语言 >JAVA实现MQTT通讯介绍

JAVA实现MQTT通讯介绍

时间:2023-01-20 12:33:05浏览次数:72  
标签:通讯 JAVA String MQTT client new import options

JAVA实现MQTT通讯介绍

      MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

这篇文章的开发环境是:

构建工具。Maven
IDE: IntelliJ IDEA
Java:JDK 1.8.0

我们将使用Eclipse Paho Java Client作为客户端,它是Java语言中使用最广泛的MQTT客户端库。

在pom.xml文件中添加以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>

创建一个MQTT连接


MQTT代理Broker

本文将使用基于EMQX Cloud创建的公共MQTT代理。EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。EMQ是国人出产的一个开源Broker,已经用于很多企业生产了,几乎是目前的全能Broker了,文档和资料也非常齐全,但它是用Erlang语言编写的,这是一个不常见的语言。有两个版本2.0和3.0,最大的区别是3.0的集群化更好,支持集群共享订阅功能,2.0只支持本地共享订阅功能。同时3.0支持mqtt5.0,其他的都是一些性能优化。其他的项目有,先列个表,这些已经算是比较优秀的Broker了,分析主要特性:

✔ - 支持

✘ - 不支持

? - 不了解

§ - 支持但做得不好(有限制)

image

image

  • EMQ 提供了一个性能测试工具 emqtt-benck,采用 Erlang 编写,适合来做 MQTT Broker 性能测试。

    • 连接:指定连接数、连接速率,测试 MQTT Broker 的连接性能(速率、响应时间、错误数)
    • 订阅:指定连接数、主题数、订阅速率,QoS、测试 MQTT Broker 的订阅性能(速率、响应时间、错误数)
    • 发布:指定连接数、消息发布速率、消息大小、QoS,测试 MQTT Broker 的消息吞吐性能(速率、响应时间、错误数)

服务器的访问信息如下:

代理商:broker.emqx.io
TCP端口: 1883
SSL/TLS端口: 8883
连接
设置MQTT的基本连接参数。用户名和密码是可选的。

String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";

然后创建一个MQTT客户端并连接到经纪人:

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);

说明:

MqttClient:MqttClient提供了一组方法,一旦MQTT动作完成,这些方法就会阻塞并将控制权返回给应用程序。
MqttClientPersistence:代表一个持久的数据存储,用于在飞行中存储出站和入站的消息,使之能够交付给指定的QoS。
MqttConnectOptions:保存控制客户端如何连接到服务器的选项集。下面是一些常用的方法。
setUserName:设置用于连接的用户名。
setPassword: 设置连接时使用的密码。
setCleanSession:设置客户端和服务器是否应该在重新启动和重新连接时记住状态。
setKeepAliveInterval: 设置 "保持生存 "的时间间隔。
setConnectionTimeout:设置连接超时值。
setAutomaticReconnect(设置自动连接):设置如果连接丢失,客户端是否会自动尝试重新连接到服务器。

用TLS/SSL连接
如果你想在TLS/SSL连接中使用自签名证书,请在pom.xml文件中加入bcpkix-jdk15on:

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcpkix-jdk15on</artifactId>
    <version>1.70</version>
</dependency>

然后用以下代码创建SSLUtils.java文件:

package io.emqx.mqtt;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class SSLUtils {
    public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                    final String crtFile, final String keyFile, final String password)
            throws Exception {
        Security.addProvider(new BouncyCastleProvider());

       // load CA certificate
        X509Certificate caCert = null;

       FileInputStream fis = new FileInputStream(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(fis);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

       while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
       }

       // load client certificate
        bis = new BufferedInputStream(new FileInputStream(crtFile));
        X509Certificate cert = null;
        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
       }

       // load client private key
        PEMParser pemParser = new PEMParser(new FileReader(keyFile));
        Object object = pemParser.readObject();
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
        KeyPair key = converter.getKeyPair((PEMKeyPair) object);
        pemParser.close();

       // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);

       // client key and certificates are sent to server so it can authenticate
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
               .getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

       // finally, create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

       return context.getSocketFactory();
   }
}

设置选项如下:

String broker = "ssl://broker.emqx.io:8883";
// Set socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);

发布MQTT消息


创建一个PublishSample类,它将发布一个Hello MQTT消息到主题mqtt/test:

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {

   public static void main(String[] args) {

       String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "publish_client";
        String content = "Hello MQTT";
        int qos = 0;

       try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
       options.setKeepAliveInterval(60);
            // connect
            client.connect(options);
            // create message and setup QoS
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // publish message
            client.publish(topic, message);
            System.out.println("Message published");
            System.out.println("topic: " + topic);
            System.out.println("message content: " + content);
            // disconnect
            client.disconnect();
            // close client
            client.close();
       } catch (MqttException e) {
            throw new RuntimeException(e);
       }
   }
}

订阅


创建一个SubscribeSample类,它将订阅主题mqtt/test:

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubscribeSample {
    public static void main(String[] args) {
        String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "subscribe_client";
        int qos = 0;

       try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // connect options
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
       options.setKeepAliveInterval(60);
            // setup callback
            client.setCallback(new MqttCallback() {

               public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost: " + cause.getMessage());
               }

               public void messageArrived(String topic, MqttMessage message) {
                    System.out.println("topic: " + topic);
                    System.out.println("Qos: " + message.getQos());
                    System.out.println("message content: " + new String(message.getPayload()));

              }

               public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------" + token.isComplete());
               }

          });
            client.connect(options);
            client.subscribe(topic, qos);
       } catch (Exception e) {
            e.printStackTrace();
       }
   }
}

MqttCallback:

connectionLost(Throwable cause)。当与服务器的连接丢失时,该方法被调用。
messageArrived(String topic, MqttMessage message)。当一个消息从服务器到达时,该方法被调用。
deliveryComplete(IMqttDeliveryToken token)。当一个消息的传递已经完成,并且所有的确认都已收到时调用。

测试


接下来,运行SubscribeSample来订阅mqtt/test主题。然后运行PublishSample来发布mqtt/test主题上的消息。我们将看到,发布者成功地发布了消息,订阅者也收到了消息:

image

也可以使用Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。

handler-endpoint

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
</dependency>


总结


    现在我们已经完成了使用Paho Java客户端作为MQTT客户端来连接到公共MQTT服务器并实现消息发布和订阅。同时也引入Spring Integration介绍。



今天先到这儿,希望对云原生,技术领导力, 企业管理,系统架构设计与评估,团队管理, 项目管理, 产品管管,团队建设 有参考作用 , 您可能感兴趣的文章:
领导人怎样带领好团队
构建创业公司突击小团队
国际化环境下系统架构演化
微服务架构设计
视频直播平台的系统架构演化
微服务与Docker介绍
Docker与CI持续集成/CD
互联网电商购物车架构演变案例
互联网业务场景下消息队列架构
互联网高效研发团队管理演进之一
消息系统架构设计演进
互联网电商搜索架构演化之一
企业信息化与软件工程的迷思
企业项目化管理介绍
软件项目成功之要素
人际沟通风格介绍一
精益IT组织与分享式领导
学习型组织与企业
企业创新文化与等级观念
组织目标与个人目标
初创公司人才招聘与管理
人才公司环境与企业文化
企业文化、团队文化与知识共享
高效能的团队建设
项目管理沟通计划
构建高效的研发与自动化运维
某大型电商云平台实践
互联网数据库架构设计思路
IT基础架构规划方案一(网络系统规划)
餐饮行业解决方案之客户分析流程
餐饮行业解决方案之采购战略制定与实施流程
餐饮行业解决方案之业务设计流程
供应链需求调研CheckList
企业应用之性能实时度量系统演变

如有想了解更多软件设计与架构, 系统IT,企业信息化, 团队管理 资讯,请关注我的微信订阅号:

MegadotnetMicroMsg_thumb1_thumb1_thu[2]

作者:Petter Liu
出处:http://www.cnblogs.com/wintersun/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。 该文章也同时发布在我的独立博客中-Petter Liu Blog。

标签:通讯,JAVA,String,MQTT,client,new,import,options
From: https://www.cnblogs.com/wintersun/p/17062656.html

相关文章

  • JavaScript 函数所能传递的最大参数
    取决于实现,取决于浏览器和操作系统标准没有规定(65535一般是有的)来源:https://stackoverflow.com/questions/22747068/is-there-a-max-number-of-arguments-javascript-fun......
  • 【java技术总结】Java 数组转 list(列表)的最全方法(含 java8、java9、java10)
    对象数组转列表Collections.addAll(推荐方式)如果jdk1.5版本以上,推荐如下方法,且返回的列表对象,可以进行数据的增删改查操作:String[]strings=newString[]{"a","b",......
  • Javascript数字精度丢失的问题,如何解决
    一、问题分析计算机存储以二进制的方式,而0.1在二进制中是无限循环的一个数字,所以会出现裁剪,精度丢失会出现,0.100000000000000002===0.1,0.200000000000000002===0.2......
  • javascript的防抖与节流
    一、节流一段时间内只能触发一次,如果这段时间内触发多次事件,只有第一次生效会触发回调函数,一段时间过后才能再次触发(一定时间内只执行第一次)应用场景1、鼠标连续不断......
  • Java 中九种 Map 的遍历方式,你一般用的是哪种呢?
    日常工作中Map绝对是我们Java程序员高频使用的一种数据结构,那Map都有哪些遍历方式呢?这篇文章阿粉就带大家看一下,看看你经常使用的是哪一种。通过entrySet来遍历......
  • Java Stream常见用法汇总,开发效率大幅提升
    Java8新增的Stream流大大减轻了我们代码的工作量,但是Stream流的用法较多,实际使用的时候容易遗忘,整理一下供大家参考。1.概述Stream使用一种类似用SQL语句从数据......
  • 2023.1.19 学习初识 JAVA
    C语言1972年诞生了C语言,1982年诞生了C++  1995年诞生了JAVA。C语言贴近硬件,运行速度快,效率极高  (指针和内存管理)操作系统编译器数据库网络系统等C++面向对象......
  • 【java技术总结】Stream流基础使用
    Stream流使用1.获取Stream流对于四种数据分别采取不同的获取方式获取方式方法名说明单列集合defaultStreamstream()Collection中的默认方法双列集合......
  • Mockito Java 测试框架
    Mockito是一个针对Java的mocking框架。它与EasyMock和jMock很相似,但是通过在执行后校验什么已经被调用,它消除了对期望行为(expectations)的需要。其它的mocking库......
  • java10-20K
    ......