生产者:
1 package com.learn.rabbitmq.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class Producer { 8 public static void main(String[] args) { 9 // 1: 创建连接工厂 10 ConnectionFactory connectionFactory = new ConnectionFactory(); 11 //2: 设置连接属性 12 connectionFactory.setHost("127.0.0.1"); 13 connectionFactory.setPort(5672); 14 connectionFactory.setVirtualHost("/"); 15 connectionFactory.setUsername("guest"); 16 connectionFactory.setPassword("guest"); 17 Connection connection = null; 18 Channel channel = null; 19 try { 20 // 3: 从连接工厂中获取连接 21 connection = connectionFactory.newConnection("生产者"); 22 // 4: 从连接中获取通道channel 23 channel = connection.createChannel(); 24 // 5: 申明队列queue存储消息 25 /* 26 * 如果队列不存在,则会创建 27 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 28 * 29 * @params1: queue 队列的名称 30 * @params2: durable 队列是否持久化 false 非持久化,也会存盘,但是随着服务器重启,数据会丢失 31 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 32 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 33 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 34 * */ 35 channel.queueDeclare("queueTest",false,false,false,null); 36 // 6: 准备发送消息的内容 37 String msg = "hello world!"; 38 // 7: 发送消息给中间件rabbitmq-server 39 // @params1: 交换机exchange 40 // @params2: 队列名称/routing 41 // @params3: 属性配置 42 // @params4: 发送消息的内容 43 channel.basicPublish("","queueTest",null,msg.getBytes()); 44 System.out.println("消息发送成功!"); 45 } catch (Exception e) { 46 e.printStackTrace(); 47 System.out.println("发送消息出现异常!"); 48 }finally { 49 if (channel != null && channel.isOpen()){ 50 try { 51 channel.close(); 52 } catch (Exception e) { 53 e.printStackTrace(); 54 } 55 } 56 if (connection != null){ 57 try { 58 connection.close(); 59 }catch (Exception e){ 60 e.printStackTrace(); 61 } 62 } 63 } 64 } 65 }
消费者
1 package com.learn.rabbitmq.simple; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 7 public class Customer { 8 public static void main(String[] args) { 9 // 1: 创建连接工厂 10 ConnectionFactory connectionFactory = new ConnectionFactory(); 11 //2: 设置连接属性 12 connectionFactory.setHost("127.0.0.1"); 13 connectionFactory.setPort(5672); 14 connectionFactory.setVirtualHost("/"); 15 connectionFactory.setUsername("guest"); 16 connectionFactory.setPassword("guest"); 17 Connection connection = null; 18 Channel channel = null; 19 try { 20 // 3: 从连接工厂中获取连接 21 connection = connectionFactory.newConnection("消费者"); 22 // 4: 从连接中获取通道channel 23 channel = connection.createChannel(); 24 // 5: 创建交换机,声明队列,绑定关系,路由key,发视信息,接收信息 25 channel.basicConsume("queueTest", true, new DeliverCallback() { 26 public void handle(String consumerTag, Delivery delivery) throws IOException { 27 System.out.println("收到的信息是:" + new String(delivery.getBody(), "UTF-8")); 28 } 29 }, new CancelCallback() { 30 public void handle(String s) throws IOException { 31 System.out.println("接收失败了!"); 32 } 33 }); 34 System.out.println("开始接收信息!"); 35 //阻断作用,不往下继续执行 36 System.in.read(); 37 } catch (Exception e) { 38 e.printStackTrace(); 39 System.out.println("发送消息出现异常!"); 40 }finally { 41 if (channel != null && channel.isOpen()){ 42 try { 43 channel.close(); 44 } catch (Exception e) { 45 e.printStackTrace(); 46 } 47 } 48 if (connection != null){ 49 try { 50 connection.close(); 51 }catch (Exception e){ 52 e.printStackTrace(); 53 } 54 } 55 } 56 } 57 }
面试题
为什么rabbitmq是基于通道处理而不是连接呢?
因为连接需要结果三次握手,四次挥手,费时间,关闭,连接性能消耗大。所以通过长连接---------有很多个信道 channel 处理 所以性能高
可靠消费机制
手动应答,代码和NACK确定,可靠消费,ACK机制 AutoAck 为true 正常应答,服务端就会将消息队列的信息移除,整个过程正常,为false,不应答,就会不停重试,直到消费为止,如果出现异常,就会可能无限重试,服务器资源消耗殆尽。消息队列被挂起,就会屏蔽生产者的消息接收。
标签:connectionFactory,队列,中间件,connection,null,连接,channel From: https://www.cnblogs.com/hs-note/p/16606975.html