消息系统
MQ 全称Message Queue(消息队列)
消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分数据库如Redis、MySQL以及phxsql也可实现消息队列的功能
系统管理者MessageManager
包括Apache的 ActiveMQ,Apache的Kafka,RabbitMQ、memcacheQ
消息类型
点对点的消息 发布订阅的的消息
异步通信
AMQP : 即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,
专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息
STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议
CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品
CMQ(Cloud Message Queue)是基于腾讯自研消息引擎的分布式消息队列系统
消息队列 CKafka(Cloud Kafka)
阿里 RocketMQ
企业应用系统通信的
RabbitMQ
收消息和发消息而已
AMQP中增加了Exchange和Binging的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收
,而Binding决定交换器的消息应该发送到哪个队列
Producer Connection Channel
Broker Queue Exchange
Vhost
Consumer
路由类型 路由关系 Queue类型 Message Queue
Messaage
消息的 headers 和 body
延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息
组件说明
生产者: RoutingKey(路由键)
vhost 标识一批交换机、消息队列和相关对象 RabbitMQ默认的vhost是 /。
交换机 Exchange 3种类型的 Exchange :Direct、Fanout、Topic
Direct:消息中的 Routing Key
Fanout:叫广播
消息的路由是由Exchange类型 和 Binding 来决定的。
Binding 表示建立 Queue 和 Exchange 之间的绑定关系,每一个绑定关系会存在一个 BindingKey。
RabbitMQ支持三种路由键匹配规则:直接匹配、通配符匹配和正则表达式匹配。
看集群、节点、Vhost 和 Queue 四个维度
###Vhost
Exchange 数量:展示当前 Vhost 下的 Exchange 数量。
Queue 数量:展示当前 Vhost 下的 Queue 数量。
Channel 数量:展示当前 Vhost 下的 Channel 数量
User 数量:展示当前 Vhost 的用户数量
###Exchange
生产者将消息发送到 Exchange 中,Exchange 根据 消息的属性或内容 将消息路由到一个或多个 Queue 中
路由类型:选择路由类型,包括:Direct、Fanout、Topic 和 headers
Queue ****
多个 Consumer 可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个 Consumer 进行处理,而不是每个 Consumer 都收到所有的消息并处理
Binding—告诉Exchange消息应该存储在哪个Queue的规则
frame:协议头frame、方法frame、消息头frame、消息体(body) frame、心跳frame。
java代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tencent.tdmq.demo.cloud.Constant;
public class MessageProducer {
private static final String EXCHANGE_NAME = "exchange_name";
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址 (完整复制控制台接入点地址)
factory.setUri("amqp://***");
// 设置Virtual Hosts (开源 RabbitMQ 控制台复制完整Vhost名称)
factory.setVirtualHost(VHOST_NAME);
// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
factory.setUsername(USERNAME);
// 设置密码 (对应user的密钥)
factory.setPassword("****");
// 获取连接、建立通道
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 10; i++) {
String message = "this is rabbitmq message " + i;
// 发布消息到交换机,交换机自动将消息投递到相应队列
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [producer] Sent '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Java消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tencent.tdmq.demo.cloud.Constant;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class MessageConsumer1 {
public static final String QUEUE_NAME = "queue_name";
private static final String EXCHANGE_NAME = "exchange_name";
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址 (完整复制控制台接入点地址)
factory.setUri("amqp://***");
// 设置Virtual Hosts (开源 RabbitMQ 控制台中复制完整Vhost名称)
factory.setVirtualHost(VHOST_NAME);
// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
factory.setUsername(USERNAME);
// 设置密码 (对应user的密钥)
factory.setPassword("****");
// 获取连接
Connection connection = factory.newConnection();
// 建立通道
Channel channel = connection.createChannel();
// 绑定消息交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明队列信息
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [Consumer1] Waiting for messages.");
// 订阅消息
channel.basicConsume(QUEUE_NAME, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
RabbitMQ 原生控制台
添加路由策略
配置路由接入规则
rabbitpy
旨在提供一个简单易用的api来与 RabbitMQ,最大限度地减少了其他库中经常出现的编程开销
rabbitpy: RabbitMQ Simplified
There are two basic ways to interact with rabbitpy, using the simple wrapper methods:
1. Simple API Methods
rabbitpy.publish()
rabbitpy.consume()
2. by using the core objects:
AMQP Adapter : amqp = rabbitpy.AMQP(channel)
Transactions
kafka-python
参考
https://github.com/gmr/rabbitpy
标签:Java,NAME,Exchange,队列,RabbitMQ,Queue,消息,import
From: https://www.cnblogs.com/ytwang/p/17806245.html