简单案例:消息生产与消费
pom.xml 配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<!-- 3.6.5 is a stable release -->
<version>3.6.5</version>
</dependency>
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Simple example: producing and consuming messages</h1>
 * Message producer. Publishes ten copies of {@code "Hello!"} to the default
 * ({@code ""}) exchange with routing key {@code "test001"} on the broker at
 * {@code localhost:5672}, virtual host {@code "/"}.
 * Created by DHA on 2019/11/18.
 */
public class Producer {

    /**
     * Connects to the broker, publishes the messages and releases the connection.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Publish through the channel. The payload is encoded once, with an
            //    explicit charset, so the bytes do not depend on the platform default.
            byte[] body = "Hello!".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", "test001", null, body);
            }

            // 5. Close the channel on the normal path.
            channel.close();
        } finally {
            // Always release the connection, even when publishing failed; otherwise
            // the client's connection threads can keep the JVM alive.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Simple example: producing and consuming messages</h1>
 * Message producer. Sends ten {@code "Hello!"} messages with routing key
 * {@code "test001"} via the default ({@code ""}) exchange.
 * <p>
 * NOTE(review): this listing appears under the "consumer" heading of the article,
 * but the code is a producer identical to the one in the preceding section. The
 * consumer listing for the simple example seems to be missing - confirm against
 * the original article.
 * Created by DHA on 2019/11/18.
 */
public class Producer {

    /**
     * Publishes the messages and then shuts the connection down.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Publish through the channel, encoding the text explicitly as UTF-8
            //    instead of relying on the platform default charset.
            String data = "Hello!";
            byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int sent = 0; sent < 10; sent++) {
                channel.basicPublish("", "test001", null, payload);
            }

            // 5. Close the channel on the normal path.
            channel.close();
        } finally {
            // Release the connection even if a publish failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
Direct Exchange
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Direct Exchange</h1>
 * A message sent to a direct exchange is forwarded to the queue bound with the
 * message's routing key.
 * Message producer: publishes one message to {@code test_direct_exchange} with
 * routing key {@code test.direct}.
 * Created by DHA on 2019/11/19.
 */
public class Producer4DirectExchange {

    /**
     * Connects to the broker on {@code localhost:5672} and publishes one message.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Names used for publishing. The exchange itself is not declared
            //    here; this producer only publishes to it.
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";

            // 5. Publish through the channel, encoding the text explicitly as UTF-8
            //    so the bytes do not depend on the platform default charset.
            String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
            channel.basicPublish(exchangeName, routingKey, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // 6. Close the channel on the normal path.
            channel.close();
        } finally {
            // Release the connection even if the publish failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Direct Exchange</h1>
 * A message sent to a direct exchange is forwarded to the queue bound with the
 * message's routing key.
 * Message consumer: declares {@code test_direct_exchange} and
 * {@code test_direct_queue}, binds them with {@code test.direct}, and prints
 * every message it receives.
 * Created by DHA on 2019/11/19.
 */
public class Consumer4DirectExchange {

    /**
     * Sets up the exchange, queue and binding, then consumes in an endless loop.
     *
     * @param args unused
     * @throws IOException          if connecting, declaring or consuming fails
     * @throws TimeoutException     if a connection or close operation times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Declarations.
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
            // Declare the exchange.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange.
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Start consuming; the second argument enables automatic acknowledgement.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 7. Receive messages until an exception ends the loop.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消费端:" + msg);
            }
        } finally {
            // The loop only exits via an exception; release the connection in that case.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
Topic Exchange
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Topic Exchange</h1>
 * A topic exchange pattern-matches the routing key against the topic a queue was
 * bound with.
 * Message producer: publishes the same message to {@code test_topic_exchange}
 * under three routing keys ({@code user.save}, {@code user.update},
 * {@code user.delete.abc}).
 * Created by DHA on 2019/11/19.
 */
public class Producer4TopicExchange {

    /**
     * Connects to the broker on {@code localhost:5672} and publishes three messages.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Names used for publishing.
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";

            // 5. Publish through the channel. The text is encoded once, explicitly as
            //    UTF-8, so the bytes do not depend on the platform default charset.
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message ... ";
            byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(exchangeName, routingKey1, null, body);
            channel.basicPublish(exchangeName, routingKey2, null, body);
            channel.basicPublish(exchangeName, routingKey3, null, body);

            // 6. Close the channel on the normal path.
            channel.close();
        } finally {
            // Release the connection even if a publish failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Topic Exchange</h1>
 * A topic exchange pattern-matches the routing key against the topic a queue was
 * bound with.
 * Message consumer: declares {@code test_topic_exchange} and
 * {@code test_topic_queue}, binds them with the pattern {@code user.*}, and
 * prints every message it receives.
 * Created by DHA on 2019/11/19.
 */
public class Consumer4TopicExchange {

    /**
     * Sets up the exchange, queue and binding, then consumes in an endless loop.
     *
     * @param args unused
     * @throws IOException          if connecting, declaring or consuming fails
     * @throws TimeoutException     if a connection or close operation times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Declarations.
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            // In a topic binding "*" stands for exactly one word, so keys such as
            // "user.save" match while a three-word key like "user.delete.abc" does not.
            String routingKey = "user.*";
            // Declare the exchange.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange.
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Start consuming; the second argument enables automatic acknowledgement.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 7. Receive messages until an exception ends the loop.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消费端:" + msg);
            }
        } finally {
            // The loop only exits via an exception; release the connection in that case.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
Fanout Exchange
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Fanout Exchange</h1>
 * A fanout exchange ignores the routing key: queues are simply bound to the
 * exchange, and every message sent to it is forwarded to all bound queues.
 * Message producer: publishes ten messages to {@code test_fanout_exchange} with
 * an empty routing key.
 * Created by DHA on 2019/11/19.
 */
public class Producer4FanoutExchange {

    /**
     * Connects to the broker on {@code localhost:5672} and publishes ten messages.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Name of the exchange to publish to.
            String exchangeName = "test_fanout_exchange";

            // 5. Publish through the channel. The text is identical for every
            //    message, so it is encoded once, explicitly as UTF-8.
            String msg = "Hello World RabbitMQ 4 Fanout Exchange Message ...";
            byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 10; i++) {
                channel.basicPublish(exchangeName, "", null, body);
            }

            // 6. Close the channel on the normal path.
            channel.close();
        } finally {
            // Release the connection even if a publish failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Fanout Exchange</h1>
 * A fanout exchange ignores the routing key: queues are simply bound to the
 * exchange, and every message sent to it is forwarded to all bound queues.
 * Message consumer: declares {@code test_fanout_exchange} and
 * {@code test_fanout_queue}, binds them with an empty routing key, and prints
 * every message it receives.
 * Created by DHA on 2019/11/19.
 */
public class Consumer4FanoutExchange {

    /**
     * Sets up the exchange, queue and binding, then consumes in an endless loop.
     *
     * @param args unused
     * @throws IOException          if connecting, declaring or consuming fails
     * @throws TimeoutException     if a connection or close operation times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Declarations.
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";
            // Declare the exchange.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange.
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Start consuming; the second argument enables automatic acknowledgement.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 7. Receive messages until an exception ends the loop.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消费端:" + msg);
            }
        } finally {
            // The loop only exits via an exception; release the connection in that case.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
设置消息属性
生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Setting message properties</h1>
 * Message producer: publishes five {@code "Hello!"} messages with routing key
 * {@code "test001"} via the default ({@code ""}) exchange, each carrying a
 * delivery mode, content encoding, expiration and two custom headers.
 * Created by DHA on 2019/11/18.
 */
public class Producer {

    /**
     * Builds the message properties, publishes the messages and releases the connection.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or close operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // Custom headers attached to every message.
            Map<String, Object> headers = new HashMap<>();
            headers.put("attr1", "111");
            headers.put("attr2", "222");

            // 4. Message properties.
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)            // 2 = persistent delivery
                    .contentEncoding("UTF-8")   // declared content encoding
                    .expiration("10000")        // expires after 10 seconds
                    .headers(headers)           // custom headers
                    .build();

            // 5. Publish through the channel. The body is encoded as UTF-8 so it
            //    really matches the content encoding declared in the properties,
            //    instead of depending on the platform default charset.
            byte[] body = "Hello!".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 5; i++) {
                channel.basicPublish("", "test001", properties, body);
            }

            // 6. Close the channel on the normal path.
            channel.close();
        } finally {
            // Release the connection even if a publish failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * <h1>Setting message properties</h1>
 * Message consumer: declares the durable queue {@code test001}, prints each
 * message body to standard output and the value of its {@code attr1} header to
 * standard error.
 * Created by DHA on 2019/11/18.
 */
public class Consumer {

    /**
     * Declares the queue, then consumes in an endless loop.
     *
     * @param args unused
     * @throws IOException          if connecting, declaring or consuming fails
     * @throws TimeoutException     if a connection or close operation times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Declare the queue (second argument: durable).
            String queueName = "test001";
            channel.queueDeclare(queueName, true, false, false, null);

            // 5. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Start consuming; the second argument enables automatic acknowledgement.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 7. Receive messages until an exception ends the loop.
            while (true) {
                Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消费端:" + msg);

                // Read the custom header. Messages published without headers (for
                // example with null properties) yield a null map, which must not
                // crash the consumer loop.
                Map<String, Object> headers = delivery.getProperties().getHeaders();
                Object attr1 = (headers == null) ? null : headers.get("attr1");
                System.err.println("headers get attribute attr1 value: " + attr1);
            }
        } finally {
            // The loop only exits via an exception; release the connection in that case.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
参考:
标签:connectionFactory,入门,rabbitmq,MQ,Rabbit,import,com,channel,String
来源:https://www.cnblogs.com/i9code/p/17998177