import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** 消息生产者 **/ public class Producer { // 定义队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.243.109.199"); factory.setUsername("admin"); factory.setPassword("123456"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 //创建连接对象 Connection connection = factory.newConnection(); //根据连接对象,获取信道 Channel channel = connection.createChannel(); /**设置消息队列的属性! * queue :队列名称 * durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...) * exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * autoDelete :队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间 * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /**发送消息,参数: * exchange :指定的交换机,不指定就会有默认的.... * routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称 * props :消息包含的属性: 后面介绍,可以是一个一个对象... 消息持久化配置... * body :发送的消息,AMQP以字节方式传输... * */ channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes()); System.out.println("消息发送完毕"); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** 消息消费者 **/ public class Consumer { // 定义队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.243.109.199"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息........."); //收到消息后用来处理消息的回调对象 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; /** * 消费者消费消息 - 接受消息 * queue 消费哪个队列 * autoAck 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答,要通过编程实现回复验证,这就是Unacked 为返回ack的数据 * deliverCallback 消费方法,当消费者接收到消息要执行的方法, 参数是一个函数式接口可以使用 lambda表达式~ * cancelCallback 消息被取消时的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
标签:String,队列,factory,rabbitmq,消息,import From: https://www.cnblogs.com/ZhangZiXue/p/16976963.html