转载自:https://blog.csdn.net/mikewuhao/article/details/106666109
=============
Demo实现的功能
项目启动生产者和消费者2个服务后, 生产者端执行用户查询, 从数据库查出用户数据后, 发送给消息中间件rocketMq, 消费者监听到mq消息后获取到用户数据.
github源码地址
https://github.com/huangdan92/springBoot-mq-demo
搭建详细步骤
1. 准备工作
提前把rocketMq环境搭建和启动好, rocketMq可视化工具安装启动好.
(本人mac环境部署rocketMq,参考https://www.jianshu.com/p/a759e8ea6ac1)
(windows环境部署rocketMq,参考https://www.cnblogs.com/darendu/p/12036380.html)
注意: 按照官方启动rocketMq ,可能会报运行环境内存不足,建议修改内存, 参考: https://blog.csdn.net/u014803081/article/details/90705792?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6
2. 项目结构
在idea开发工具里面先建普通空的project, 命名为springBoot-mq-demo, 再新建2个maven类型的module, 分别命名 consumer(消费者服务), provider(生产者服务).
3. provider生产者服务创建
3.1 provider服务目录结构
3.2 provider服务的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the provider (message producer) module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wuhao.demo</groupId>
    <artifactId>provider</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- MyBatis integration for Spring Boot -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <!-- RocketMQ client libraries -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <!-- Supplies the javax.persistence annotations used on the User entity -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- Lombok (compile-time only) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.3 provider服务的application.properties
# HTTP port of the provider service
server.port=8082
logging.level.org.springframework=DEBUG

# Database
spring.datasource.driver-class-name= com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/boot?useUnicode=true&characterEncoding=utf-8
spring.datasource.username = root
spring.datasource.password = root

# MyBatis
mybatis.type-aliases-package=com.wuhao.domain
mybatis.mapper-locations=classpath:mapper/*.xml

# RocketMQ producer
rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=topic2020
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000
3.4 provider服务的启动类, ProviderApplication
package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstraps the provider (message producer) Spring Boot application.
 */
@SpringBootApplication
public class ProviderApplication {

    /**
     * Starts the Spring context using this class as the primary configuration source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(ProviderApplication.class);
        application.run(args);
    }
}
3.5 provider服务的controller, MqController
package com.wuhao.controller;

import com.alibaba.fastjson.JSONObject;
import com.wuhao.domain.User;
import com.wuhao.mq.RocketMQProducer;
import com.wuhao.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Test controller that publishes a user record to RocketMQ.
 *
 * @author wuhao
 * @date 2020/6/9 17:38
 */
@RestController
@Slf4j
public class MqController {

    @Autowired
    @Qualifier("rocketMQProducer")
    RocketMQProducer rocketMQProducer;

    @Autowired
    private UserService userService;

    /** Destination topic; defaults to the value that used to be hard-coded here. */
    @Value("${rocketmq.producer.topic:topic2020}")
    private String topic;

    /** Message tag; defaults to the value that used to be hard-coded here. */
    @Value("${rocketmq.producer.tag:test}")
    private String tag;

    /**
     * Loads the user with id 1, serialises it to JSON and sends it to RocketMQ
     * together with the current timestamp. Send failures are logged and do not
     * propagate to the HTTP caller.
     */
    @GetMapping("/testSend")
    public void testSend() {
        DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        User user = userService.queryUserById(1L);
        String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "---" + JSONObject.toJSONString(user);
        // Encode explicitly as UTF-8 so the payload does not depend on the JVM's default charset.
        Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
        try {
            producer.send(message);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while sending message to topic {}", topic, e);
        } catch (Exception e) {
            log.error("Failed to send message to topic {} with tag {}", topic, tag, e);
        }
    }
}
3.6 provider服务的mq配置类, RocketMQProducer
package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the RocketMQ message producer from the
 * {@code rocketmq.producer.*} properties.
 *
 * @author wuhao
 * @date 2020/6/9 17:31
 */
@Configuration
@Slf4j
public class RocketMQProducer {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;

    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;

    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    /**
     * Creates the producer bean. The {@code start} and {@code shutdown} methods are
     * declared as the bean's init and destroy methods, so this method only configures it.
     *
     * @return the configured producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer mqProducer = new DefaultMQProducer(groupName);

        // Connection / identity settings
        mqProducer.setNamesrvAddr(nameserAddr);
        mqProducer.setInstanceName(instanceName);

        // Sending limits
        mqProducer.setMaxMessageSize(maxMessageSize);
        mqProducer.setSendMsgTimeout(sendMsgTimeout);

        mqProducer.setVipChannelEnabled(false);

        log.info("================>生产者创建完成,ProducerGroupName{}<================", groupName);
        return mqProducer;
    }
}
3.7 provider服务的实体类, User
package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * User entity.
 *
 * @CreateDate: 2020-02-04 19:25
 * @Author: wuhao
 */
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {

    /** Generated primary key. */
    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "username")
    private String username;

    @Column(name = "birthday")
    private String birthday;

    @Column(name = "sex")
    private String sex;

    @Column(name = "address")
    private String address;

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getBirthday() {
        return this.birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getSex() {
        return this.sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders all fields in the form {@code User{id=..., username='...', ...}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", username='").append(username).append('\'');
        text.append(", birthday='").append(birthday).append('\'');
        text.append(", sex='").append(sex).append('\'');
        text.append(", address='").append(address).append('\'');
        text.append('}');
        return text.toString();
    }
}
3.8 provider服务的接口
package com.wuhao.service;

import com.wuhao.domain.User;

/**
 * Service-layer operations on {@link User} records.
 *
 * @CreateDate: 2020-06-09 09:35
 * @Author: wuhao
 */
public interface UserService {

    /**
     * Stores a new user.
     *
     * @param user the user to add
     * @return the int result reported by the persistence layer
     */
    int addUser(User user);

    /**
     * Looks up a user by primary key.
     *
     * @param id the user id
     * @return the user found for {@code id}
     */
    User queryUserById(Long id);

    /**
     * Updates an existing user.
     *
     * @param user the user carrying the id and the new field values
     * @return the int result reported by the persistence layer
     */
    int modifyUser(User user);

    /**
     * Deletes a user by primary key.
     *
     * @param id the user id
     * @return the int result reported by the persistence layer
     */
    int deleteUserById(Long id);
}
3.8 provider服务的接口实现类
package com.wuhao.service.impl; import com.wuhao.domain.User; import com.wuhao.dao.UserMapper; import com.wuhao.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Description:User的Service实现类 * @CreateDate: 2020-02-05 09:36 * @Author: wuhao */ @Service public class UserServiceImpl implements UserService { @Autowired private UserMapper userMapper; @Override public User queryUserById(Long id) { return userMapper.queryUserById(id); } @Override public int addUser(User user) { return userMapper.addUser(user); } @Override public int modifyUser(User user) { return userMapper.modifyUser(user); } @Override public int deleteUserById(Long id) { return userMapper.deleteUserById(id); } }
3.9 provider服务的Dao层, UserMapper
package com.wuhao.dao;

import com.wuhao.domain.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

/**
 * MyBatis mapper for {@link User}; the SQL statements live in the mapper XML
 * registered under the {@code com.wuhao.dao.UserMapper} namespace.
 *
 * @CreateDate: 2020-06-04 19:38
 * @Author: wuhao
 */
@Mapper
@Repository
public interface UserMapper {

    /**
     * Runs the {@code addUser} insert statement.
     *
     * @param user the user to insert
     * @return the int result of the insert statement
     */
    int addUser(User user);

    /**
     * Runs the {@code queryUserById} select statement.
     *
     * @param id the user id
     * @return the row mapped to a {@link User}
     */
    User queryUserById(Long id);

    /**
     * Runs the {@code modifyUser} update statement.
     *
     * @param user the user carrying the id and the new field values
     * @return the int result of the update statement
     */
    int modifyUser(User user);

    /**
     * Runs the {@code deleteUserById} delete statement.
     *
     * @param id the user id
     * @return the int result of the delete statement
     */
    int deleteUserById(Long id);
}
3.10 provider服务sql的xml文件, userMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements backing the com.wuhao.dao.UserMapper interface. -->
<mapper namespace="com.wuhao.dao.UserMapper">
    <!-- Query a user by id -->
    <select id="queryUserById" resultType="com.wuhao.domain.User">
        select * from `user` where id = #{id}
    </select>
    <!-- Update a user -->
    <update id="modifyUser" parameterType="com.wuhao.domain.User" >
        update `user` set username=#{username},birthday=#{birthday},sex=#{sex}, address=#{address} where id=#{id}
    </update>
    <!-- Delete a user -->
    <delete id="deleteUserById" parameterType="long">
        delete from `user` where id=#{id}
    </delete>
    <!-- Add a user -->
    <insert id="addUser" parameterType="com.wuhao.domain.User">
        insert into `user` (username,birthday,sex,address) values(#{username},#{birthday},#{sex},#{address})
    </insert>
</mapper>
4. consumer消费者服务创建
4.1 consumer服务目录结构
4.2 consumer服务的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the consumer (message listener) module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wuhao.demo</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- RocketMQ client libraries -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <!-- Supplies the javax.persistence annotations used on the User entity -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- Lombok (compile-time only) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4.3 consumer服务的application.properties
# HTTP port of the consumer service
server.port=8083
logging.level.org.springframework=DEBUG

# RocketMQ consumer
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ConsumerGroup
rocketmq.consumer.topic=topic2020
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
4.4 consumer服务的启动类
package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * Bootstraps the consumer (message listener) Spring Boot application.
 * DataSource auto-configuration is excluded from the application context.
 */
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ConsumerApplication {

    /**
     * Starts the Spring context using this class as the primary configuration source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(ConsumerApplication.class);
        application.run(args);
    }
}
4.5 consumer服务的实体类, User
package com.wuhao.domain; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.Id; /** * @Description: User实体 * @CreateDate: 2020-02-04 19:25 * @Author: wuhao */ @Entity @NoArgsConstructor @AllArgsConstructor public class User { @Id @GeneratedValue private Long id; @Column(name = "username") private String username; @Column(name = "birthday") private String birthday; @Column(name = "sex") private String sex; @Column(name = "address") private String address; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getBirthday() { return birthday; } public void setBirthday(String birthday) { this.birthday = birthday; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + ", birthday='" + birthday + '\'' + ", sex='" + sex + '\'' + ", address='" + address + '\'' + '}'; } }
4.6 consumer服务的监听器, MessageListen
package com.wuhao.mq;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Concurrent RocketMQ listener that hands every delivered message to the
 * {@link MessageProcessor}.
 *
 * @author wuhao
 * @date 2020/4/17 17:28
 */
@Component
public class MessageListen implements MessageListenerConcurrently {

    @Autowired
    private MessageProcessor messageProcessor;

    /**
     * Processes every message of the delivered batch, not just the first one, so
     * that no message is acknowledged without having been handled.
     *
     * @param list                       the batch of messages delivered by the client
     * @param consumeConcurrentlyContext the consume context (unused)
     * @return {@code RECONSUME_LATER} as soon as one message is rejected by the
     *         processor, otherwise {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.isEmpty()) {
            // Nothing to process; acknowledge instead of failing on list.get(0).
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt ext : list) {
            if (!messageProcessor.handle(ext)) {
                // The status applies to the whole batch, so already-handled messages
                // may be redelivered; the processor should tolerate duplicates.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
4.7 consumer服务的mq消息处理接口
package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * Contract for handling a single message received from RocketMQ.
 *
 * @author wuhao
 * @date 2020/4/17 17:24
 */
public interface MessageProcessor {

    /**
     * Handles one received message.
     *
     * @param messageExt the message to handle
     * @return {@code true} when the message was handled; {@code false} makes the
     *         listener ask RocketMQ to deliver it again later
     */
    boolean handle(MessageExt messageExt);
}
4.8 consumer服务的消息处理类
package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * Message processor that prints the received message body.
 *
 * @author wuhao
 * @date 2020/4/17 17:27
 */
@Service
public class MessageProcessorImpl implements MessageProcessor {

    /**
     * Decodes the message body and prints it to standard output.
     *
     * @param messageExt the received message
     * @return always {@code true}
     */
    @Override
    public boolean handle(MessageExt messageExt) {
        // The body arrives as raw bytes. Decode it explicitly as UTF-8 rather than with
        // the JVM's default charset, which varies between machines and can garble
        // non-ASCII text. NOTE(review): the producer must encode as UTF-8 as well.
        String result = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println("监听到了消息,消息为:"+ result);
        return true;
    }
}
4.9 consumer服务的mq消费者配置类
package com.wuhao.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wuhao * @Title: RocketMQConsumer * @Description: Mq消费者 * @date 2020/4/17 17:36 */ @Configuration @Slf4j public class RocketMQConsumer { @Autowired private MessageListen messageListen; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); // 我们自己实现的监听类 consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic, tag); log.info("================>消费者创建完成,ConsumerGroupName{}<================", groupName); log.info("============>消费者监听开始,groupName:{},topic:{}<============", groupName, topic); } catch (MQClientException e) { log.error("消费者启动失败"); e.printStackTrace(); } return consumer; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
演示效果
编写完成后, 按顺序启动provider服务, consumer服务
1 浏览器上执行http://localhost:8082/testSend, 开始发送消息
2 查看rocketMq可视化工具, mq里已经有消息了.
3 查看consumer服务的控制台日志, 已经有消息输出了
遇到过的问题
安装启动rocketMq时, windows环境下一直报运行环境内存不足, 修改内存也不起作用, 不知道是否是个人电脑问题, 后来换成mac环境, 成功安装和启动了rocketMq.
————————————————
版权声明:本文为CSDN博主「吴free」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/mikewuhao/article/details/106666109