一、生产、消费者 流程
1、生产者(下单后生产 务必成功)
派单队列:order_platonn_queue
交换机:order_exchange_name
绑交换机路由键:orderRoutingKey
生产者=>采用confirm,确认应答机制
Ack模式:成功
失败则重试
2、消费者(platonn派单)
派单队列:order_platonn_queue
交换机:order_exchange_name
绑交换机路由键:orderRoutingKey
消费者=>重发Retry/清除Ack/丢弃Nack
二、RabbitMQ
三、SB+Order+Platoon
(一)、rabbitmq_order2050
1、 pom.xml
<dependencies>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--spring对amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
2、 application.properties
spring.application.name=rabbitmq_order2050
server.port=2050
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
###开启消息确认机制confirms
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
# 数据源名称
spring.datasource.name=test
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=root
#使用druid数据源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
3、base
3.1、ApiContains
package com.sc.rabbitmq_order2050.base;
public interface ApiContains {
//响应请求成功
String HTTP_RES_CODE_200_VALUE = "success";
//系统错识
String HTTP_RES_CODE_500_VALUE = "fail";
//响应请求成功code
Integer HTTP_RES_CODE_200 = 200;
//系统错识
Integer HTTP_RES_CODE_500 = 500;
//未关联QQ账号
Integer HTTP_RES_CODE_201 = 201;
}
3.2、BaseApiService
package com.sc.rabbitmq_order2050.base;
import org.springframework.stereotype.Component;
@Component
public class BaseApiService {
public ResponseBase setResultError(Integer code, String msg) {
return setResult(code, msg, null);
}
//返回错识,可以传msg
public ResponseBase setResultError(String msg) {
return setResult(ApiContains.HTTP_RES_CODE_500, msg, null);
}
//返回成功,可以传data值
public ResponseBase setResultSuccess(Object data) {
return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, data);
}
//返回成功,无data值
public ResponseBase setResultSuccess() {
return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, null);
}
//返回成功,无data值
public ResponseBase setResultSuccess(String msg) {
return setResult(ApiContains.HTTP_RES_CODE_200, msg, null);
}
//通用封装
public ResponseBase setResult(Integer code, String msg, Object data) {
return new ResponseBase(code, msg, data);
}
}
3.3、ResponseBase
package com.sc.rabbitmq_order2050.base;
import lombok.Data;
@Data
public class ResponseBase {
private Integer rtnCode;
private String msg;
private Object data;
public ResponseBase() {
}
public ResponseBase(Integer rtnCode, String msg, Object data) {
super();
this.rtnCode = rtnCode;
this.msg = msg;
this.data = data;
}
@Override
public String toString() {
return "ResponseBase[rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
}
}
4、RabbitmqConfig
package com.sc.rabbitmq_order2050.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 下单且派单、补单队列
* 下单、派单交换机
* 定义【下单且派单队列、补单队列】与交换机绑定
*/
@Component
public class RabbitmqConfig {
//下单且派单队列
public static final String ORDER_PLATOON_QUEUE = "order_platoon_queue";
//补单队列,判断订单是否已经被创建
//public static final String ORDER_CREATE_QUEUE = "order_create_queue";
//下单并且派单交换机
public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";
//1、定义订单队列
@Bean
public Queue directOrderPlatoonQueue() {
return new Queue(ORDER_PLATOON_QUEUE);
}
//2、定义补单队列
@Bean
public Queue directCreateOrderQueue() {
return new Queue(ORDER_CREATE_QUEUE);
}
//2、定义交换机
DirectExchange directOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE_NAME);
}
//3、定义【下单且派单队列】与交换机绑定
@Bean
Binding bindingExchangeOrderDicQueue() {
return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
.with("orderRoutingKey");
}
//3、定义补单队列与交换机绑定
@Bean
Binding bindingExchangeCreateOrderQueue() {
return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
. .with("orderRoutingKey");
}
}
5、OrderEnity
package com.sc.rabbitmq_order2050.entity;
import lombok.Data;
@Data
public class OrderEnity {
private Long id;
//订单名称
private String name;
//订单金额
private Double orderMoney;
//订单id
private String orderId;
}
6、OrderMapper
package com.sc.rabbitmq_order2050.mapper;
import com.sc.rabbitmq_order2050.entity.OrderEnity;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
public interface OrderMapper {
@Insert(value = "insert into order_info values(#{id},#{name},#{orderMoney},#{orderId})")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
public int addOder(OrderEnity orderEnity);
@Select("select id as id,name as name,order_money as orderMonty,orderId as orderId" +
" from order_info where orderId=#{orderId};")
public OrderEnity findOrderId(@Param("orderId") String orderId);
}
7、OrderService
package com.sc.rabbitmq_order2050.service;
import com.alibaba.fastjson.JSONObject;
import com.sc.rabbitmq_order2050.base.BaseApiService;
import com.sc.rabbitmq_order2050.base.ResponseBase;
import com.sc.rabbitmq_order2050.entity.OrderEnity;
import com.sc.rabbitmq_order2050.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
public ResponseBase addOrderAndPlatoon() {
//先下单 订单表插入数据
OrderEnity orderEnity = new OrderEnity();
orderEnity.setName("xxxx面");
//价格是300元
orderEnity.setOrderMoney(300d);
//商品id
String orderId = UUID.randomUUID().toString();
orderEnity.setOrderId(orderId);
//1、先下单,创建订单(往订单数据库中插入一条数据)
int orderResult = orderMapper.addOder(orderEnity);
System.out.println("orderResult:" + orderResult);
if (orderResult <= 0) {
return setResultError("下单失败!");
}
//2、订单表插入完数据后 订单发送 外卖小哥【派单】
send(orderId);
return setResultSuccess();
}
private void send(String orderId) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("orderId", orderId);
String msg = jsonObject.toJSONString();
System.out.println("msg:" + msg);
//封装信息
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8").setMessageId(orderId).build();
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(orderId);
//发送信息
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend("order_exchange_name",
“orderRoutingKey", message, correlationData);
}
//生产信息确认机制 生产者往服务器端发送消息的时候 采用应答机制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
String orderId = correlationData.getId();//id都是相同 全局id
System.out.println("消息id:" + correlationData.getId());
if (ack) {//消息发送成功
System.out.println("消息id:" + correlationData.getId());
} else {
//重试机制
send(orderId);
System.out.println("消息发送确认失败:" + s);
}
}
}
8、OrderController
package com.sc.rabbitmq_order2050.controller;
import com.sc.rabbitmq_order2050.base.BaseApiService;
import com.sc.rabbitmq_order2050.base.ResponseBase;
import com.sc.rabbitmq_order2050.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController extends BaseApiService {
@Autowired
private OrderService orderService;
@RequestMapping("/addOrder")
public ResponseBase addOrder() {
return orderService.addOrderAndDispath();
}
}
9、RabbitmqOrder2050Application
package com.sc.rabbitmq_order2050;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.sc.rabbitmq_order2050.mapper")
@SpringBootApplication
public class RabbitmqOrder2050Application {
public static void main(String[] args) {
SpringApplication.run(RabbitmqOrder2050Application.class, args);
}
}
(二)、rabbitmq_platoon2051
1、 pom.xml
<dependencies>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--spring对amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
2、 application.properties
spring.application.name=rabbitmq_platoon2051
server.port=2051
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
###开启消息者(程序出现异常情况下会)进行重试
spring.rabbitmq.listener.simple.retry.enabled=true
##最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=3000
#开启手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 数据源名称
spring.datasource.name=test
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=root
#使用druid数据源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
3、RabbitmqConfig
package com.sc.rabbitmq_platoon2051.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 下单且派单队列
* 下单、派单交换机
* 定义【下单且派单队列】与交换机绑定
*/
@Component
public class RabbitmqConfig {
//下单且派单队列
public static final String ORDER_PLATOO_QUEUE = "order_platoon_queue";
///补单队列,判断订单是否已经被创建
public static final String ORDER_CREATE_QUEUE = "order_create_queue";
//下单、派单交换机
public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";
//1、定义订单队列
@Bean
public Queue directOrderPlatoonQueue() {
return new Queue(ORDER_PLATOO_QUEUE);
}
//2、定义补单队列
/* @Bean
public Queue directCreateOrderQueue() {
return new Queue(ORDER_CREATE_QUEUE);
}*/
//2、定义交换机
DirectExchange directOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE_NAME);
}
//3、定义【下单且派单队列】与交换机绑定
@Bean
Binding bindingExchangeOrderDicQueue() {
return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
.with("orderRoutingKey");
}
//3、定义补单队列与交换机绑定
/*@Bean
Binding bindingExchangeCreateOrderQueue() {
return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
.with("orderRoutingKey");
}*/
}
4、PlatoonEntity
package com.sc.rabbitmq_platoon2051.entity;
import lombok.Data;
@Data
public class PlatoonEntity {
private Long id;
//订单号
private String orderId;
//外卖员
private Long takeoutUserId;
}
5、PlatoonMapper
package com.sc.rabbitmq_platoon2051.mapper;
import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
import org.apache.ibatis.annotations.Insert;
public interface PlatoonMapper
{
/**
* 新增派单任务
*/
@Insert("insert into platoon values(null,#{orderId},#{takeoutUserId});")
public int insertPlatoon(PlatoonEntity platoon);
}
6、PlatoonConsumer
package com.sc.rabbitmq_platoon2051.consumber;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
import com.sc.rabbitmq_platoon2051.mapper.PlatoonMapper;
import com.sc.rabbitmq_platoon2051.rabbitmq.RabbitmqConfig;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 派单服务
*/
@Component
public class PlatoonConsumer {
@Autowired
private PlatoonMapper platoonMapper;
@RabbitListener(queues =RabbitmqConfig.ORDER_PLATOO_QUEUE)//下单且派单队列
public void process(Message message, @Headers Map<String,Object> headers,
Channel channel) throws IOException {
String messageId=message.getMessageProperties().getMessageId();
String msg=new String(message.getBody(),"UTF-8");
System.out.println("派单平台"+msg+",消息id:"+messageId);
JSONObject jsonObject=JSONObject.parseObject(msg);
String orderId=jsonObject.getString("orderId");
if(StringUtils.isEmpty(orderId)){
//日志记录
return;
}
PlatoonEntity platoonEntity=new PlatoonEntity();
//订单id
platoonEntity.setOrderId(orderId);
//外卖员id
platoonEntity.setTakeoutUserId(Long.valueOf("121"));
try{
int result=platoonMapper.insertPlatoon(platoonEntity);
if(result>0){
//手动签收信息,通知msg服务端删除该信息
channel.basicAck(message.getMessageProperties().getDeliveryTag()
,false);
}
}catch (Exception e){
e.printStackTrace();
//丢该消息
channel.basicNack(message.getMessageProperties().getDeliveryTag()
,false,false);
}
}
}
7、RabbitmqPlatoon2051Application
package com.sc.rabbitmq_platoon2051;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.sc.rabbitmq_platoon2051.mapper")
@SpringBootApplication
public class RabbitmqPlatoon2051Application {
public static void main(String[] args) {
SpringApplication.run(RabbitmqPlatoon2051Application.class, args);
}
}
标签:spring,20230406,rabbitmq,Rabbit,org,import,com,public,分布式 From: https://www.cnblogs.com/smallfa/p/17292257.html