首页 > 其他分享 >Rabbit-分布式事务实例 20230406

Rabbit-分布式事务实例 20230406

时间:2023-04-06 16:36:56浏览次数:41  
标签:spring 20230406 rabbitmq Rabbit org import com public 分布式

 

一、生产、消费者 流程

 

 

 

 

 

 

 

 

1、生产者(下单后生产 务必成功)
  派单队列:order_platonn_queue
  交换机:order_exchange_name
  绑交换机路由键:orderRoutingKey

  生产者=>采用confirm,确认应答机制
        Ack模式:成功
            失败则重试


2、消费者(platonn派单)
  派单队列:order_platonn_queue
  交换机:order_exchange_name
  绑交换机路由键:orderRoutingKey

       消费者=>重发Retry/清除Ack/丢弃Nack

 

 

 

 二、RabbitMQ

 

 

 

 

 

 

 

 

 

 

 

 

三、SB+Order+Platoon

 

                (一)、rabbitmq_order2050

 

  1、 pom.xml
    <dependencies>
     <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>runtime</scope>
      <version>8.0.22</version>

     </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--spring对amqp-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.3.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.9</version>
    </dependency>
   </dependencies>

 

  2、 application.properties

 

  spring.application.name=rabbitmq_order2050
  server.port=2050
  spring.rabbitmq.host=localhost
  spring.rabbitmq.username=guest
  spring.rabbitmq.password=guest
  spring.rabbitmq.port=5672
  ###开启消息确认机制confirms
  spring.rabbitmq.publisher-confirm-type=correlated
  spring.rabbitmq.publisher-returns=true

 

  # 数据源名称
  spring.datasource.name=test
  # 数据库连接地址
  spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
  # 数据库用户名&密码:
  spring.datasource.username=root
  spring.datasource.password=root
  #使用druid数据源
  spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
  # 数据库驱动:
  spring.datasource.driver-class-name=com.mysql.jdbc.Driver

 


  3、base
  3.1、ApiContains
  package com.sc.rabbitmq_order2050.base;

 

  public interface ApiContains {
    //响应请求成功
    String HTTP_RES_CODE_200_VALUE = "success";
    //系统错识
    String HTTP_RES_CODE_500_VALUE = "fail";
    //响应请求成功code
    Integer HTTP_RES_CODE_200 = 200;
    //系统错识
    Integer HTTP_RES_CODE_500 = 500;
    //未关联QQ账号
    Integer HTTP_RES_CODE_201 = 201;
  }

 


  3.2、BaseApiService
  package com.sc.rabbitmq_order2050.base;
  import org.springframework.stereotype.Component;

 

  @Component
  public class BaseApiService {

 

    public ResponseBase setResultError(Integer code, String msg) {
      return setResult(code, msg, null);
    }

 

    //返回错识,可以传msg
    public ResponseBase setResultError(String msg) {
      return setResult(ApiContains.HTTP_RES_CODE_500, msg, null);
    }

 

    //返回成功,可以传data值
    public ResponseBase setResultSuccess(Object data) {
      return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, data);
    }

 

    //返回成功,无data值
    public ResponseBase setResultSuccess() {
      return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, null);
    }

 

    //返回成功,无data值
    public ResponseBase setResultSuccess(String msg) {
      return setResult(ApiContains.HTTP_RES_CODE_200, msg, null);
    }

 

    //通用封装
    public ResponseBase setResult(Integer code, String msg, Object data) {
      return new ResponseBase(code, msg, data);
    }
  }

 

  3.3、ResponseBase
  package com.sc.rabbitmq_order2050.base;

 

  import lombok.Data;
  @Data
  public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

 

    public ResponseBase() {
    }

 

    public ResponseBase(Integer rtnCode, String msg, Object data) {
      super();
      this.rtnCode = rtnCode;
      this.msg = msg;
      this.data = data;
    }

 

    @Override
    public String toString() {
      return "ResponseBase[rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }
  }

 


  4、RabbitmqConfig
  package com.sc.rabbitmq_order2050.rabbitmq;

  import org.springframework.amqp.core.Binding;
  import org.springframework.amqp.core.BindingBuilder;
  import org.springframework.amqp.core.DirectExchange;
  import org.springframework.amqp.core.Queue;
  import org.springframework.context.annotation.Bean;
  import org.springframework.stereotype.Component;

 

  /**
  * 下单且派单、补单队列
  * 下单、派单交换机
  * 定义【下单且派单队列、补单队列】与交换机绑定
  */
  @Component
  public class RabbitmqConfig {

 

    //下单且派单队列
    public static final String ORDER_PLATOON_QUEUE = "order_platoon_queue";
    //补单队列,判断订单是否已经被创建
    //public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    //下单并且派单交换机
    public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";

 

    //1、定义订单队列
    @Bean
    public Queue directOrderPlatoonQueue() {
      return new Queue(ORDER_PLATOON_QUEUE);
    }

    //2、定义补单队列
    @Bean
    public Queue directCreateOrderQueue() {
      return new Queue(ORDER_CREATE_QUEUE);
    }

 

    //2、定义交换机
    DirectExchange directOrderExchange() {
      return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

 

    //3、定义【下单且派单队列】与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
      return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
      .with("orderRoutingKey");
    }

 

    //3、定义补单队列与交换机绑定
    @Bean
      Binding bindingExchangeCreateOrderQueue() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
.        .with("orderRoutingKey");
    }
  }

 

 

 

  5、OrderEnity
  package com.sc.rabbitmq_order2050.entity;

  import lombok.Data;
  @Data
  public class OrderEnity {
    private Long id;
    //订单名称
    private String name;
    //订单金额
    private Double orderMoney;
    //订单id
    private String orderId;
  }

 


  6、OrderMapper
  package com.sc.rabbitmq_order2050.mapper;

  import com.sc.rabbitmq_order2050.entity.OrderEnity;
  import org.apache.ibatis.annotations.Insert;
  import org.apache.ibatis.annotations.Options;
  import org.apache.ibatis.annotations.Param;
  import org.apache.ibatis.annotations.Select;

 

  public interface OrderMapper {

    @Insert(value = "insert into order_info values(#{id},#{name},#{orderMoney},#{orderId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    public int addOder(OrderEnity orderEnity); 

    @Select("select id as id,name as name,order_money as orderMonty,orderId as orderId" +
    " from order_info where orderId=#{orderId};")
    public OrderEnity findOrderId(@Param("orderId") String orderId);

   }

 

  7、OrderService

  package com.sc.rabbitmq_order2050.service;

  import com.alibaba.fastjson.JSONObject;
  import com.sc.rabbitmq_order2050.base.BaseApiService;
  import com.sc.rabbitmq_order2050.base.ResponseBase;
  import com.sc.rabbitmq_order2050.entity.OrderEnity;
  import com.sc.rabbitmq_order2050.mapper.OrderMapper;
  import org.springframework.amqp.core.Message;
  import org.springframework.amqp.core.MessageBuilder;
  import org.springframework.amqp.core.MessageProperties;
  import org.springframework.amqp.rabbit.connection.CorrelationData;
  import org.springframework.amqp.rabbit.core.RabbitTemplate;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;
  import java.util.UUID;

  @Service
  public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

 

    public ResponseBase addOrderAndPlatoon() {
      //先下单 订单表插入数据
      OrderEnity orderEnity = new OrderEnity();
      orderEnity.setName("xxxx面");
      //价格是300元
      orderEnity.setOrderMoney(300d);
      //商品id
      String orderId = UUID.randomUUID().toString();
      orderEnity.setOrderId(orderId);
      //1、先下单,创建订单(往订单数据库中插入一条数据)
      int orderResult = orderMapper.addOder(orderEnity);
      System.out.println("orderResult:" + orderResult);
      if (orderResult <= 0) {
        return setResultError("下单失败!");
      }
      //2、订单表插入完数据后 订单发送 外卖小哥【派单】
      send(orderId);
      return setResultSuccess();
    }

 

    private void send(String orderId) {
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("orderId", orderId);
      String msg = jsonObject.toJSONString();
      System.out.println("msg:" + msg);
      //封装信息
      Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
      .setContentEncoding("utf-8").setMessageId(orderId).build();
      //构建回调返回的数据
      CorrelationData correlationData = new CorrelationData(orderId);
      //发送信息
      this.rabbitTemplate.setMandatory(true);
      this.rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.convertAndSend("order_exchange_name",
      “orderRoutingKey", message, correlationData);
    }

 

    //生产信息确认机制 生产者往服务器端发送消息的时候 采用应答机制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
      String orderId = correlationData.getId();//id都是相同 全局id
      System.out.println("消息id:" + correlationData.getId());
      if (ack) {//消息发送成功
        System.out.println("消息id:" + correlationData.getId());
      } else {
        //重试机制
        send(orderId);
        System.out.println("消息发送确认失败:" + s);
      }
    }

 

  }

 


  8、OrderController
  package com.sc.rabbitmq_order2050.controller; 

  import com.sc.rabbitmq_order2050.base.BaseApiService;
  import com.sc.rabbitmq_order2050.base.ResponseBase;
  import com.sc.rabbitmq_order2050.service.OrderService;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;

  @RestController
  public class OrderController extends BaseApiService {
  @Autowired
  private OrderService orderService;

 

  @RequestMapping("/addOrder")
    public ResponseBase addOrder() {
    return orderService.addOrderAndDispath();
    }
  }

 

  9、RabbitmqOrder2050Application

  package com.sc.rabbitmq_order2050;

  import org.mybatis.spring.annotation.MapperScan;
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  @MapperScan("com.sc.rabbitmq_order2050.mapper")
  @SpringBootApplication
  public class RabbitmqOrder2050Application {

    public static void main(String[] args) {
      SpringApplication.run(RabbitmqOrder2050Application.class, args);
    }

  }

 

 

 

 

 

 

 

 

 

 

 

                (二)、rabbitmq_platoon2051

 

 

 

  1、 pom.xml
    <dependencies>
      <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>runtime</scope>
      <version>8.0.22</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--spring对amqp-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.3.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.9</version>
    </dependency>
  </dependencies>

 

  2、 application.properties

 

    spring.application.name=rabbitmq_platoon2051
    server.port=2051
    spring.rabbitmq.host=localhost
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    ###开启消息者(程序出现异常情况下会)进行重试
    spring.rabbitmq.listener.simple.retry.enabled=true
    ##最大重试次数
    spring.rabbitmq.listener.simple.retry.max-attempts=5
    #重试间隔
    spring.rabbitmq.listener.simple.retry.initial-interval=3000
    #开启手动ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

 


    # 数据源名称
    spring.datasource.name=test
    # 数据库连接地址
    spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
    # 数据库用户名&密码:
    spring.datasource.username=root
    spring.datasource.password=root
    #使用druid数据源
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    # 数据库驱动:
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver

 


  3、RabbitmqConfig
    package com.sc.rabbitmq_platoon2051.rabbitmq;

 

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

 

    /**
    * 下单且派单队列
    * 下单、派单交换机
    * 定义【下单且派单队列】与交换机绑定
    */
    @Component
    public class RabbitmqConfig {

 

      //下单且派单队列
      public static final String ORDER_PLATOO_QUEUE = "order_platoon_queue";
      ///补单队列,判断订单是否已经被创建
      public static final String ORDER_CREATE_QUEUE = "order_create_queue";
      //下单、派单交换机
      public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";

 

      //1、定义订单队列
      @Bean
      public Queue directOrderPlatoonQueue() {
        return new Queue(ORDER_PLATOO_QUEUE);
      }

 

 

      //2、定义补单队列
      /* @Bean
      public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
      }*/

 

      //2、定义交换机
      DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
      }

 

      //3、定义【下单且派单队列】与交换机绑定
      @Bean
      Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
        .with("orderRoutingKey");
      }

 

      //3、定义补单队列与交换机绑定
      /*@Bean
      Binding bindingExchangeCreateOrderQueue() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
        .with("orderRoutingKey");
      }*/
    }

 

 

 

  4、PlatoonEntity
  package com.sc.rabbitmq_platoon2051.entity;

 

  import lombok.Data;

 

  @Data
  public class PlatoonEntity {
    private Long id;
    //订单号
    private String orderId;
    //外卖员
    private Long takeoutUserId;
  }

 

 

 

  5、PlatoonMapper
  package com.sc.rabbitmq_platoon2051.mapper;

 

  import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
  import org.apache.ibatis.annotations.Insert;

 

  public interface PlatoonMapper
  {
    /**
    * 新增派单任务
    */
    @Insert("insert into platoon values(null,#{orderId},#{takeoutUserId});")
    public int insertPlatoon(PlatoonEntity platoon);
  }

 


  6、PlatoonConsumer

 

  package com.sc.rabbitmq_platoon2051.consumber;

 

  import com.alibaba.fastjson.JSONObject;
  import com.rabbitmq.client.Channel;
  import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
  import com.sc.rabbitmq_platoon2051.mapper.PlatoonMapper;
  import com.sc.rabbitmq_platoon2051.rabbitmq.RabbitmqConfig;
  import org.apache.commons.lang3.StringUtils;
  import org.springframework.amqp.core.Message;
  import org.springframework.amqp.rabbit.annotation.RabbitListener;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.messaging.handler.annotation.Headers;
  import org.springframework.stereotype.Component;
  import java.io.IOException;
  import java.util.Map;

 

  /**
    * 派单服务
  */
  @Component
  public class PlatoonConsumer {

 

    @Autowired
    private PlatoonMapper platoonMapper;

 

    @RabbitListener(queues =RabbitmqConfig.ORDER_PLATOO_QUEUE)//下单且派单队列
    public void process(Message message, @Headers Map<String,Object> headers,
      Channel channel) throws IOException {
      String messageId=message.getMessageProperties().getMessageId();
      String msg=new String(message.getBody(),"UTF-8");
      System.out.println("派单平台"+msg+",消息id:"+messageId);
      JSONObject jsonObject=JSONObject.parseObject(msg);
      String orderId=jsonObject.getString("orderId");
      if(StringUtils.isEmpty(orderId)){
        //日志记录
        return;
      }
      PlatoonEntity platoonEntity=new PlatoonEntity();
      //订单id
      platoonEntity.setOrderId(orderId);
      //外卖员id
      platoonEntity.setTakeoutUserId(Long.valueOf("121"));
      try{
        int result=platoonMapper.insertPlatoon(platoonEntity);
        if(result>0){
          //手动签收信息,通知msg服务端删除该信息
          channel.basicAck(message.getMessageProperties().getDeliveryTag()
          ,false);
        }
      }catch (Exception e){
        e.printStackTrace();
        //丢该消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag()
        ,false,false);
      }
    }
  }

 


  7、RabbitmqPlatoon2051Application
  package com.sc.rabbitmq_platoon2051;

 

  import org.mybatis.spring.annotation.MapperScan;
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

 

  @MapperScan("com.sc.rabbitmq_platoon2051.mapper")
  @SpringBootApplication
  public class RabbitmqPlatoon2051Application {

 

    public static void main(String[] args) {
      SpringApplication.run(RabbitmqPlatoon2051Application.class, args);
    }

 

  }

 

 

 

 

标签:spring,20230406,rabbitmq,Rabbit,org,import,com,public,分布式
From: https://www.cnblogs.com/smallfa/p/17292257.html

相关文章

  • 分布式技术剖析
    随着企业数字化进程的进一步深入,企业为了解决大数据的“4个V”问题,往往需要构建多个不同技术栈的大数据平台,其中不乏会使用到分布式相关的存储、计算、资源管理技术。分布式系统的出现解决了单机系统无法解决的成本、效率和高可用问题。那么什么是分布式技术?如何发展至今?主要包括......
  • 20230406 英语学习进度慢
    我从2月中开始,一直在做精听的练习.但是,你关于精听的进度,你认为太慢了.你的听力加起来,不至5篇,这个量,我认为是严重不足地.正如大佬所言,20篇以上的精听&英转中的完全掌握,将会有英语的极大提升.一方面,精听的确需要大量的时间投入.但是,另外一方面,你的确时间投入需要加......
  • 20230406-python-yaml文件操作
               ......
  • 互联网项目实战——Athena-OSS分布式文件存储服务设计
    摘要在系统中需要有统一的存储系统,用于较大型的文件和图片进行存储,Athena系统中利用开源的FastDFS来构建Athena分布式文件存储系统OSS服务。用于整个系统的存储服务。博文将详细的介绍分布式存储系统的背景和意义以及相关的技术选型与原理,供大家学习参考。一、分布式文件存储系统背......
  • 04.分布式选举:国不可一日无君(非原创)
    作者:聂博士Bully算法:长者为大,选择剩余存活的节点中最大的id   Bully算法在选举过程中,需要用到以下3种消息:Election消息,用于发起选举;Alive消息,对Election消息的应答;Victory消息,竞选成功的主节点向其他节点发送的宣誓主权的消息。Bully算法选举的原则是“长者为大”,意......
  • 分布式追踪的最佳工具:SigNoz
    分布式追踪的最佳工具:SigNozvsJaeger参考链接分布式追踪的最佳工具:SigNozvsJaeger_devops_weixin_0010034-DevPress官方社区(csdn.net)开源可观测性平台SigNoz参考链接开源可观测性平台SigNoz_JAVA序码的博客-CSDN博客使用开源工具监控全栈Nodejs应用参考链接使用开源工具监......
  • 分布式与微服务之间的关系
    根据设计期的架构思想和运行期的不同结构分为:面向服务的架构分布式服务架构微服务架构1、面向服务架构。以业务服务的角度和服务总线的方式,一般是webservice与ESB,考虑系统架构和企业IT治理;2、分布式服务架构。基于去中心化的分布式服务框架与技术,考虑系统架构和服务治理;3、......
  • 新一代分布式任务调度框架
    本文已经收录到Github仓库,该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点,欢迎star~Github地址如果访问不了Github,可以访问gitee地址。gitee地址我们先思考......
  • GFS分布式文件系统
    GFS分布式文件系统  1.GlusterFS简介  GlusterFS(GlusterFileSystem)是一个开源的分布式文件系统,主要由ZRESEARCH公司负责开发。GlusterFS是Scale-Out存储解决方案Gluster的核心,具有强大的横向扩展能力,通过扩展能够支持数PB存储容量和处理数千客户端。GlusterFS......
  • DFS分布式文件系统
    一、GFS文件系统概述1.GlusterFS简介GlusterFS是一个开源的分布式文件系统。由存储服务器、客户端以及NFS/Samba存储网关(可选,根据需要选择使用)组成。没有元数据服务器组件,这有助于提升整个系统的性能、可靠性和稳定性。MFS传统的分布式文件系统大多通过元服务器来存储元数据,元数......