首页 > 编程语言 >ZeroMQ: Java 请求/响应和发布/订阅模式的简单实现

ZeroMQ: Java 请求/响应和发布/订阅模式的简单实现

时间:2023-07-11 16:48:47浏览次数:36  
标签:订阅 pubSock Java String private context ZeroMQ ZMQ public

转载于:https://blog.csdn.net/weixin_47951400/article/details/119142454

 

文章目录

 


POM

	<!-- jeromq -->
    <dependency>
    	<groupId>org.zeromq</groupId>
        <artifactId>jeromq</artifactId>
        <version>0.4.3</version>
   	</dependency>
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

一、请求响应模式(ZMQ_REQ + ZMQ_REP)

1.REP

package com.example.zmq.repreq;

import org.zeromq.ZMQ;


public abstract class ZmqRepThread implements Runnable {
    /**
     * ZMQ启动线程数
     */
    private int ZMQThreadCount = 1;

    /**
     * ZMQ数据端口
     */
    private  int ZMQRepPort;

    /**
     * ZMQ监听接收端ip
     */
    private String ZMQRepIP;

    private  ZMQ.Context context = null;
    private  ZMQ.Socket repSock = null;

    public ZmqRepThread(String ZMQRepIP, int ZMQRepPort){
        this.ZMQRepIP = ZMQRepIP;
        this.ZMQRepPort = ZMQRepPort;
        initZMQ();
    }
    /**
     * 初始化ZMQ对象
     */
    private  void initZMQ() {
        if (context == null) {
            context = ZMQ.context(ZMQThreadCount);
        }
        if (ZMQRepPort != 0) {
            repSock = context.socket(ZMQ.REP);
            String bindUri = "tcp://" +ZMQRepIP + ":" + ZMQRepPort;
            repSock.bind(bindUri);
        } else {
            throw new RuntimeException("Error!");
        }
    }
    @Override
    public void run() {
        while (true) {
            try {
                byte[] recvBuf = repSock.recv();
                if (recvBuf == null) {
                    continue;
                }
                if (new String(recvBuf).equals("END")){
                    break;
                }
                dealWith(recvBuf,repSock);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //关闭
        repSock.close();
        context.term();
    }

    /**
     * 处理接收到数据的抽象方法
     */
    public abstract void dealWith(byte[] data,ZMQ.Socket socket);
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

2.REQ

import org.zeromq.ZMQ;

public class ZmqReqClient {
    /**
     * ZMQ启动线程数
     */
    private  int ZMQThreadCount = 1;

    /**
     * ZMQ数据广播端口
     */
    private int ZMQReqPort;
    private String ZMQReqIP;

    private  ZMQ.Context context = null;
    private  ZMQ.Socket pubSock = null;


    public ZmqReqClient(String ZMQReqIP, int ZMQReqPort) {
        this.ZMQReqIP = ZMQReqIP;
        this.ZMQReqPort = ZMQReqPort;
        if (pubSock == null) {
            initZMQ();
        }
    }
    /**
     * 初始化ZMQ对象
     */
    private  void initZMQ() {
        if (context == null) {
            context = ZMQ.context(ZMQThreadCount);
        }
        if (context == null) {
            context = ZMQ.context(ZMQThreadCount);
        }
        if (ZMQReqPort != 0) {
            pubSock = context.socket(ZMQ.REQ);
            String connectUri = "tcp://" + ZMQReqIP + ":" + ZMQReqPort;
            pubSock.connect(connectUri);
        } else {
            throw new RuntimeException("Error!");
        }
    }
    public void sendEND() {
        //发送结束信息
        pubSock.send("END".getBytes());
        //关闭
        pubSock.close();
        context.term();
    }

    public byte[] sendData(byte[] msg) {
        pubSock.send(msg);
        byte[] recv =  pubSock.recv();
        return recv;
    }
    public void send(byte[] msg) {


        pubSock.send(msg);
        byte[] recv = pubSock.recv();
        System.out.println(recv);
        //发送结束信息
        pubSock.send("END".getBytes());
        //关闭
        pubSock.close();
        context.term();
    }

}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

3.测试

REP

public class Rep {
    public static void main(String[] args) {
        ZmqRepThread zmqRepThread = new ZmqRepThread("*", 8888) {
            @Override
            public void dealWith(byte[] data, ZMQ.Socket socket) {
                System.out.println("Ressponse recv:\t" + new String(data));
                String response = "I got it:\t" + new String(data);
                socket.send(response.getBytes());
            }
        };
        Thread thread = new Thread(zmqRepThread);
        thread.start();
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

REQ

public class Req {
    public static void main(String[] args) throws InterruptedException {
        ZmqReqClient zmqReqClient = new ZmqReqClient("127.0.0.1",8888);
        for (int i = 0; i < 100; i++) {
            String request = "hello time is " + System.currentTimeMillis();
            byte[] recvData = zmqReqClient.sendData(request.getBytes());
            System.out.println(new String(recvData));
            Thread.sleep(1000);
        }
        zmqReqClient.sendEND();
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

测试结果

在这里插入图片描述

在这里插入图片描述

二、发布/订阅模式(ZMQ_PUB + ZMQ_SUB)

1.PUB

import org.zeromq.ZMQ;

public class ZmqPubClient {
    /**
     * ZMQ启动线程数
     */
    private int ZMQThreadCount = 1 ;

    /**
     * ZMQ数据广播端口
     */
    private int ZMQSendPort;
    private String ZMQPubIP;

    private ZMQ.Context context;
    private static ZMQ.Socket pubSock;

    public ZmqPubClient(String ZMQPubIP, int ZMQPubPort) {
        this.ZMQPubIP = ZMQPubIP;
        this.ZMQSendPort = ZMQPubPort;
        if (pubSock == null) {
            initZMQ();
        }
    }
    /**
     * 初始化ZMQ对象
     */
    private void initZMQ() {
        if (ZMQPubIP == null || "".equals(ZMQPubIP)) {
            throw new RuntimeException("IP Error!");
        }
        if (context == null) {
            context = ZMQ.context(ZMQThreadCount);
        }
        if (ZMQSendPort != 0) {
            pubSock = context.socket(ZMQ.PUB);
            String bindUri = "tcp://" +ZMQPubIP + ":" + ZMQSendPort;
            pubSock.bind(bindUri);
            pubSock.send("");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("Error!");
        }
    }


    public void sendData(byte[] msg) {
        pubSock.send(msg, ZMQ.NOBLOCK);
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

2. SUB

import org.zeromq.ZMQ;

/**
 * ZMQ接收线程
 */
public abstract class ZmqSubThread implements Runnable {

    /**
     * ZMQ启动线程数
     */
    private int ZMQThreadCount = Integer.parseInt("1");

    /**
     * ZMQ接收端口
     */
    private int ZMQRecvPort;

    /**
     * ZMQ监听接收ip
     */
    private String ZMQRecvIP;

    private ZMQ.Context context = null;
    private ZMQ.Socket subSock = null;

    public ZmqSubThread() {
        initZMQ();
    }

    public ZmqSubThread(String ZMQRecvIP, int ZMQRecvPort) {
        this.ZMQRecvIP = ZMQRecvIP;
        this.ZMQRecvPort = ZMQRecvPort;
        initZMQ();
    }

    /**
     * 初始化ZMQ对象
     */
    public void initZMQ() {
        if (ZMQRecvIP == null || "".equals(ZMQRecvIP)) {
            throw new RuntimeException("IP Error!");
        }
        if (ZMQRecvPort == 0) {
            throw new RuntimeException("Port Error!");
        }

        context = ZMQ.context(ZMQThreadCount);
        subSock = context.socket(ZMQ.SUB);
        String ConUri = "tcp://" + ZMQRecvIP + ":" + ZMQRecvPort;
        subSock.connect(ConUri);
        subSock.subscribe("".getBytes());
    }

    @Override
    public void run() {
        while (true) {
            try {
                byte[] recvBuf = subSock.recv(ZMQ.SUB);
                if (recvBuf == null) {
                    continue;
                }
                dealWith(recvBuf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理接收到数据的抽象方法
     */
    public abstract void dealWith(byte[] data);
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

3.测试

PUB

public class Pub {
    public static void main(String[] args) throws InterruptedException {
        ZmqPubClient zmqPubClient = new ZmqPubClient("127.0.0.1",7777);
        for (int i = 0; i < 1000;i++){
            String data = "data:\t" + System.currentTimeMillis() + "\t" + i;
            zmqPubClient.sendData(data.getBytes());
            System.out.println(data);
            Thread.sleep(1000);
        }
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

SUB

public class Sub {
    public static void main(String[] args) {
        ZmqSubThread zmqSubThread = new ZmqSubThread("127.0.0.1",7777) {
            @Override
            public void dealWith(byte[] data) {
                System.out.println(new String(data));
            }
        };
        Thread thread = new Thread(zmqSubThread);
        thread.start();
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

测试结果

在这里插入图片描述

在这里插入图片描述


 

标签:订阅,pubSock,Java,String,private,context,ZeroMQ,ZMQ,public
From: https://www.cnblogs.com/hewei-blogs/p/17545185.html

相关文章

  • java连接mqtt总是自动断开的问题排查及解决
    问题描述最近在做一个视频监控平台,要同步下级平台的摄像头信息数据,是通过其他同事写的c++服务往mqtt里推数据,我这边通过java连接mqtt监听主题获取摄像头信息。刚开始写完都还好,但是测试过一段时间,发现javaclient连接总是会自动断开,并且还会有丢失消息的情况。一开始怀疑是网络......
  • java Fegin x-www-form-urlencoded 类型请求
    spring发送content-type=application/x-www-form-urlencoded和普通请求不太一样。@FeignClient(name="ocr-api",url="${orc.idcard-url}",fallbackFactory=OcrClientFallbackFactory.class)publicinterfaceOcrClient{@PostMappi......
  • finshell 连接不到服务器,报Session.connect: java.net.SocketException: Connection r
    用finshell一段时间了,非常不错,但是有段时间突然连接不上服务器,各种重装,重启服务器都不行,在网上搜方法也没有好的办法。在我一次实在烦的不得了的时候,让我发现一个好的解决方案。先上图: 是不是出现这个问题,那么我的解决方案是啥呢?看我的手速,就是点击红色的闪电图标,一般连续点击......
  • Java中Queue的实现方式有哪些?
    一、队列的概念Queue用于模拟队列这种数据结构,队列通常是指“先进先出”(FIFO=firstinfirstout)的容器。新元素插入(offer)到队列的尾部,访问元素(poll)操作会返回队列头部的元素。通常,队列不允许随机访问队列中的元素。这种结构就相当于我们排队上车,先到的站在前面,先上车,后到的得......
  • JAVA static静态变量依赖spring实例化变量,可能导致初始化出错
    在Java中,静态变量是在类加载时初始化的,而实例变量是在对象实例化时初始化的。如果静态变量依赖于Spring实例化的变量,可能会导致初始化出错的问题。这是因为Spring的实例化过程是在运行时进行的,而类加载和静态变量初始化是在编译时进行的。当静态变量依赖于Spring实例化的变量时,如果......
  • java 阿里云直播配置及推拉流地址获取
    原文地址:https://blog.csdn.net/zhanglei5415/article/details/131551685?spm=1001.2014.3001.5501一、开通阿里云直播首先进入阿里云直播产品主页:https://www.aliyun.com/product/live。点击下方的“立即开通”。如果是还未注册的用户请按照页面提示进行完成注册并实名认证......
  • JAVA 和python 多网卡情况下获取正确的IP地址
    要获取内网地址,可以尝试连接到10.255.255.255:1。如果连接成功,获取本地套接字的地址信息就是当前的内网IP。python实现:importsocketdefextract_ip():st=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)try:st.connect(('10.255.255.255',1))......
  • java如何调用python.py文件并传参
    注意:java调用python.py文件并传参,在windows和linux中使用是不一样的我在windows操作系统中,java调用python文件并传参,是这样写的:完全没问题try{IntegertotalTestCaseCount=0;//传入python文件的参数:StringxmindFilePath,StringtestCaseKeyWo......
  • Java网络编程
    1.ip和端口ip地址InetAddress//因为没有构造方法,所以不能通过new来生成对象,但是可以通过类名来调用类的静态方法InetAddressinetAddress1=InetAddress.getByName("localhost");System.out.println(inetAddress1);InetAddressinetAddress2=I......
  • 520要通过这种方式告白 html+css+javascript canvas桃心代码 可修改 【附完整代码】
    ......