首页 > 其他分享 >RabbitMQ学习笔记(二)

RabbitMQ学习笔记(二)

时间:2022-11-18 17:15:00浏览次数:78  
标签:connectionFactory private orderMessageDTO 学习 public 笔记 RabbitMQ order channel

二 利用RabbitMQ基本用法,开发项目

2.1 RabbitMQ消息交换的关键是什么?

1.AMQP协议架构

AMQP协议直接定义了RabbitMQ的内部结构和外部行为

我们使用RabbitMQ本质上是在使用AMQP协议

AMQP协议被多种消息中间件使用,可以举一反三

2.消息流转流程

发送者不能直接将消息发送给最终队列,必须发送给交换机

image-20221115184702134

消息根据路由规则,消息由交换机转发给队列

image-20221115184737796

消费者从队列将消息取走

image-20221115184803471

3.合理的交换机和队列设置

交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
合理配置交换机类型,使用Topic模式时仔细设置绑定键

4.尽量使用自动化配置

将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错
一般来说,交换机由双方同时声明,队列由接收方声明并配置绑定关系
交换机/队列的参数一定要由双方开发团队确认,否则重复声明时,若参数不一致,会导致声明失败

2.1.1 问题:

为什么AMQP要设计Exchange消息流转机制?

1、模拟交换机处理的机制,通信输入根据绑定规则找到相应的队列发送给消费者,信息传递可靠性高

2、流转机制:同一个交换机可以绑定多个队列,分布式的处理,使性能提高

3、灵活多变的绑定机制可以适用于多种情况的应用

2.2 需求分析与架构设计

2.2.1 需求分析

一个外卖后端系统,用户可以在线下单

外卖用户下单后,可以实时查询订单进度

系统可以承受短时间的大量并发请求

2.2.2 架构设计

使用微服务系统,组件之间充分解耦

使用消息中间件,解耦业务逻辑

使用数据库,持久化业务数据

image-20221115195326218

2.2.3 什么是微服务架构

将应用程序构建为松耦合、可独立部署的一组服务
服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能
松耦合:封装服务的实现细节,通过API调用

2.2.4 如何拆分微服务

根据系统操作进行微服务拆分

根据业务能力进行微服务拆分(推荐使用)

根据子域进行微服务拆分

2.2.5 根据业务能力进行微服务拆分

image-20221115195257389

2.2.6 接口需求

新建订单
接口查询
订单接口接口采用REST风格

2.3 数据库设计与项目搭建

2.3.1 微服务的数据库设计原则

每个微服务使用自己的数据库
不要使用共享数据库的方式进行通信
不要使用外键,对于数据量非常少的表慎用索引

2.3.2 数据库设计

image-20221115201402303

2.3.3 SpringBoot项目搭建

https://start.spring.io/

输入项目名称、包名

选择Lombok, SpringWeb, MyBatis, MySQL Driver,Spring for RabbitMQ 插件

2.3.4 总结

微服务:将应用程序构建为松耦合、可独立部署的一组服务
微服务拆分:推荐使用业务能力拆分方法
微服务对外接口:推荐使用Rest风格

数据库设计原则:使用单独的数据库,服务之间不共享数据库

2.4 利用Direct开发餐厅和骑手服务

image-20221118115320862

2.4.1 订单微服务和餐厅微服务通信

两个模块order-service restaurant-service

1. 首先配置order-service 微服务

properties

server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3307/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=abc123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

pojo

@Getter
@Setter
@ToString
public class OrderDetailPO {
    private Integer id;
    private OrderStatus status;
    private String address;
    private Integer accountId;
    private Integer productId;
    private Integer deliverymanId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal price;
    private Date date;
}

dto

@Getter
@Setter
@ToString
public class OrderMessageDTO {
    private Integer orderId;
    private OrderStatus orderStatus;
    private BigDecimal price;
    private Integer deliverymanId;
    private Integer productId;
    private Integer accountId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal rewardAmount;
    private Boolean confirmed;
}

vo

@Getter
@Setter
@ToString
public class OrderCreateVO {
    private Integer accountId;
    private String address;
    private Integer productId;
}

枚举状态类

public enum OrderStatus {
    ORDER_CREATING,
    RESTAURANT_CONFIRMED,
    DELIVERYMAN_CONFIRMED,
    SETTLEMENT_CONFIRMED,
    ORDER_CREATED,
    FAILED;
}

dao

@Mapper
@Repository
public interface OrderDetailDao {

    @Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, " +
            "reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId}," +
            "#{settlementId}, #{rewardId},#{price}, #{date})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    void insert(OrderDetailPO orderDetailPO);

    @Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, " +
            "product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, " +
            "reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
    void update(OrderDetailPO orderDetailPO);

    @Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId," +
            "settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
    OrderDetailPO selectOrder(Integer id);
}

service

OrderService这个是前端接受消息,保存到数据库,然后发送给商家微服务

@Service
@Slf4j
public class OrderService {

    ObjectMapper objectMapper = new ObjectMapper();
    @Autowired
    private OrderDetailDao orderDetailDao;

    public void insertOrder(OrderCreateVO orderCreateVO) throws Exception {
        log.info("orderCreateVO:{}", orderCreateVO);

        //获取数据保存到数据库
        OrderDetailPO orderPO = new OrderDetailPO();
        orderPO.setAddress(orderCreateVO.getAddress());
        orderPO.setAccountId(orderCreateVO.getAccountId());
        orderPO.setProductId(orderCreateVO.getProductId());
        orderPO.setStatus(OrderStatus.ORDER_CREATING);
        orderPO.setDate(new Date());
        orderDetailDao.insert(orderPO);


        //发送消息
        OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
        orderMessageDTO.setOrderId(orderPO.getId());
        orderMessageDTO.setProductId(orderPO.getProductId());
        orderMessageDTO.setAccountId(orderCreateVO.getAccountId());

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        }
    }

}

OrderMessageService这个是接受各种微服务的消息,现在接受的商家微服务发来的,然后收到,在发给骑手微服务

@Service
@Slf4j
public class OrderMessageService {

    ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private OrderDetailDao orderDetailDao;

    @Async
    public void handleMessage() throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            /*---------------------restaurant---------------------*/
            channel.exchangeDeclare(
                    "exchange.order.restaurant",
                    BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null);

            channel.queueDeclare(
                    "queue.order",
                    true,
                    false,
                    false,
                    null);

            channel.queueBind(
                    "queue.order",
                    "exchange.order.restaurant",
                    "key.order");
                    
              /*---------------------deliveryman---------------------*/
            channel.exchangeDeclare(
                    "exchange.order.deliveryman",
                    BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null);

            channel.queueBind(
                    "queue.order",
                    "exchange.order.deliveryman",
                    "key.order");

            channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }

    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);
            OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());

            switch (orderPO.getStatus()) {
                case ORDER_CREATING:
                    if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
                        orderPO.setStatus(OrderStatus.RESTAURANT_CONFIRMED);
                        orderPO.setPrice(orderMessageDTO.getPrice());
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.deliveryman", "key.deliveryman", null,
                                    messageToSend.getBytes());
                        }
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case RESTAURANT_CONFIRMED:
                    break;
                case DELIVERYMAN_CONFIRMED:
                    break;
                case SETTLEMENT_CONFIRMED:
                    break;
            }

        } catch (JsonProcessingException | TimeoutException e) {
            e.printStackTrace();
        }
    };
}

controller

@RestController
@Slf4j
public class OrderController {

    @Autowired
    private OrderService orderService;
    @PostMapping("/orders")
    public void insertOrder(@RequestBody OrderCreateVO orderCreateVO) throws Exception {

        orderService.insertOrder(orderCreateVO);

        log.info("orderCreateVO:{}",orderCreateVO);
    }

}

config

配置线程池

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的处理流程
    // 当池子大小小于corePoolSize,就新建线程,并处理请求
    // 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
    // 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
    // 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数
        threadPool.setCorePoolSize(10);
        //设置最大线程数
        threadPool.setMaxPoolSize(100);
        //线程池所使用的缓冲队列
        threadPool.setQueueCapacity(10);
        //等待任务在关机时完成--表明等待所有线程执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
        threadPool.setAwaitTerminationSeconds(60);
        //  线程名称前缀
        threadPool.setThreadNamePrefix("Rabbit-Async-");
        // 初始化线程
        threadPool.initialize();
        return threadPool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

RabbitConfig自启动线程池

@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    OrderMessageService orderMessageService;

    @Autowired
    public void startListenMessage() throws Exception {
        orderMessageService.handleMessage();
    }
}
2. restaurant-service微服务

pojo

@Getter
@Setter
@ToString
public class ProductPO {
    private Integer id;
    private String name;
    private BigDecimal price;
    private Integer restaurantId;
    private ProductStatus status;
    private Date date;
}
@Getter
@Setter
@ToString
public class RestaurantPO {
    private Integer id;
    private String name;
    private String address;
    private RestaurantStatus status;
    private Date date;
}

dto

@Getter
@Setter
@ToString
public class OrderMessageDTO {
    private Integer orderId;
    private OrderStatus orderStatus;
    private BigDecimal price;
    private Integer deliverymanId;
    private Integer productId;
    private Integer accountId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal rewardAmount;
    private Boolean confirmed;
}

枚举状态类

public enum OrderStatus {
    ORDER_CREATING,
    RESTAURANT_CONFIRMED,
    DELIVERYMAN_CONFIRMED,
    SETTLEMENT_CONFIRMED,
    ORDER_CREATED,
    FAILED;
}
public enum  ProductStatus {
    AVALIABIE,
    NOT_AVALIABLE;
}
public enum  RestaurantStatus {
    OPEN,
    CLOSE;
}

dao

@Mapper
@Repository
public interface ProductDao {

    @Select("SELECT id,name,price,restaurant_id restaurantId,status,date FROM product WHERE id = #{id}")
    ProductPO selsctProduct(Integer id);
}
@Mapper
@Repository
public interface RestaurantDao {

    @Select("SELECT id,name,address,status,settlement_id settlementId,date FROM restaurant WHERE id = #{id}")
    RestaurantPO selsctRestaurant(Integer id);
}

service

@Service
@Slf4j
public class OrderMessageService {

    ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private RestaurantDao restaurantDao;

    @Autowired
    private ProductDao productDao;


    @Async
    public void handleMessage() throws Exception {
        log.info("start linstening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(
                    "exchange.order.restaurant",
                    BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null);

            channel.queueDeclare(
                    "queue.restaurant",
                    true,
                    false,
                    false,
                    null);

            channel.queueBind(
                    "queue.restaurant",
                    "exchange.order.restaurant",
                    "key.restaurant");


            channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }


    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);

            ProductPO productPO = productDao.selsctProduct(orderMessageDTO.getProductId());
            log.info("onMessage:productPO:{}", productPO);
            RestaurantPO restaurantPO = restaurantDao.selsctRestaurant(productPO.getRestaurantId());
            log.info("onMessage:restaurantPO:{}", restaurantPO);
            if (ProductStatus.AVALIABIE == productPO.getStatus() && RestaurantStatus.OPEN == restaurantPO.getStatus()) {
                orderMessageDTO.setConfirmed(true);
                orderMessageDTO.setPrice(productPO.getPrice());
            } else {
                orderMessageDTO.setConfirmed(false);
            }
            log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", null, messageToSend.getBytes());


            }
        } catch (Exception  e) {
            e.printStackTrace();
        }
    };
}

config

线程池自启动和订单微服务一样

3. deliveryman-service微服务

pojo

@Getter
@Setter
@ToString
public class DeliverymanPO {
    private Integer id;
    private String name;
    private String district;
    private DeliverymanStatus status;
    private Date date;
}

dto

略.....

枚举类状态

public enum DeliverymanStatus {
    AVALIABIE,
    NOT_AVALIABLE;
}
public enum OrderStatus {
    ORDER_CREATING,
    RESTAURANT_CONFIRMED,
    DELIVERYMAN_CONFIRMED,
    SETTLEMENT_CONFIRMED,
    ORDER_CREATED,
    FAILED;
}

service

@Service
@Slf4j
public class OrderMessageService {

    ObjectMapper objectMapper = new ObjectMapper();


    @Autowired
    private DeliverymanDao deliverymanDao;

    @Async
    public void handleMessage() throws Exception {
        log.info("start linstening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(
                    "exchange.order.deliveryman",
                    BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null);

            channel.queueDeclare(
                    "queue.deliveryman",
                    true,
                    false,
                    false,
                    null);

            channel.queueBind(
                    "queue.deliveryman",
                    "exchange.order.deliveryman",
                    "key.deliveryman");


            channel.basicConsume("queue.deliveryman", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }


    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);

            List<DeliverymanPO> deliverymanPOS = deliverymanDao.selectAvaliableDeliveryman(DeliverymanStatus.AVALIABIE);
            DeliverymanPO deliverymanPO = deliverymanPOS.get(0);
            orderMessageDTO.setDeliverymanId(deliverymanPO.getId());

            log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", null, messageToSend.getBytes());


            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    };
}

config

略.....

2.5 利用Fanout完善结算微服务

使用Fanout用两个交换机

image-20221118144855600

2.5.1 补充order-service


 /*---------------------settlement---------------------*/
            channel.exchangeDeclare(
                    "exchange.settlement.order",
                    BuiltinExchangeType.FANOUT,
                    true,
                    false,
                    null);

            channel.queueBind(
                    "queue.order",
                    "exchange.settlement.order",
                    "key.order");

..........

case RESTAURANT_CONFIRMED:
    if (null!=orderMessageDTO.getDeliverymanId()){
        orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
        orderPO.setStatus(OrderStatus.DELIVERYMAN_CONFIRMED);
        orderDetailDao.update(orderPO);
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            channel.basicPublish("exchange.order.settlement", "key.settlement", null,
                    messageToSend.getBytes());
    }
    } else {
        orderPO.setStatus(OrderStatus.FAILED);
        orderDetailDao.update(orderPO);
    }
    break;	
 ........   

2.5.2 order-settlement微服务

pojo

@Getter
@Setter
@ToString
public class SettlementPO {
    private Integer id;
    private Integer orderId;
    private Integer transactionId;
    private SettlementStatus status;
    private BigDecimal amount;
    private Date date;
}

dto

.............

枚举状态类

public enum SettlementStatus {
    SUCCESS,
    FAILED;
}
public enum OrderStatus {
    ORDER_CREATING,
    RESTAURANT_CONFIRMED,
    DELIVERYMAN_CONFIRMED,
    SETTLEMENT_CONFIRMED,
    ORDER_CREATED,
    FAILED;
}

service

结算生成订单

@Service
public class SettlementService {

    Random rand = new Random(25);

    public Integer settlement(Integer accountId, BigDecimal amount) {
        return rand.nextInt(1000000000);
    }
}
@Service
@Slf4j
public class OrderMessageService {

    ObjectMapper objectMapper = new ObjectMapper();


    @Autowired
    private SettlementDao settlementDao;

    @Autowired
    private SettlementService settlementService;
    @Async
    public void handleMessage() throws Exception {
        log.info("start linstening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(
                    "exchange.order.settlement",
                    BuiltinExchangeType.FANOUT,
                    true,
                    false,
                    null);

            channel.queueDeclare(
                    "queue.settlement",
                    true,
                    false,
                    false,
                    null);

            channel.queueBind(
                    "queue.settlement",
                    "exchange.order.settlement",
                    "key.settlement");


            channel.basicConsume("queue.settlement", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }


    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);

            SettlementPO settlementPO = new SettlementPO();
            settlementPO.setOrderId(orderMessageDTO.getOrderId());
            settlementPO.setAmount(orderMessageDTO.getPrice());
            settlementPO.setTransactionId(settlementService.settlement
                    (orderMessageDTO.getAccountId(), orderMessageDTO.getPrice()));
            settlementPO.setDate(new Date());

            settlementPO.setStatus(SettlementStatus.SUCCESS);
            settlementDao.insert(settlementPO);

            orderMessageDTO.setSettlementId(settlementPO.getId());
            log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.settlement.order", "key.order", null, messageToSend.getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    };
}

congfig

..................

2.6 利用Topic开发积分微服务

这个使用简单的功能不使用匹配符和Direct一样

2.6.2 补充order-service微服务

/*---------------------reward---------------------*/
channel.exchangeDeclare(
        "exchange.order.reward",
        BuiltinExchangeType.TOPIC,
        true,
        false,
        null);

channel.queueBind(
        "queue.order",
        "exchange.order.reward",
        "key.order");
        
        .............
        
        
        
  case DELIVERYMAN_CONFIRMED:
                    if (null!=orderMessageDTO.getSettlementId()){
                        orderPO.setDeliverymanId(orderMessageDTO.getSettlementId());
                        orderPO.setStatus(OrderStatus.SETTLEMENT_CONFIRMED);
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.reward", "key.reward", null,
                                    messageToSend.getBytes());
                        }
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case SETTLEMENT_CONFIRMED:
                    if (null!=orderMessageDTO.getRewardId()){
                        orderPO.setStatus(OrderStatus.ORDER_CREATED);
                        orderPO.setDeliverymanId(orderMessageDTO.getRewardId());
                        orderDetailDao.update(orderPO);
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;       

2.6.2 reward-service微服务

pojo

@Getter
@Setter
@ToString
public class RewardPO {
    private Integer id;
    private Integer orderId;
    private BigDecimal amount;
    private RewardStatus status;
    private Date date;
}

dto

@Mapper
@Repository
public interface RewardDao {

    @Insert("INSERT INTO reward (order_id, amount, status, date) VALUES(#{orderId}, #{amount}, #{status}, #{date})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    void insert(RewardPO rewardPO);
}

枚举类状态

public enum RewardStatus {
    SUCCESS,
    FAILED;
}
public enum OrderStatus {
    ORDER_CREATING,
    RESTAURANT_CONFIRMED,
    DELIVERYMAN_CONFIRMED,
    SETTLEMENT_CONFIRMED,
    ORDER_CREATED,
    FAILED;
}

service

@Service
@Slf4j
public class OrderMessageService {

    ObjectMapper objectMapper = new ObjectMapper();


   @Autowired
   private RewardDao rewardDao;
    @Async
    public void handleMessage() throws Exception {
        log.info("start linstening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(
                    "exchange.order.reward",
                    BuiltinExchangeType.TOPIC,
                    true,
                    false,
                    null);

            channel.queueDeclare(
                    "queue.reward",
                    true,
                    false,
                    false,
                    null);

            channel.queueBind(
                    "queue.reward",
                    "exchange.order.reward",
                    "key.reward");


            channel.basicConsume("queue.reward", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }



    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);
            log.info("handleOrderService:orderSettlementDTO:{}", orderMessageDTO);
            RewardPO rewardPO = new RewardPO();
            rewardPO.setOrderId(orderMessageDTO.getOrderId());
            rewardPO.setStatus(RewardStatus.SUCCESS);
            rewardPO.setAmount(orderMessageDTO.getPrice());
            rewardPO.setDate(new Date());
            rewardDao.insert(rewardPO);
            orderMessageDTO.setRewardId(rewardPO.getId());
            log.info("handleOrderService:settlementOrderDTO:{}", orderMessageDTO);

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.reward", "key.order", null, messageToSend.getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    };
}

config

............

2.7 目前项目不足之处

消息真的发出去了吗?

消息发送后,发送端不知道RabbitMQ是否真的收到了消息
若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常

需要使用RabbitMQ发送端确认机制,确认消息发送

消息真被路由了吗?

消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃

消息丢弃后,订单处理流程停止,业务异常

需要使用RabbitMQ消息返回机制,确认消息被正确路由

消费端处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃

需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK)

消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况

需要使用RabbitMQ消费端确认机制,确认消息被正确处理

队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费

大量堆积的消息会给RabbitMQ产生很大的压力

需要使用RabbitMQ消息过期时间,防止消息大量积压

如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报

需要使用RabbitMQ死信队列收集过期消息,以供分析

总结:

目前项目急需引入的RabbitMQ新特性:

发送端确认机制
消费端确认机制
消息返回机制
消费端限流机制
消息过期机制
死信队列

2.8 小结

1. 使用线程池

对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸"

2. POJO类单一职责

各种POJO数据结构必须单一职责,混用会导致代码混乱

  • PO/DO: (Persistent Object/Data Object)持久对象
  • DTO: (Data Transfer Object)数据传输对象
  • BO: (Business Object)业务对象
  • VO: (View Object)显示层对象

3. 本章学习

体会微服务架构设计方法、服务拆分方法、数据库设计原则

了解RabbitMQ Java客户端使用方法

掌握三种Exchange的Java端管理与配置方法

标签:connectionFactory,private,orderMessageDTO,学习,public,笔记,RabbitMQ,order,channel
From: https://www.cnblogs.com/mrwyk/p/16903841.html

相关文章

  • P4课程学习(一)
    软件定义网络:逻辑集中控制MainContributionOpenFlow=标准化模型OpenFlow=与交换机交互的标准化协议下载流表条目、查询统计信息等通过单个实体进行逻辑集中控制......
  • 3.1版本【HarmonyOS 第一课】正式上线!参与学习赢官方好礼>>
    【课程介绍】《HarmonyOS第一课》是跟随版本迭代不断推出的系列化课程,本期课程基于HarmonyOS3.1版本的新技术和特性,每个课程单元里面都包含视频、Codelab、文章和习题,帮助......
  • 概率论学习笔记
    多元/多维高斯/正态分布概率密度函数推导@博客园.凯鲁嘎吉多元高斯分布完全解析@知乎.钱默吟......
  • 数组的前闭后开-前开后闭-前闭后闭理解--笔记
    1、数学里面的区间定义设a,b是两个实数,且a≤b,这里实数a,b叫做区间的端点,从下边的三个定义你就可以看出来,闭区间是有a,b两个端点的。1)满足a≤x≤b的实数x的......
  • 领域最全 | 计算机视觉算法在路面坑洼检测中的应用综述(基于2D图像/3D LiDAR/深度学习
    摘要计算机视觉算法在3D道路成像和路面坑洼检测中的应用已有二十多年的历史。尽管如此,目前还缺乏有关最先进(SoTA)的计算机视觉技术的系统调研文章,尤其是为解决这......
  • 8086汇编 王爽版本 笔记集合
    8086汇编语言王爽版个人笔记这篇博客是个导航第一部分:绪论第二部分:访问寄存器和内存第三部分:汇编语言程序(书中4、5、6章)第四部分:内存寻址方式(书中7、8章)第五部分:......
  • 从机器学习到图神经网络,inductive和transductive的区别
    目录机器学习归纳式学习(inductivelearning)转导式学习(transductivelearning)图表示学习归纳式学习(inductivelearning)节点表示图表示转导式学习(transductivelearning)节点......
  • Y73day1学习心得
    Y73day1学习心得.mdY73day1学习心得一、namespace、cgroup在容器中的作用1.namespaceLinuxnamespace是在当前运行的系统环境中创建(隔离)另一个进程的运行环境出来......
  • 《Shell脚本学习 —— 后台检测应用程序运行》
    1.守护脚本一般用来检测项目中程序是否奔溃退出,以及程序重启多次后直接重启整个机器。#!/bin/shyd_media_app&count=0whiletruedostillRunning=$(ps|......
  • 算法学习-1 算法复杂度
    一算法复杂度算法复杂度分为时间复杂度和空间复杂度。时间复杂度是指执行算法所需要的计算工作量;而空间复杂度是指执行这个算法所需要的内存空间。算法的复杂性体运行该......