二 利用RabbitMQ基本用法,开发项目
2.1 RabbitMQ消息交换的关键是什么?
1.AMQP协议架构
AMQP协议直接定义了RabbitMQ的内部结构和外部行为
我们使用RabbitMQ本质上是在使用AMQP协议
AMQP协议被多种消息中间件使用,可以举一反三
2.消息流转流程
发送者不能直接将消息发送给最终队列,必须发送给交换机
消息根据路由规则,消息由交换机转发给队列
消费者从队列将消息取走
3.合理的交换机和队列设置
交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
合理配置交换机类型,使用Topic模式时仔细设置绑定键
4.尽量使用自动化配置
将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错
一般来说,交换机由双方同时声明,队列由接收方声明并配置绑定关系
交换机/队列的参数一定要由双方开发团队确认,否则重复声明时,若参数不一致,会导致声明失败
2.1.1 问题:
为什么AMQP要设计Exchange消息流转机制?
1、模拟交换机处理的机制,通信输入根据绑定规则找到相应的队列发送给消费者,信息传递可靠性高
2、流转机制:同一个交换机可以绑定多个队列,分布式的处理,使性能提高
3、灵活多变的绑定机制可以适用于多种情况的应用
2.2 需求分析与架构设计
2.2.1 需求分析
一个外卖后端系统,用户可以在线下单
外卖用户下单后,可以实时查询订单进度
系统可以承受短时间的大量并发请求
2.2.2 架构设计
使用微服务系统,组件之间充分解耦
使用消息中间件,解耦业务逻辑
使用数据库,持久化业务数据
2.2.3 什么是微服务架构
将应用程序构建为松耦合、可独立部署的一组服务
服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能
松耦合:封装服务的实现细节,通过API调用
2.2.4 如何拆分微服务
根据系统操作进行微服务拆分
根据业务能力进行微服务拆分(推荐使用)
根据子域进行微服务拆分
2.2.5 根据业务能力进行微服务拆分
2.2.6 接口需求
新建订单
接口查询
订单接口接口采用REST风格
2.3 数据库设计与项目搭建
2.3.1 微服务的数据库设计原则
每个微服务使用自己的数据库
不要使用共享数据库的方式进行通信
不要使用外键,对于数据量非常少的表慎用索引
2.3.2 数据库设计
2.3.3 SpringBoot项目搭建
输入项目名称、包名
选择Lombok, SpringWeb, MyBatis, MySQL Driver,Spring for RabbitMQ 插件
2.3.4 总结
微服务:将应用程序构建为松耦合、可独立部署的一组服务
微服务拆分:推荐使用业务能力拆分方法
微服务对外接口:推荐使用Rest风格
数据库设计原则:使用单独的数据库,服务之间不共享数据库
2.4 利用Direct开发餐厅和骑手服务
2.4.1 订单微服务和餐厅微服务通信
两个模块order-service restaurant-service
1. 首先配置order-service 微服务
properties
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3307/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=abc123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
pojo
@Getter
@Setter
@ToString
public class OrderDetailPO {
private Integer id;
private OrderStatus status;
private String address;
private Integer accountId;
private Integer productId;
private Integer deliverymanId;
private Integer settlementId;
private Integer rewardId;
private BigDecimal price;
private Date date;
}
dto
@Getter
@Setter
@ToString
public class OrderMessageDTO {
private Integer orderId;
private OrderStatus orderStatus;
private BigDecimal price;
private Integer deliverymanId;
private Integer productId;
private Integer accountId;
private Integer settlementId;
private Integer rewardId;
private BigDecimal rewardAmount;
private Boolean confirmed;
}
vo
@Getter
@Setter
@ToString
public class OrderCreateVO {
private Integer accountId;
private String address;
private Integer productId;
}
枚举状态类
public enum OrderStatus {
ORDER_CREATING,
RESTAURANT_CONFIRMED,
DELIVERYMAN_CONFIRMED,
SETTLEMENT_CONFIRMED,
ORDER_CREATED,
FAILED;
}
dao
@Mapper
@Repository
public interface OrderDetailDao {
@Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, " +
"reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId}," +
"#{settlementId}, #{rewardId},#{price}, #{date})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(OrderDetailPO orderDetailPO);
@Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, " +
"product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, " +
"reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
void update(OrderDetailPO orderDetailPO);
@Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId," +
"settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
OrderDetailPO selectOrder(Integer id);
}
service
OrderService这个是前端接受消息,保存到数据库,然后发送给商家微服务
@Service
@Slf4j
public class OrderService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private OrderDetailDao orderDetailDao;
public void insertOrder(OrderCreateVO orderCreateVO) throws Exception {
log.info("orderCreateVO:{}", orderCreateVO);
//获取数据保存到数据库
OrderDetailPO orderPO = new OrderDetailPO();
orderPO.setAddress(orderCreateVO.getAddress());
orderPO.setAccountId(orderCreateVO.getAccountId());
orderPO.setProductId(orderCreateVO.getProductId());
orderPO.setStatus(OrderStatus.ORDER_CREATING);
orderPO.setDate(new Date());
orderDetailDao.insert(orderPO);
//发送消息
OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
orderMessageDTO.setOrderId(orderPO.getId());
orderMessageDTO.setProductId(orderPO.getProductId());
orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
}
}
}
OrderMessageService
这个是接受各种微服务的消息,现在接受的商家微服务发来的,然后收到,在发给骑手微服务
@Service
@Slf4j
public class OrderMessageService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private OrderDetailDao orderDetailDao;
@Async
public void handleMessage() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
/*---------------------restaurant---------------------*/
channel.exchangeDeclare(
"exchange.order.restaurant",
BuiltinExchangeType.DIRECT,
true,
false,
null);
channel.queueDeclare(
"queue.order",
true,
false,
false,
null);
channel.queueBind(
"queue.order",
"exchange.order.restaurant",
"key.order");
/*---------------------deliveryman---------------------*/
channel.exchangeDeclare(
"exchange.order.deliveryman",
BuiltinExchangeType.DIRECT,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.order.deliveryman",
"key.order");
channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try {
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
switch (orderPO.getStatus()) {
case ORDER_CREATING:
if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
orderPO.setStatus(OrderStatus.RESTAURANT_CONFIRMED);
orderPO.setPrice(orderMessageDTO.getPrice());
orderDetailDao.update(orderPO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.deliveryman", "key.deliveryman", null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatus.FAILED);
orderDetailDao.update(orderPO);
}
break;
case RESTAURANT_CONFIRMED:
break;
case DELIVERYMAN_CONFIRMED:
break;
case SETTLEMENT_CONFIRMED:
break;
}
} catch (JsonProcessingException | TimeoutException e) {
e.printStackTrace();
}
};
}
controller
@RestController
@Slf4j
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/orders")
public void insertOrder(@RequestBody OrderCreateVO orderCreateVO) throws Exception {
orderService.insertOrder(orderCreateVO);
log.info("orderCreateVO:{}",orderCreateVO);
}
}
config
配置线程池
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
@Override
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(10);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//线程池所使用的缓冲队列
threadPool.setQueueCapacity(10);
//等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀
threadPool.setThreadNamePrefix("Rabbit-Async-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
RabbitConfig
自启动线程池
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
OrderMessageService orderMessageService;
@Autowired
public void startListenMessage() throws Exception {
orderMessageService.handleMessage();
}
}
2. restaurant-service微服务
pojo
@Getter
@Setter
@ToString
public class ProductPO {
private Integer id;
private String name;
private BigDecimal price;
private Integer restaurantId;
private ProductStatus status;
private Date date;
}
@Getter
@Setter
@ToString
public class RestaurantPO {
private Integer id;
private String name;
private String address;
private RestaurantStatus status;
private Date date;
}
dto
@Getter
@Setter
@ToString
public class OrderMessageDTO {
private Integer orderId;
private OrderStatus orderStatus;
private BigDecimal price;
private Integer deliverymanId;
private Integer productId;
private Integer accountId;
private Integer settlementId;
private Integer rewardId;
private BigDecimal rewardAmount;
private Boolean confirmed;
}
枚举状态类
public enum OrderStatus {
ORDER_CREATING,
RESTAURANT_CONFIRMED,
DELIVERYMAN_CONFIRMED,
SETTLEMENT_CONFIRMED,
ORDER_CREATED,
FAILED;
}
public enum ProductStatus {
AVALIABIE,
NOT_AVALIABLE;
}
public enum RestaurantStatus {
OPEN,
CLOSE;
}
dao
@Mapper
@Repository
public interface ProductDao {
@Select("SELECT id,name,price,restaurant_id restaurantId,status,date FROM product WHERE id = #{id}")
ProductPO selsctProduct(Integer id);
}
@Mapper
@Repository
public interface RestaurantDao {
@Select("SELECT id,name,address,status,settlement_id settlementId,date FROM restaurant WHERE id = #{id}")
RestaurantPO selsctRestaurant(Integer id);
}
service
@Service
@Slf4j
public class OrderMessageService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private RestaurantDao restaurantDao;
@Autowired
private ProductDao productDao;
@Async
public void handleMessage() throws Exception {
log.info("start linstening message");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(
"exchange.order.restaurant",
BuiltinExchangeType.DIRECT,
true,
false,
null);
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
null);
channel.queueBind(
"queue.restaurant",
"exchange.order.restaurant",
"key.restaurant");
channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try {
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
ProductPO productPO = productDao.selsctProduct(orderMessageDTO.getProductId());
log.info("onMessage:productPO:{}", productPO);
RestaurantPO restaurantPO = restaurantDao.selsctRestaurant(productPO.getRestaurantId());
log.info("onMessage:restaurantPO:{}", restaurantPO);
if (ProductStatus.AVALIABIE == productPO.getStatus() && RestaurantStatus.OPEN == restaurantPO.getStatus()) {
orderMessageDTO.setConfirmed(true);
orderMessageDTO.setPrice(productPO.getPrice());
} else {
orderMessageDTO.setConfirmed(false);
}
log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.order", null, messageToSend.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
};
}
config
线程池自启动和订单微服务一样
3. deliveryman-service微服务
pojo
@Getter
@Setter
@ToString
public class DeliverymanPO {
private Integer id;
private String name;
private String district;
private DeliverymanStatus status;
private Date date;
}
dto
略.....
枚举类状态
public enum DeliverymanStatus {
AVALIABIE,
NOT_AVALIABLE;
}
public enum OrderStatus {
ORDER_CREATING,
RESTAURANT_CONFIRMED,
DELIVERYMAN_CONFIRMED,
SETTLEMENT_CONFIRMED,
ORDER_CREATED,
FAILED;
}
service
@Service
@Slf4j
public class OrderMessageService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private DeliverymanDao deliverymanDao;
@Async
public void handleMessage() throws Exception {
log.info("start linstening message");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(
"exchange.order.deliveryman",
BuiltinExchangeType.DIRECT,
true,
false,
null);
channel.queueDeclare(
"queue.deliveryman",
true,
false,
false,
null);
channel.queueBind(
"queue.deliveryman",
"exchange.order.deliveryman",
"key.deliveryman");
channel.basicConsume("queue.deliveryman", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try {
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
List<DeliverymanPO> deliverymanPOS = deliverymanDao.selectAvaliableDeliveryman(DeliverymanStatus.AVALIABIE);
DeliverymanPO deliverymanPO = deliverymanPOS.get(0);
orderMessageDTO.setDeliverymanId(deliverymanPO.getId());
log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.order", null, messageToSend.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
};
}
config
略.....
2.5 利用Fanout完善结算微服务
使用Fanout用两个交换机
2.5.1 补充order-service
/*---------------------settlement---------------------*/
channel.exchangeDeclare(
"exchange.settlement.order",
BuiltinExchangeType.FANOUT,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.settlement.order",
"key.order");
..........
case RESTAURANT_CONFIRMED:
if (null!=orderMessageDTO.getDeliverymanId()){
orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
orderPO.setStatus(OrderStatus.DELIVERYMAN_CONFIRMED);
orderDetailDao.update(orderPO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.settlement", "key.settlement", null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatus.FAILED);
orderDetailDao.update(orderPO);
}
break;
........
2.5.2 order-settlement微服务
pojo
@Getter
@Setter
@ToString
public class SettlementPO {
private Integer id;
private Integer orderId;
private Integer transactionId;
private SettlementStatus status;
private BigDecimal amount;
private Date date;
}
dto
.............
枚举状态类
public enum SettlementStatus {
SUCCESS,
FAILED;
}
public enum OrderStatus {
ORDER_CREATING,
RESTAURANT_CONFIRMED,
DELIVERYMAN_CONFIRMED,
SETTLEMENT_CONFIRMED,
ORDER_CREATED,
FAILED;
}
service
结算生成订单
@Service
public class SettlementService {
Random rand = new Random(25);
public Integer settlement(Integer accountId, BigDecimal amount) {
return rand.nextInt(1000000000);
}
}
@Service
@Slf4j
public class OrderMessageService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private SettlementDao settlementDao;
@Autowired
private SettlementService settlementService;
@Async
public void handleMessage() throws Exception {
log.info("start linstening message");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(
"exchange.order.settlement",
BuiltinExchangeType.FANOUT,
true,
false,
null);
channel.queueDeclare(
"queue.settlement",
true,
false,
false,
null);
channel.queueBind(
"queue.settlement",
"exchange.order.settlement",
"key.settlement");
channel.basicConsume("queue.settlement", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try {
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
SettlementPO settlementPO = new SettlementPO();
settlementPO.setOrderId(orderMessageDTO.getOrderId());
settlementPO.setAmount(orderMessageDTO.getPrice());
settlementPO.setTransactionId(settlementService.settlement
(orderMessageDTO.getAccountId(), orderMessageDTO.getPrice()));
settlementPO.setDate(new Date());
settlementPO.setStatus(SettlementStatus.SUCCESS);
settlementDao.insert(settlementPO);
orderMessageDTO.setSettlementId(settlementPO.getId());
log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.settlement.order", "key.order", null, messageToSend.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
};
}
congfig
..................
2.6 利用Topic开发积分微服务
这个使用简单的功能不使用匹配符和Direct一样
2.6.2 补充order-service微服务
/*---------------------reward---------------------*/
channel.exchangeDeclare(
"exchange.order.reward",
BuiltinExchangeType.TOPIC,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.order.reward",
"key.order");
.............
case DELIVERYMAN_CONFIRMED:
if (null!=orderMessageDTO.getSettlementId()){
orderPO.setDeliverymanId(orderMessageDTO.getSettlementId());
orderPO.setStatus(OrderStatus.SETTLEMENT_CONFIRMED);
orderDetailDao.update(orderPO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.reward", "key.reward", null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatus.FAILED);
orderDetailDao.update(orderPO);
}
break;
case SETTLEMENT_CONFIRMED:
if (null!=orderMessageDTO.getRewardId()){
orderPO.setStatus(OrderStatus.ORDER_CREATED);
orderPO.setDeliverymanId(orderMessageDTO.getRewardId());
orderDetailDao.update(orderPO);
} else {
orderPO.setStatus(OrderStatus.FAILED);
orderDetailDao.update(orderPO);
}
break;
2.6.2 reward-service微服务
pojo
@Getter
@Setter
@ToString
public class RewardPO {
private Integer id;
private Integer orderId;
private BigDecimal amount;
private RewardStatus status;
private Date date;
}
dto
@Mapper
@Repository
public interface RewardDao {
@Insert("INSERT INTO reward (order_id, amount, status, date) VALUES(#{orderId}, #{amount}, #{status}, #{date})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(RewardPO rewardPO);
}
枚举类状态
public enum RewardStatus {
SUCCESS,
FAILED;
}
public enum OrderStatus {
ORDER_CREATING,
RESTAURANT_CONFIRMED,
DELIVERYMAN_CONFIRMED,
SETTLEMENT_CONFIRMED,
ORDER_CREATED,
FAILED;
}
service
@Service
@Slf4j
public class OrderMessageService {
ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private RewardDao rewardDao;
@Async
public void handleMessage() throws Exception {
log.info("start linstening message");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(
"exchange.order.reward",
BuiltinExchangeType.TOPIC,
true,
false,
null);
channel.queueDeclare(
"queue.reward",
true,
false,
false,
null);
channel.queueBind(
"queue.reward",
"exchange.order.reward",
"key.reward");
channel.basicConsume("queue.reward", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try {
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
log.info("handleOrderService:orderSettlementDTO:{}", orderMessageDTO);
RewardPO rewardPO = new RewardPO();
rewardPO.setOrderId(orderMessageDTO.getOrderId());
rewardPO.setStatus(RewardStatus.SUCCESS);
rewardPO.setAmount(orderMessageDTO.getPrice());
rewardPO.setDate(new Date());
rewardDao.insert(rewardPO);
orderMessageDTO.setRewardId(rewardPO.getId());
log.info("handleOrderService:settlementOrderDTO:{}", orderMessageDTO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.reward", "key.order", null, messageToSend.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
};
}
config
............
2.7 目前项目不足之处
消息真的发出去了吗?
消息发送后,发送端不知道RabbitMQ是否真的收到了消息
若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
需要使用RabbitMQ发送端确认机制
,确认消息发送
消息真被路由了吗?
消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
消息丢弃后,订单处理流程停止,业务异常
需要使用RabbitMQ消息返回机制
,确认消息被正确路由
消费端处理的过来吗?
业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃
需要使用RabbitMQ消费端限流机制
,限制消息推送速度,保障接收端服务稳定
消费端处理异常怎么办?
默认情况下,消费端接收消息时,消息会被自动确认(ACK)
消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
需要使用RabbitMQ消费端确认机制,
确认消息被正确处理
队列爆满怎么办?
默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力
需要使用RabbitMQ消息过期时间
,防止消息大量积压
如何转移过期消息?
消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报
需要使用RabbitMQ死信队列
收集过期消息,以供分析
总结:
目前项目急需引入的RabbitMQ新特性:
发送端确认机制
消费端确认机制
消息返回机制
消费端限流机制
消息过期机制
死信队列
2.8 小结
1. 使用线程池
对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸"
2. POJO类单一职责
各种POJO数据结构必须单一职责,混用会导致代码混乱
- PO/DO: (Persistent Object/Data Object)持久对象
- DTO: (Data Transfer Object)数据传输对象
- BO: (Business Object)业务对象
- VO: (View Object)显示层对象
3. 本章学习
体会微服务架构设计方法、服务拆分方法、数据库设计原则
了解RabbitMQ Java客户端使用方法
掌握三种Exchange的Java端管理与配置方法
标签:connectionFactory,private,orderMessageDTO,学习,public,笔记,RabbitMQ,order,channel From: https://www.cnblogs.com/mrwyk/p/16903841.html