首页 > 其他分享 >第七章 Rocketmq--消息驱动

第七章 Rocketmq--消息驱动

时间:2023-08-20 15:13:31浏览次数:33  
标签:shop -- rocketmq 消息 第七章 org import order Rocketmq

7.1   MQ简介

7.1.1 什么是MQ

MQ(Message  Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

7.1.2 MQ的应用场景

7.1.2.1 异步解耦

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续
的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返
回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比 较耗时而且不需要即时(同步)返回结果的操作作为消息放入

消息队列。同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即 解耦合。

7.1.2.2 流量削峰

流量削峰也是消息队列   MQ   的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流

量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解 决这些问题,可在应用和下游通知系统之间加入消息队列   MQ。

秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
  3. 下游的通知系统订阅消息队列 MQ   的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

7.1.3 常见的MQ产品

目前业界有很多MQ产品,比较出名的有下面这些:

ZeroMQ

号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言 实现,实际上只是一个socket库的重新封装,

如果做为消息队列使用,需要开发大量的代码。 ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。

RabbitMQ

使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。

ActiveMQ

历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring- jms轻松融合,实现了多种协议,支持持久化到数据库,

对队列数较多的情况支持不好。

RocketMQ

阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来 很简单。

Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统, 相对于ActiveMQ是一个非常轻量级的消息系统,

除了性能非常好之外,还是一个工作良好的分布 式系统。

7.2   RocketMQ入门

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用 非常广泛,已经经过了"双11"这种万亿级的消息流转。

7.2.1   RocketMQ环境搭建

接下来我们先在linux平台下安装一个RocketMQ的服务

7.2.1.1 环境准备

下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求

Linux 64位操作系统

64bit JDK 1.8+

7.2.1.2   安装RocketMQ

1  上传文件到Linux系统

使用SecureCRT 登录的linux 服务器:输入命令RZ上传 rocketMQ 安装包 如图:

将rocketmq-all-4.4.0-bin-release.zip文件上传到  /usr/local/src/ 目录下

2 解压到安装目录:

解压:

 unzip rocketmq-all-4.4.0-bin-release.zip 

然后移动到另一个目录并重命名文件:

mv rocketmq-all-4.4.0-bin-release ../rocketmq

7.2.1.3   启动RocketMQ

1切换到安装目录

[root@localhost src]# cd ../rocketmq/
[root@localhost rocketmq]# 

2。进入到bin目录[root@localhost bin]# cd /usr/local/rocketmq/bin 修改配置:[root@localhost bin]# vi runserver.sh

注意这里是本地虚拟机环境,需要按如图该一下配置:可以根据实际情况调整大小

3.进入 bin目录执行:启动mqnamesrv

[root@localhost bin]## nohup ./bin/mqnamesrv &

[1] 1467

#  只要进程不报错,就应该是启动成功了,可以查看一下日志

[root@localhost bin]# tail -f /root/logs/rocketmqlogs/namesrv.log

启动mqbroker

[root@localhost rocketmq]# nohup bin/mqbroker -n localhost:9876 &

查看日志

[root@localhost rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

7.2.1.4   测试RocketMQ

1 测试消息发送

[root@localhost rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@localhost rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2 测试消息接收


[root@localhost rocketmq]# export NAMESRV_ADDR=localhost:9876

[root@localhost rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

7.2.1.5   关闭RocketMQ

[root@localhost rocketmq]# bin/mqshutdown broker

[root@localhost rocketmq]# bin/mqshutdown namesrv

7.2.2 RocketMQ的架构及概念

 

Broker(邮递员)

Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能

NameServer(邮局)

消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息

Producer(寄件人)

消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消 息

Consumer(收件人)

消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息

Topic(地区)

用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息

Message  Queue(邮件)

为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,

这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个 Message Queue读取消息

Message

Message  是消息的载体。

Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

Consumer Group

消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

Consumer Group

消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

7.2.3 RocketMQ控制台安装

1 下载

# 在git上下载下面的工程rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases

2 修改配置文件

# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=8866 #项目启动后的端口号 
rocketmq.config.namesrvAddr=192.168.200.20:9876 #nameserv的地址,注意防火墙要开启 9876端口

3 打成jar包,并启动

进入目录

# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

4 访问控制台

http://localhost:8866/

7.3 消息发送和接收演示

接下来我们使用Java代码来演示消息的发送和接收

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

7.3.1 发送消息

消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
public class RocketMQReceiveTest {
    public static void main(String[] args) throws Exception {
        //1. 创建消息消费者, 指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
        //2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.200.20:9876");
        //3. 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");
        //4. 设置回调函数,编写处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.println("Receive New Messages: " + msgs);
                //返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5. 启动消息消费者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

7.3.2 接收消息

消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
package com.shop.order.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQSendTest {
    public static void main(String[] args) throws Exception {
        //1. 创建消息生产者, 指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.200.20:9876");
        //3. 启动生产者
        producer.start();
        //4. 创建消息对象,指定主题、标签和消息体
        Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
        //5. 发送消息
        SendResult sendResult = producer.send(msg,10000);
        System.out.println(sendResult);
        //6. 关闭生产者
        producer.shutdown();
    }
}

注意:RocketMQ连接报错RemotingConnectException: connect to <172.17.0.1:10:109011>解决方法:

1. 进入RocketMQ的安装目录下的conf目录

cd /usr/local/rocketmq/conf

2. vi broker.conf

3. 在后面追加两行:

namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.200.20#大写的IP一定要注意!!!修改成自己的IP地址

4. 然后重启nameserver和broker 一定要先启动namesrv,因为消息服务器是注册到命名服务器上的,不先启动命名服务器怎么注册

nohup sh mqnamesrv

5.再启动消息服务器,同时指定刚刚修改过的conf文件,不然还是会读取原本默认的阿里外网IP,还是会报错。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf

注:-n 指定的是注册到哪个消息服务器上 ;  -c 指定的是刚刚修改的conf配置文件.

 

7.4 案例

接下来我们模拟一种场景:     下单成功之后,向下单用户发送短信。设计图如下:

7.4.1 订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

<!--rocketmq-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

2 添加配置

rocketmq:
    name-server: 192.168.200.20:9876	#rocketMQ服务的地址 
    producer:
        group: shop-order # 生产者组

3.OrderController 添加如下代码:

package com.shop.order.controller;
import com.alibaba.fastjson.JSON;
import com.shop.common.entity.Order;
import com.shop.common.entity.Product;
import com.shop.order.service.OrderService;
import com.shop.order.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class OrderController {
    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/order/prod/{pid}")//基于Feign实现服务调用
    public Order order(@PathVariable("pid") Integer pid) {
        log.info(">>客户下单,这时候要调用商品微服务查询商品信息");
        //通过fegin调用商品微服务
        Product product = productService.findByPid(pid);
        if (product.getPid() == -1) {
            Order order = new Order();
            order.setPname("下单失败");
            return order;
        }
        log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(product.getPid());
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        orderService.save(order);

        //下单成功之后,将消息放到mq中
        rocketMQTemplate.convertAndSend("order-topic",  order);
        return order;
    }
}

7.4.2 用户微服务订阅消息

1 修改 shop-user 模块配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>shop-parent</artifactId>
        <groupId>com.shop</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>shop-user</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.shop</groupId>
            <artifactId>shop-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <!--nacos客户端-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>
</project>

2 修改主类

package com.shop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class,args);
    }
}

3 修改配置文件

server:
  port: 8071
spring:
  application:
    name: service-user
  zipkin:
    base-url: http://127.0.0.1:9411/    #zipkin server的请求地址
    discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务
  sleuth:
    sampler:
      probability: 1.0    #采样的百分比
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
    username: root
    password: root
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: update
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
rocketmq:
  name-server: 192.168.200.20:9876

4 编写消息接收服务

package com.shop.service.impl;

import com.alibaba.fastjson.JSON;
import com.shop.common.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
    }
}

5   启动服务,执行下单操作,可以看到后台输出日志

7.5 发送不同类型的消息

7.5.1 普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。

可靠同步发送

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方 式。

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

可靠异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送 方通过回调接口接收服务器响应,并对响应结果进行处理。

异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知 启动转码服务,转码完成后通知推送转码结果等。

单向发送

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不 等待应答。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

在shop-order 项目加入一下依赖:

<!--依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>

编写测试用例代码如下。分别测试查看效果:

package com.shop.order.test;
import com.shop.order.OrderApplication;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
//测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
public class MessageTypeTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    //同步消息
    @Test
    public void testSyncSend() {
        //参数一: topic, 如果想添加tag	可以使用"topic:tag"的写法
        //参数二: 消息内容
        SendResult sendResult =
                rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
        System.out.println(sendResult);
    }
    //异步消息
    @Test
    public void testAsyncSend() throws InterruptedException {
        //参数一: topic, 如果想添加tag	可以使用"topic:tag"的写法
        //参数二: 消息内容
        //参数三: 回调函数, 处理返回结果
        rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        //让线程不要终止
        Thread.sleep(30000000);
    }
    //单向消息
    @Test
    public void testOneWay() {
        rocketMQTemplate.sendOneWay("test-topic-1",  "这是一条单向消息");
    }
    //同步顺序消息[异步顺序	单向顺序写法类似]
    public void testSyncSendOrderly() {
        //第三个参数用于队列的选择
        rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息", "xxxx");
    }
}

三种发送方式的对比

7.5.2 顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。

//同步顺序消息[异步顺序	单向顺序写法类似] 
public void testSyncSendOrderly() {
//第三个参数用于队列的选择
    rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息", "xxxx");
}

7.5.3 事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

事务消息交互流程:

两个概念: 

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务

端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的 消息即半事务消息。 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失, RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该 消息的最终状态(Commit  或是 Rollback),该询问过程即消息回查。

事务消息发送步骤:
  1. 发送方将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事 务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状 态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。 2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。


1.在shop-common 项目新增class类:

package com.shop.common.entity;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.util.Date;
//事物日志
@Entity
@Table(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txLogId;
    private String content;
    private Date date;
}

2在shop-order项目下新增TxLogDao如下代码:

package com.shop.order.dao;
import com.shop.common.entity.TxLog;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface TxLogDao extends JpaRepository<TxLog,String> {
}

3.在shop-order项目下新增OrderServiceImpl4

package com.shop.order.service.impl;
import com.shop.common.entity.Order;
import com.shop.common.entity.TxLog;
import com.shop.order.dao.OrderDao;
import com.shop.order.dao.TxLogDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.UUID;

@Service
@Slf4j
public class OrderServiceImpl4 {
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void createOrderBefore(Order order){
        String txId = UUID.randomUUID().toString();
        //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction(
        "tx_producer_group", "tx_topic",
                MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order);

    }
    //本地事物
    @Transactional
    public void createOrder(String txId, Order order) {
        //本地事物代码
        orderDao.save(order);
        //记录日志到数据库,回查使用
        TxLog txLog = new TxLog();
        txLog.setTxLogId(txId);
        txLog.setContent("事物测试");
        txLog.setDate(new Date());
        txLogDao.save(txLog);
    }
}

4.在shop-order项目下新增OrderServiceImpl4Listener 

package com.shop.order.service.impl;
import com.shop.common.entity.Order;
import com.shop.common.entity.TxLog;
import com.shop.order.dao.TxLogDao;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
    @Autowired
    private TxLogDao txLogDao;
    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            //本地事物
            orderServiceImpl4.createOrder((String) message.getHeaders().get("txId"), (Order) arg);
            return  RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return  RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //查询日志记录
        TxLog txLog = txLogDao.findById((String) message.getHeaders().get("txId")).get();
        if(txLog != null){
            return  RocketMQLocalTransactionState.COMMIT;
        }else{
            return  RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

5.在shop-order项目下新增OrderController4

package com.shop.order.controller;

import com.alibaba.fastjson.JSON;
import com.shop.common.entity.Order;
import com.shop.common.entity.Product;
import com.shop.order.service.ProductService;
import com.shop.order.service.impl.OrderServiceImpl4;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class OrderController4 {
    @Autowired
    private OrderServiceImpl4 orderService;
    @Autowired
    private ProductService productService;
    @GetMapping("/order/prod/{pid}")//基于Feign实现服务调用
    public Order order(@PathVariable("pid") Integer pid) {
        log.info(">>客户下单,这时候要调用商品微服务查询商品信息");
        //通过fegin调用商品微服务
        Product product = productService.findByPid(pid);
        if (product.getPid() == -1) {
            Order order = new Order();
            order.setPname("下单失败");
            return order;
        }
        log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(product.getPid());
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        orderService.createOrderBefore(order);

        //下单成功之后,将消息放到mq中
        //rocketMQTemplate.convertAndSend("order-topic",  order);
        return order;
    }
}

6.重启服务 ProductService 和OrderService 在浏览器输入地址 http://localhost:8091/order/prod/2 查看 效果:
 

 

标签:shop,--,rocketmq,消息,第七章,org,import,order,Rocketmq
From: https://www.cnblogs.com/eagle888/p/17644006.html

相关文章

  • 创业公司如何选择管理系统?
    对于创业公司,既要控制成本,又需要简化管理,更需要一套高效且多功能整合的管理软件,来对企业运行中的数据、流程和信息沟通进行综合性管理。为应付随时可能出现的需求变化和扩展,软件的自定义能力和扩展性显得尤为重要。选择适合创业公司的管理软件可以帮助提高工作效率、组织协调和......
  • 某金融机构测试开发笔试题
    一、Linux笔试题1、什么是符号链接?如何创建符号链接?2、环境变量是什么?如何理解进程与环境变量的关系?3、如何查看文件的权限?文件权限信息的具体含义是什么?4、如何查看一个进程是否存在?如何杀掉一个进程?5、如何将进程放到后台执行并且重定向标准输出与错误输出?二、数据库数......
  • 如何快速在 Kubernetes 集群中新建用户
    如何快速在Kubernetes集群中新建用户Se7en 奇妙的Linux世界 2023-08-1911:59 发表于重庆收录于合集#Kubernetes274个#云原生261个#Docker197个#程序员421个公众号关注 「奇妙的Linux世界」设为「星标」,每天带你玩转Linux! Kubernetes中的用户K8S中......
  • [AGC031B] Reversi
    题目大意有一个长度为\(n\)的数列\(a\),你需要对其进行\(q\)次操作,操作有两种类型,按如下格式给出:1xy:将\(a_x\)变成\(y\);2lr:询问位置在\(\left[l,r\right]\)之间的不下降子串有多少个。思路考虑DP。考虑第\(i\)个石头,如果第\(i\)个石头不修改,方案数仍......
  • MySql Workbench 迁移工具 migration 提示缺少pyodbc 2.1.8 的解决方法
    想把公司的数据库转到MySQL,所以想装个MySQL测试,发现新版的MySQL(8.0.34)默认安装还是有不少问题,##一、譬如表、字段大小写的问题:lower_case_table_names=0--表名存储为给定的大小和比较是区分大小写的(linux默认)lower_case_table_names=1--表名存储在磁......
  • Jmeter中的ramp-up时间指的是什么?请举说明
    在JMeter中,ramp-up时间指的是测试中逐渐增加并发用户数的时间。它表示从测试开始到达最大并发用户数所需的时间。举例说明:假设我们需要对一个网站进行性能测试,设置最大并发用户数为100,并且希望在30秒内逐渐增加并发用户数。那么,ramp-up时间就是30秒。在测试开始时,JMeter会逐渐......
  • Unit 2
    Unit2MistaketoSuccessAFamousQuoteSuccessisgoingfromfailuretofailurewithoutlosingyourenthusiasm.——WinstonChurchill成功就是经历一次一次失败后,热情依旧。......
  • 数据倾斜问题
    数据倾斜的简介数据倾斜即单个节点任务处理的数据量远高于同类型任务处理的数据量,成为整个作业的性能瓶颈。本文将从产生数据倾斜的原因、不同计算引擎下的解决方法讨论。数据倾斜的场景和对应的解决方案Suffle过程数据倾斜和Suffle过程是密不可分的,Suffle过程在MR和Spark中......
  • IfcCurrencyRelationship
    IfcCurrencyRelationship实体定义IfcCurrentRelationship定义在特定时间应用于两种指定货币之间并由特定来源发布的汇率。 IfcCurrentRelationship用于可能需要将一种货币的IfcCostValue引用到另一种货币中的IfcHostValue的情况。它考虑到货币汇率可能会因要求记录所用货币汇......
  • Unit 6
    Unit6TheValueofMoneyTextATeachingChildrentoSpendPocketMoneyWiselyAFAMOUSQUOTENeverspendyourmoneybeforeyouhaveit.——ThomasJefferson钱财未到手,绝不提前花。......