首页 > 其他分享 >ZMQ消息队列 PUSH/PULL PUB/SUB REQ/REP

ZMQ消息队列 PUSH/PULL PUB/SUB REQ/REP

时间:2024-10-14 17:33:11浏览次数:1  
标签:PULL SUB REP context org import zeromq ZMQ String

1.REQ/REP 客户端(Client)/ 服务器(Server)

import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * @Description: (服务器)
 * @CreateTime: 2024/3/20 18:22
 */
public class Server {
    public static void main(String[] args) throws InterruptedException {

        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket responder = context.socket(ZMQ.REP);

        //使服务器端通过tcp协议通信,监听5555端口
        responder.bind("tcp://*:5555");
        while (!Thread.currentThread().isInterrupted()) {
            byte[] request = responder.recv(0);
            System.out.println("Received Hello");
            Thread.sleep(1000);
            String reply = "World";
            responder.send(reply.getBytes(), 0);
        }

        //关闭服务器端的上下文及套接字
        responder.close();
        context.close();
    }
}
服务端代码
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * @Description: (客户端)
 * @CreateTime: 2024/3/20 18:23
 */
public class Client {
    public static void main(String[] args) {
        //创立客户端的上下文捷套接字
        Context context = ZMQ.context(1);
        System.out.println("Connecting to hello world server…");
        Socket requester = context.socket(ZMQ.REQ);

        //讲客户端绑定在5555端口
        requester.connect("tcp://localhost:5555");
        for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
            String request = "Hello";
            System.out.println("Sending Hello " + requestNbr);
            requester.send(request.getBytes(), 0);
            byte[] reply = requester.recv(0);
            System.out.println("Received " + new String(reply) + " " + requestNbr);
        }
        //关闭客户端的上下文套接字
        requester.close();
        context.term();
    }
}
客户端代码

2.PUSH/PULL 模式: 生产者(Producer)/消费者(Consumer)

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

import java.util.concurrent.atomic.AtomicInteger;

import static org.zeromq.ZMQ.context;

/**
 * @Description: 拉取消息 (服务器)
 * @CreateTime: 2024/3/20 18:29
 */
public class Pull {
    public static void main(String args[]) {
        final AtomicInteger number = new AtomicInteger(0);
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable(){
                private int here = 0;
                public void run() {
                    // TODO Auto-generated method stub
                    Context context = context(1);
                    Socket pull = context.socket(ZMQ.PULL);
                    pull.connect("ipc://fjs");
                    //pull.connect("ipc://fjs");
                    while (true) {
                        String message = new String(pull.recv());
                        int now = number.incrementAndGet();
                        here++;
                        if (now % 1000000 == 0) {
                            System.out.println(now + "  here is : " + here);
                        }
                    }
                }
            }).start();
        }
    }
}
消费者
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * @Description: PUSH 推送消息 (客户端)
 * @CreateTime: 2024/3/20 18:29
 */
public class Push {
    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket push = context.socket(ZMQ.PUSH);
        push.bind("ipc://fjs");
        for (int i = 0; i < 10000000; i++) {
            push.send("hello".getBytes(), i);
        }
        push.close();
        context.term();
    }
}
生产者

3.PUB/SUB 模式: 发布者(Publisher)/订阅者(Subscriber)

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * @Description: 发布者(Publisher) (服务器)
 * @CreateTime: 2024/3/20 18:25
 */
public class ZMQ_PUB {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 ZeroMQ 上下文
        Context context = ZMQ.context(1);
        // 创建一个发布者套接字
        Socket publisher = context.socket(zmq.ZMQ.ZMQ_PUB);
        publisher.bind("tcp://*:5555");
        Thread.sleep(3000);

//        normalSend(publisher);
        topicSend(publisher);

        // 关闭资源
        context.close();
        publisher.close();
    }

    //普通发送
    public static void normalSend(Socket publisher) throws InterruptedException {
        // 发送消息
        for (int i = 0; i < 100; i++) {
            publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK);
            System.out.println("pub msg " + i);
            Thread.sleep(1000);
        }
    }

    //带主题的发送
    public static void topicSend(Socket publisher) throws InterruptedException {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String topic = "NEWS";
            if (i % 2 == 0) {
                topic = "WEATHER";
            }
            String message = topic + " Message " + i;

            //此处发送了主题字段和消息内容,注意都的 topic 主题设置的开头的
            publisher.send(topic.getBytes(), 0);
            publisher.send(message.getBytes(), 0);

            System.out.println("Sent: [" + message + "]");
            Thread.sleep(1000); // 每秒发送一条消息
        }
    }
}
发布者
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * @Description: 订阅者(Subscriber) (客户端)
 * @CreateTime: 2024/3/20 18:26
 */

public class ZMQ_SUB {
    public static void main(String[] args) {
        // 创建一个 ZeroMQ 上下文
        Context context = ZMQ.context(1);
        // 创建一个订阅者套接字
        Socket subscriber = context.socket(zmq.ZMQ.ZMQ_SUB);
        subscriber.connect("tcp://localhost:5555");
//        normalRecv(context, subscriber);
        topicRecv(subscriber);
    }

    //接受普通信息,设置主日为 ""
    public static void normalRecv(Context context, Socket subscriber) {
        // 订阅所有主题(空字符串表示订阅所有消息)
        subscriber.subscribe("".getBytes());

        // 接收并打印消息
        for (int i = 0; i < 100; i++) {
            //Receive a message.
            String string = new String(subscriber.recv(0));
            System.out.println("recv 1" + string);
        }
        //关闭套接字和上下文
        subscriber.close();
        context.term();
    }

    public static void topicRecv(Socket subscriber) {
        // 订阅特定主题("NEWS")
        // 所谓主题,就是字符串的开始字段。 只接受 NEWS 开始的字段
        subscriber.subscribe("NEWS".getBytes());

        // 接收并打印消息
        while (true) {
            byte[] topic = subscriber.recv(0);
            byte[] message = subscriber.recv(0);
            System.out.println("Received: [" + new String(topic) + "][" + new String(message) + "]");

        }
    }

}
订阅者
 
        <!-- https://mvnrepository.com/artifact/org.zeromq/jeromq -->
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.3</version>
        </dependency>

 

总结
PUSH/PULL 模式: 生产者(Producer)/消费者(Consumer)
•用途:生产者-消费者模型。
•特点:单向通信、无序消息、可靠性。
•应用场景:任务分发、异步通信。
PUB/SUB 模式: 发布者(Publisher)/订阅者(Subscriber)
•用途:发布者-订阅者模型。
•特点:多播通信、主题过滤、广播模式、无序消息、可靠性。
•应用场景:实时消息更新、数据分发。
REQ/REP 模式: 客户端(Client)/ 服务器(Server)
•用途:请求-响应模型。
•特点:双向通信、有序消息、可靠通信、阻塞模式。
•应用场景:远程过程调用(RPC)、命令-响应。
通过这三种模式,ZeroMQ 提供了灵活的消息传递方式,可以根据实际需求选择合适的模式来构建应用程序。

标签:PULL,SUB,REP,context,org,import,zeromq,ZMQ,String
From: https://www.cnblogs.com/ruber/p/18464658

相关文章

  • UE-GAS CreateDefaultSubobject<UAbilitySystemComponent>返回为nullptr
    前因原先在项目中用了默认的ASC(UAbilitySystemComponent),后面因为新的需求导致需要改为自己的ASC。结果改了之后,发现在Pre-Init(构造函数)时,CreateDefaultSubobject的返回值是nullptr。过程于是去论坛求助:论坛的人的回答是,你修改了ASC的类,导致前一个ASC反序列化到当前ASC上,从而......
  • P9021 [USACO23JAN] Subtree Activation P
    P9021[USACO23JAN]SubtreeActivationP这种看上去就很不常规的东西不用想着怎么构造最佳方案,这条路一定是行不通的,考虑转化题意。考虑变化的实质只有两种:全\(0\)状态和\(x\)子树全满的状态转化;\(x\)子树全满和\(y\)子树全满的状态转化,其中\(x,y\)有边。这样的状态转......
  • Some bytes have been replaced with the Unicode substitution character while load
    需要修改一较旧的网页代码,当打开时,却出现异常提示: SomebyteshavebeenreplacedwiththeUnicodesubstitutioncharacterwhileloadingfile【文档路径】withUnicode(UTF-8)encoding.Savingthefilewillnotpreservetheoriginalfilecontents.点“OK”,文档是......
  • ReplitLM: 开源代码生成模型的新突破
    ReplitLMReplitLM模型简介ReplitLM是由在线编程平台Replit公司开发的一系列开源大型语言模型(LLM),专门用于代码生成和自然语言处理任务。这些模型在大规模代码数据集上进行训练,能够理解和生成多种编程语言的代码,为开发人员提供强大的AI辅助编程工具。目前,ReplitLM模型系列......
  • 【解决方案】Sublime Text 4 按下 Esc 键后无法输入任何内容
    在最后编辑博客内容时,我的Sublime版本为4180。我基本用SublimeText4替代了系统自带的Notepad,我用它编辑任何东西(除了代码,手动狗头)。开始我怀疑是PackageControl安装了过多依赖导致的兼容性问题,但由于Sublime多次更新,我的PackageControl再次从命令面板消失,而它......
  • Non-terminating decimal expansion; no exact representable decimal result.
    使用Bigdecimal做除法publicstaticvoidmain(String[]args){//1、不设置保留位数System.out.println(BigDecimal.valueOf(2).divide(BigDecimal.valueOf(3)));//2、divide以后再设置保留位数System.out.println(BigDecimal.valueOf(......
  • stiReport动态更新数据源
    总结为了确保动态数据源的更新过程顺利进行,并避免之前的默认数据源导致的冲突或冗余,以下是推荐的步骤:清除数据源:使用report.Dictionary.DataSources.Clear()清除所有旧数据源。添加新数据源:通过report.RegData()方法添加新的数据源。同步字典:使用report.Dictionary.Sy......
  • RepVGGBlock+GSConv+Bifpn+c2fca(模块融合)改进yolo5
    参考论文:ExploringtheClose-RangeDetectionofUAV-BasedImagesonPineWiltDiseasebyanImprovedDeepLearningMethod 首先我们来介绍一下该文章使用的改进模块接着再来实现它 我们查看论文提出的repvggblock1和repvggblock2有什么区别: 可以看到只是一个bn层......
  • WSL(Windows Subsystem for Linux)——简单的双系统开发
    文章目录WSLWSL的作用WSL的使用WSL的安装挂载磁盘的作用安装linux发行版wsl下载mysql,mongodb,redisWSL前言:本人由于在开发中需要linux环境,同时还想要直接在Windows下开发,来提升开发效率,随即简单学习WSL。WSL(WindowsSubsystemforLinux)是微软开发的一项技术,允许用......
  • Sublime
    激活快捷键Alt+F3查找内容全部选择,可以统一替换或手动修改......