首页 > 编程语言 >Java_消息队列

Java_消息队列

时间:2023-11-02 20:34:07浏览次数:35  
标签:Java NAME Exchange 队列 RabbitMQ Queue 消息 import

消息系统

MQ 全称Message Queue(消息队列)
消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分数据库如Redis、MySQL以及phxsql也可实现消息队列的功能
  系统管理者MessageManager
    包括Apache的 ActiveMQ,Apache的Kafka,RabbitMQ、memcacheQ
  消息类型
      点对点的消息  发布订阅的的消息
异步通信
AMQP : 即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,
             专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议
 
CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品
    CMQ(Cloud Message Queue)是基于腾讯自研消息引擎的分布式消息队列系统
   消息队列 CKafka(Cloud Kafka)
   阿里 RocketMQ
企业应用系统通信的

RabbitMQ

 收消息和发消息而已
 AMQP中增加了Exchange和Binging的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收
        ,而Binding决定交换器的消息应该发送到哪个队列
    Producer   Connection  Channel 
       Broker Queue Exchange
    Vhost  
 Consumer
 路由类型  路由关系  Queue类型 Message Queue
Messaage   
   消息的 headers 和 body
   延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息
组件说明
  生产者: RoutingKey(路由键)
    vhost 标识一批交换机、消息队列和相关对象 RabbitMQ默认的vhost是 /。
	 交换机 Exchange 3种类型的 Exchange :Direct、Fanout、Topic
	     Direct:消息中的 Routing Key 
		 Fanout:叫广播
	消息的路由是由Exchange类型 和 Binding 来决定的。
	    Binding 表示建立 Queue 和 Exchange 之间的绑定关系,每一个绑定关系会存在一个 BindingKey。
 RabbitMQ支持三种路由键匹配规则:直接匹配、通配符匹配和正则表达式匹配。 

看集群、节点、Vhost 和 Queue 四个维度

###Vhost 
    Exchange 数量:展示当前 Vhost 下的 Exchange 数量。
    Queue 数量:展示当前 Vhost 下的 Queue 数量。
    Channel 数量:展示当前 Vhost 下的 Channel 数量
    User 数量:展示当前 Vhost 的用户数量
###Exchange 
   生产者将消息发送到 Exchange 中,Exchange 根据 消息的属性或内容 将消息路由到一个或多个 Queue 中
    路由类型:选择路由类型,包括:Direct、Fanout、Topic 和 headers
Queue ****
    多个 Consumer 可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个 Consumer 进行处理,而不是每个 Consumer 都收到所有的消息并处理	
 Binding—告诉Exchange消息应该存储在哪个Queue的规则
 
 frame:协议头frame、方法frame、消息头frame、消息体(body) frame、心跳frame。

java代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tencent.tdmq.demo.cloud.Constant;

   public class MessageProducer {

  private static final String EXCHANGE_NAME = "exchange_name";
  public static void main(String[] args) throws Exception {
       // 连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 设置服务地址 (完整复制控制台接入点地址)
       factory.setUri("amqp://***");
       // 设置Virtual Hosts (开源 RabbitMQ 控制台复制完整Vhost名称)
       factory.setVirtualHost(VHOST_NAME);
       // 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
       factory.setUsername(USERNAME);
       // 设置密码 (对应user的密钥)
       factory.setPassword("****");
       // 获取连接、建立通道
       try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
           // 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
           channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
           for (int i = 0; i < 10; i++) {
               String message = "this is rabbitmq message " + i;
               // 发布消息到交换机,交换机自动将消息投递到相应队列
               channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
               System.out.println(" [producer] Sent '" + message + "'");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
   }

Java消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tencent.tdmq.demo.cloud.Constant;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

   public class MessageConsumer1 {
      public static final String QUEUE_NAME = "queue_name";
      private static final String EXCHANGE_NAME = "exchange_name";
   public static void main(String[] args) throws Exception {
       // 连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 设置服务地址 (完整复制控制台接入点地址)
       factory.setUri("amqp://***");
       // 设置Virtual Hosts (开源 RabbitMQ 控制台中复制完整Vhost名称)
       factory.setVirtualHost(VHOST_NAME);
       // 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
       factory.setUsername(USERNAME);
       // 设置密码 (对应user的密钥)
       factory.setPassword("****");
       // 获取连接
       Connection connection = factory.newConnection();
       // 建立通道
       Channel channel = connection.createChannel();
       // 绑定消息交换机
       channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
       // 声明队列信息
       channel.queueDeclare(QUEUE_NAME, true, false, false, null);
       // 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
       System.out.println(" [Consumer1] Waiting for messages.");
       // 订阅消息
       channel.basicConsume(QUEUE_NAME, false, "ConsumerTag", new DefaultConsumer(channel) {
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope,
                                      AMQP.BasicProperties properties, byte[] body)
                   throws IOException {
               //接收到的消息,进行业务逻辑处理。
               System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
               channel.basicAck(envelope.getDeliveryTag(), false);
           }
       });
   }
   }

RabbitMQ 原生控制台

  添加路由策略
     配置路由接入规则

rabbitpy

旨在提供一个简单易用的api来与 RabbitMQ,最大限度地减少了其他库中经常出现的编程开销
 rabbitpy: RabbitMQ Simplified


There are two basic ways to interact with rabbitpy, using the simple wrapper methods:

1. Simple API Methods
     rabbitpy.publish()
     rabbitpy.consume()		 
2. by using the core objects:
   AMQP Adapter :  amqp = rabbitpy.AMQP(channel)
   Transactions

kafka-python

参考

https://github.com/gmr/rabbitpy

标签:Java,NAME,Exchange,队列,RabbitMQ,Queue,消息,import
From: https://www.cnblogs.com/ytwang/p/17806245.html

相关文章

  • 前端基础之JavaScript
    前端基础之JavaScriptJavaScript概述ECMAScript和JavaScript的关系1996年11月,JavaScript的创造者--Netscape公司,决定将JavaScript提交给国际标准化组织ECMA,希望这门语言能够成为国际标准。次年,ECMA发布262号标准文件(ECMA-262)的第一版,规定了浏览器脚本语言的标准,并将这种语言称......
  • JavaScript基础
    引入方式JavaScript程序不能独立运行,它需要被嵌入HTML中,然后浏览器才能执行JavaScript代码。内部引入写在body结束标签的上方。<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content=&......
  • 迭代加深,双向搜索,IDA*,A*,双端队列BFS
    迭代加深://迭代加深搜索//给搜索设定一个范围,如果在这个范围内没有答案那么再加大搜索范围//这么做是为了防止搜索过深,导致利用大量时间搜索无用信息//如果当前搜索是第10位,搜索的是个二叉树,那么前9个就是2^0+2^1+2^2+..+2^9=2^10-1,所以时间复杂度并没增大太多//htt......
  • JavaScript知识点
    new操作符1、创建一个空对象,并且this变量引用该对象,同时还继承了该函数的原型2、属性和方法被加入到this引用的对象中3、新创建的对象由this所引用,并且最后隐式的返回thisAjax原理1、Ajax的原理简单来说是在用户和服务器之间加了一个中间层(AJAX引擎),通过XmiHttpRequest对象来向服......
  • Java面试题:链表-反转链表
    问题描述给定一个单链表的头结点pHead(该头节点是有值的,比如在下图,它的val是1),长度为n,反转该链表后,返回新链表的表头。如当输入链表{1,2,3}时,经反转后,原链表变为{3,2,1},所以对应的输出为{3,2,1}。示例输入:{1,2,3}返回值:{3,2,1}原题地址:https://www.nowcoder.com/practice/7......
  • JavaScript 将大数组拆分成多个小数组 循环调用接口
    项目需求:数据列表批量选择提交购物车,一次性提交数据量过大接口会报错,传递的参数是选中数据id的数组。项目运行很久了不做大改动,将提交数据总数限制在2000条以内,每500条走一次接口。思路:1.写一个将大数组拆分多个小数组的方法,arr为大数组,len为要拆分的小数组长度arrGroup(arr,......
  • java开发环境
    java开发环境编译运行过程(1)编译期将.java源文件交给编译器编译成.class字节码文件的过程(2)运行期把编译后的.class字节码文件经过JVM加载并运行.class字节码文件配置环境变量windows10系统下配置JDK环境变量:(1).安装JDK,安装过程中可以自定义安装目录等信息(2)安装完成后,右击......
  • JAVA多线程之线程间的通信方式
    一,介绍本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。 二,线程间的通信方式①同步这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信。参考示例:publicclassMyObj......
  • Java面试题2
    Java面试题(第二天)1.重载和重写的区别重载:发生在同一个类中,方法名必须相同,参数类型不同,个数不同,顺序不同,方法返回值和访问修饰符可以不同,发生在编译时期重写:发生在父子类中,方法名、参数列表必须相同,返回值范围小于等于父类,抛出的异常范围小于等于父类,访问修饰符范围大于等于父......
  • Java面试题3
    Java面试题(第三天)1.HashMap和HashTable的区别?a.区别多线程环境下,HashTable比HashMap更安全,因为HashTable都加了一个synchronized修饰HashMap允许key和value为null,而HashTable不允许b.HashMap底层实现数组+链表jdk8开始链表高度到8,数组长度超过64,链表转变为红黑树,元素以......