1.Hello_wordl!
代码实现
consumer
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/3 16:33
*/
public class Consumer_Hello {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setPort(5672);
factory.setVirtualHost("/");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
Consumer comsurmer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后自动执行该方法。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("Properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("hello_world",true,comsurmer);
}
}
producer
package cn.pickle.producer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @describe 发送消息
* @author Pickle
* @version V1.0
* @date 2022/10/3 15:08
*/
public class Producer_Hello {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数,
factory.setHost("localhost"); //IP地址
factory.setPort(5672); //端口,默认值5672
factory.setVirtualHost("/"); //设置虚拟机,默认值"/"
factory.setUsername("pickle");
factory.setPassword("pickle");
//创建连接
final Connection connection = factory.newConnection();
//创建channel
final Channel channel = connection.createChannel();
//创建消息队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
*/
channel.queueDeclare("hello_world", true, false, false, null);
//发送消息
String body = "hello rabbitmq~~~~";
channel.basicPublish("","hello_world",null,body.getBytes());
//释放资源
channel.close();
connection.close();
}
}
2、工作队列
producer
package cn.pickle.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/3 17:36
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setPort(5672);
factory.setHost("localhost");
factory.setVirtualHost("/");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//创建消息队列
channel.queueDeclare("work_queues",true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i + "hello rabbitmq~~~";
channel.basicPublish("","work_queues",null,body.getBytes());
}
}
}
consumer1
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/3 17:47
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setPort(5672);
factory.setVirtualHost("/");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
Consumer comsurmer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后自动执行该方法。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("Properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,comsurmer);
}
}
consumer2
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/3 17:47
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setPort(5672);
factory.setVirtualHost("/");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
Consumer comsurmer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后自动执行该方法。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("Properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,comsurmer);
}
}
先启动两个消费者,再启动生产者.效果如下
总结
- 消费者对于同一个消息是竞争关系
- 消费者轮流的从消息队列中读取消息
3、Pub/Sub工作模式
引入了交换机
Producer_PubSub
package cn.pickle.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 16:05
*/
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("pickle");
factory.setPassword("pickle");
factory.setVirtualHost("/");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//创建交换机
// String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
// exchange:交换机名称
// type:
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//发送消息
String body = "日志信息:张三调用了fanout方法.... 日志级别:info...";
channel.basicPublish(exchangeName,"",null,body.getBytes());
channel.close();
connection.close();
}
}
Consumer_PubSub1
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 16:25
*/
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setPort(5672);
factory.setUsername("pickle");
factory.setPassword("pickle");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息打印到控制台....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
Consumer_PubSub2
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 16:25
*/
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setPort(5672);
factory.setUsername("pickle");
factory.setPassword("pickle");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息保存数据库....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
结果展示
4、路由工作模式
Producer_Routing
package cn.pickle.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 16:45
*/
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("pickle");
factory.setPassword("pickle");
factory.setVirtualHost("/");
factory.setPort(5672);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String exchangeName="test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了routing方法... 日志等级:info...";
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
channel.close();
connection.close();
}
}
Consumer_Routing1
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 17:26
*/
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setVirtualHost("/");
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息打印到控制台....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
Consumer_Routing2
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 17:26
*/
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setVirtualHost("/");
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息保存到数据库....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
5、Topic模式
Producer_Topic
package cn.pickle.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 19:09
*/
public class Producer_Topic {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("pickle");
factory.setPassword("pickle");
factory.setVirtualHost("/");
factory.setPort(5672);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String exchangeName="test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "我TM是一条消息...";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
}
}
Consumer_Topic1
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 19:26
*/
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setVirtualHost("/");
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息保存到数据库....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
Consumer_Topic2
package cn.pickle.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Pickle
* @version V1.0
* @date 2022/10/4 19:26
*/
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setPassword("pickle");
factory.setUsername("pickle");
factory.setVirtualHost("/");
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
System.out.println("将日志信息打印到控制台....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
标签:String,factory,模式,工作,RabbitMQ,import,pickle,final,channel
From: https://www.cnblogs.com/poteitoutou/p/16750846.html