先看一下项目结构:
首先创建 rabbitmq-publisher:
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.niezhiliang</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
yml文件:
spring:
  rabbitmq:
    addresses: 192.168.8.144:5672
    username: chaowei
    password: 123456
    #虚拟主机地址
    virtual-host: /
    #连接超时时间
    connection-timeout: 15000
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
#重新投递时间(分钟)
overtime: 1
package com.niezhiliang.springboot.rabbitmq.publisher.producer;
import com.niezhiliang.springboot.rabbitmq.common.domain.BrokerMessageLog;
import com.niezhiliang.springboot.rabbitmq.common.domain.BrokerMessageLogExample;
import com.niezhiliang.springboot.rabbitmq.common.domain.Order;
import com.niezhiliang.springboot.rabbitmq.common.mapper.BrokerMessageLogMapper;
import com.niezhiliang.springboot.rabbitmq.publisher.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
/**
 * Publishes {@link Order} messages to RabbitMQ and reacts to the broker's publisher confirm
 * by updating the matching {@link BrokerMessageLog} row.
 *
 * @Author NieZhiLiang
 * @Email [email protected]
 * @Date 2019/1/24 9:56 AM
 */
@Component
public class OrderSender {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * Handles the broker's confirm for a previously published message.
         *
         * @param correlationData unique identifier of the published message; it tells us which
         *                        message is being confirmed (or rejected). May be null when the
         *                        message was published without correlation data.
         * @param ack             whether the broker accepted the message
         * @param cause           failure reason when {@code ack} is false
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Without correlation data there is no message id to look up, so nothing can be updated.
            if (correlationData == null) {
                logger.error("收到缺少CorrelationData的确认回调,ack:{} 原因:{}", ack, cause);
                return;
            }
            String messageId = correlationData.getId();

            if (!ack) {
                // Delivery failed: the log row is left untouched here.
                // NOTE(review): presumably a separate job re-sends unconfirmed messages — confirm.
                logger.error("信息投递失败,messageId:{} 原因:{}", messageId, cause);
                return;
            }

            BrokerMessageLog brokerMessageLog = findLog(messageId);
            if (brokerMessageLog == null) {
                // Previously this case fell through and dereferenced null.
                logger.error("不存在messageId:{}的日志记录", messageId);
                return;
            }

            // Confirmed by the broker: mark the message as successfully sent.
            brokerMessageLog.setStatus(Constants.ORDER_SEND_SUCCESS);
            brokerMessageLog.setUpdate_time(new Date());
            brokerMessageLogMapper.updateByPrimaryKeySelective(brokerMessageLog);
            logger.info("信息投递成功,messageId:{}", brokerMessageLog.getMessage_id());
        }
    };

    /**
     * Publishes an order to the {@code order-exchange} exchange with routing key
     * {@code order.abcd}, tagging it with the order's message id so the confirm callback
     * can identify it.
     *
     * @param order the order to publish; its message id is used as the correlation id
     * @throws Exception declared for callers; propagates any failure raised while sending
     */
    public void send(Order order) throws Exception {
        // Register the confirm callback (the same instance is passed on every call).
        rabbitTemplate.setConfirmCallback(confirmCallback);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessage_id());
        rabbitTemplate.convertAndSend("order-exchange",
                "order.abcd",
                order,
                correlationData);
    }

    /** Returns the first log row whose message id equals {@code messageId}, or null if none. */
    private BrokerMessageLog findLog(String messageId) {
        BrokerMessageLogExample example = new BrokerMessageLogExample();
        example.createCriteria().andMessage_idEqualTo(messageId);
        List<BrokerMessageLog> logs = brokerMessageLogMapper.selectByExample(example);
        if (logs == null || logs.isEmpty()) {
            return null;
        }
        return logs.get(0);
    }
}
接下来,创建rabbitmq-consumer项目:
pom.xml里的jar依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后是 application.yml:
spring:
  rabbitmq:
    #基本配置
    addresses: 192.168.8.144:5672
    username: chaowei
    password: 123456
    virtual-host: /
    connection-timeout: 15000
    #消费端配置
    listener:
      simple:
        #消费端
        concurrency: 5
        #最大消费端数
        max-concurrency: 10
        #自动签收auto 手动 manual
        acknowledge-mode: manual
        #限流(海量数据,同时只能过来一条)
        prefetch: 1
server:
  port: 8000
package com.niezhiliang.springboot.rabbitmq.customer.consumer;
import com.niezhiliang.springboot.rabbitmq.common.domain.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Listener that consumes {@link Order} messages from {@code order-queue} and acknowledges
 * each delivery explicitly on the channel.
 *
 * @Author NieZhiLiang
 * @Email [email protected]
 * @Date 2019/1/24 10:29 AM
 */
@Component
public class OrderReceiver {

    /**
     * Handles one order message: prints the order id, then acks that single delivery.
     *
     * <p>The listener declares a durable queue {@code order-queue} bound to the durable topic
     * exchange {@code order-exchange} with binding key {@code order.*}.
     *
     * @param order   the deserialized message payload
     * @param headers the message headers; the delivery tag is read from
     *                {@link AmqpHeaders#DELIVERY_TAG}
     * @param channel the channel the message arrived on, used for the ack
     * @throws Exception if reading the order or acknowledging the delivery fails
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    // durable = "true": the queue and exchange are declared as durable
                    value = @Queue(value = "order-queue", durable = "true"),
                    exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
                    key = "order.*"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws Exception {
        System.out.println("----收到消息,开始消费-----");
        final String orderLine = "d订单id:" + order.getId();
        System.out.println(orderLine);

        final Object rawTag = headers.get(AmqpHeaders.DELIVERY_TAG);
        final long tag = (Long) rawTag;

        // multiple = false acknowledges only this delivery; true would also acknowledge
        // every delivery with a smaller tag than the one given.
        final boolean multiple = false;
        channel.basicAck(tag, multiple);

        System.out.println("--------消费完成--------");
    }
}