
SpringBoot + RocketMQ + MyBatis project integration demo

Date: 2023-03-28 17:03:00

Reposted from: https://blog.csdn.net/mikewuhao/article/details/106666109 

=============

 

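What the demo does
Once the producer and consumer services are both running, the producer runs a user query, reads the user record from the database, and sends it to the RocketMQ message broker. The consumer listens for the MQ message and receives the user data.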
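The flow described above can be pictured with a small in-memory sketch. This is not the demo's code: the `FlowSketch` class and the sample JSON are hypothetical, and a `LinkedBlockingQueue` stands in for the RocketMQ broker, so it only illustrates the hand-off of a serialized user record from producer to consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FlowSketch {
    // Stand-in for the broker (assumption): an in-memory queue of raw message bodies.
    private final BlockingQueue<byte[]> broker = new LinkedBlockingQueue<>();

    // Producer side: takes a user record already serialized to JSON and publishes it as bytes.
    void produce(String userJson) {
        broker.offer(userJson.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: takes the next body, if any, and decodes it back to text.
    String consume() {
        byte[] body = broker.poll();
        return body == null ? null : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FlowSketch flow = new FlowSketch();
        flow.produce("{\"id\":1,\"username\":\"demo\"}"); // hypothetical sample record
        System.out.println("consumer received: " + flow.consume());
    }
}
```

In the real project the queue is replaced by a RocketMQ topic, and the two sides run as separate Spring Boot services.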

GitHub source code
https://github.com/huangdan92/springBoot-mq-demo

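Detailed setup steps
1. Preparation
Set up and start the RocketMQ environment in advance, and install and start the RocketMQ visual console.
(I deployed RocketMQ on macOS, following https://www.jianshu.com/p/a759e8ea6ac1)
(For deploying RocketMQ on Windows, see https://www.cnblogs.com/darendu/p/12036380.html)

Note: starting RocketMQ by following the official instructions may fail with an insufficient-memory error for the runtime environment. Adjusting the memory settings is recommended, see: https://blog.csdn.net/u014803081/article/details/90705792?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6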

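2. Project structure

In IDEA, first create a plain empty project named springBoot-mq-demo, then create two Maven modules named consumer (the consumer service) and provider (the producer service).
3. Creating the provider (producer) service
3.1 provider service directory structure

3.2 provider service pom file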

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.wuhao.demo</groupId>
<artifactId>provider</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- Spring Boot integration with MyBatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>

<!-- MySQL driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>

<!--rocketMq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

3.3 provider service application.properties

server.port=8082

logging.level.org.springframework=DEBUG

# Database
spring.datasource.driver-class-name= com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/boot?useUnicode=true&characterEncoding=utf-8
spring.datasource.username = root
spring.datasource.password = root

#mybatis
mybatis.type-aliases-package=com.wuhao.domain
mybatis.mapper-locations=classpath:mapper/*.xml

#Rocketmq producer
rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=topic2020
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000
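The maxMessageSize value above, 131072, is 128 KiB. As a rough sketch, assuming the limit applies to the encoded body in bytes rather than to the character count, a body could be checked before sending as shown here. The `PayloadSizeCheck` class and its methods are hypothetical helpers, not part of the demo or of the RocketMQ API.

```java
import java.nio.charset.StandardCharsets;

class PayloadSizeCheck {
    // Mirrors rocketmq.producer.maxMessageSize from application.properties (128 KiB).
    static final int MAX_MESSAGE_SIZE = 131072;

    // Size of the body once encoded as UTF-8, which is what actually travels on the wire.
    static int encodedLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // True when the encoded body does not exceed the configured limit.
    static boolean fitsLimit(String body) {
        return encodedLength(body) <= MAX_MESSAGE_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(encodedLength("abc"));          // prints 3
        System.out.println(encodedLength("\u7528\u6237")); // two CJK characters, prints 6
        System.out.println(fitsLimit("hi RocketMQ"));      // prints true
    }
}
```

Non-ASCII text such as Chinese user names takes several bytes per character in UTF-8, so the byte length can be noticeably larger than `String.length()`.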

3.4 provider service startup class, ProviderApplication

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}

}

3.5 provider service controller, MqController

package com.wuhao.controller;

import com.alibaba.fastjson.JSONObject;
import com.wuhao.domain.User;
import com.wuhao.mq.RocketMQProducer;
import com.wuhao.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author wuhao
* @Title: MqController
* @Description: MQ test controller
* @date 2020/6/9 17:38
*/
@RestController
@Slf4j
public class MqController {
@Autowired
@Qualifier("rocketMQProducer")
RocketMQProducer rocketMQProducer;

@Autowired
private UserService userService;

@GetMapping("/testSend")
public void testSend() {

DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Load the user with id 1 and serialize it into the message body
User user = userService.queryUserById(1L);
String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "---"+ JSONObject.toJSONString(user);
// Topic and tag match rocketmq.producer.topic and rocketmq.producer.tag; encode explicitly as UTF-8
Message message = new Message("topic2020", "test", body.getBytes(StandardCharsets.UTF_8));
try {
producer.send(message);
} catch (Exception e) {
log.error("Failed to send message to RocketMQ", e);
}
}

}

3.6 provider service MQ configuration class, RocketMQProducer

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author wuhao
* @Title: RocketMQProducer
* @Description: Message producer
* @date 2020/6/9 17:31
*/
@Configuration
@Slf4j
public class RocketMQProducer {

@Value("${rocketmq.producer.groupName}")
private String groupName;

@Value("${rocketmq.producer.namesrvAddr}")
private String nameserAddr;

@Value("${rocketmq.producer.instanceName}")
private String instanceName;

@Value("${rocketmq.producer.maxMessageSize}")
private int maxMessageSize;

@Value("${rocketmq.producer.sendMsgTimeout}")
private int sendMsgTimeout;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(nameserAddr);
producer.setInstanceName(instanceName);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeout);
producer.setVipChannelEnabled(false);
log.info("================>Producer created, ProducerGroupName: {}<================", groupName);
return producer;
}

}

3.7 provider service entity class, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
* @Description: User entity
* @CreateDate: 2020-02-04 19:25
* @Author: wuhao
*/
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {


@Id
@GeneratedValue
private Long id;

@Column(name = "username")
private String username;

@Column(name = "birthday")
private String birthday;

@Column(name = "sex")
private String sex;

@Column(name = "address")
private String address;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getBirthday() {
return birthday;
}

public void setBirthday(String birthday) {
this.birthday = birthday;
}

public String getSex() {
return sex;
}

public void setSex(String sex) {
this.sex = sex;
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", username='" + username + '\'' +
", birthday='" + birthday + '\'' +
", sex='" + sex + '\'' +
", address='" + address + '\'' +
'}';
}

}

3.8 provider service interface

package com.wuhao.service;

import com.wuhao.domain.User;

/**
* @Description: Service for User
* @CreateDate: 2020-06-09 09:35
* @Author: wuhao
*/
public interface UserService {

User queryUserById(Long id);

int addUser(User user);

int modifyUser(User user);

int deleteUserById(Long id);

}

3.8 provider service interface implementation class

package com.wuhao.service.impl;

import com.wuhao.domain.User;
import com.wuhao.dao.UserMapper;
import com.wuhao.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @Description: Service implementation class for User
* @CreateDate: 2020-02-05 09:36
* @Author: wuhao
*/
@Service
public class UserServiceImpl implements UserService {

@Autowired
private UserMapper userMapper;

@Override
public User queryUserById(Long id) {
return userMapper.queryUserById(id);
}

@Override
public int addUser(User user) {
return userMapper.addUser(user);

}

@Override
public int modifyUser(User user) {
return userMapper.modifyUser(user);
}

@Override
public int deleteUserById(Long id) {
return userMapper.deleteUserById(id);
}

}

3.9 provider service DAO layer, UserMapper

package com.wuhao.dao;

import com.wuhao.domain.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

/**
* @Description: Mapper for User
* @CreateDate: 2020-06-04 19:38
* @Author: wuhao
*/
@Mapper
@Repository
public interface UserMapper {

User queryUserById(Long id);

int addUser(User user);

int modifyUser(User user);

int deleteUserById(Long id);

}

3.10 provider service SQL XML file, userMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.wuhao.dao.UserMapper">

<!-- Query a user by id -->
<select id="queryUserById" resultType="com.wuhao.domain.User">
select * from `user` where id = #{id}
</select>

<!-- Update a user -->
<update id="modifyUser" parameterType="com.wuhao.domain.User" >
update `user` set username=#{username},birthday=#{birthday},sex=#{sex}, address=#{address} where id=#{id}
</update>

<!-- Delete a user -->
<delete id="deleteUserById" parameterType="long">
delete from `user` where id=#{id}
</delete>

<!-- Add a user -->
<insert id="addUser" parameterType="com.wuhao.domain.User">
insert into `user` (username,birthday,sex,address)
values(#{username},#{birthday},#{sex},#{address})
</insert>


</mapper>

4. Creating the consumer service
4.1 consumer service directory structure

4.2 consumer service pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.wuhao.demo</groupId>
<artifactId>consumer</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!--rocketMq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

4.3 consumer service application.properties

server.port=8083

logging.level.org.springframework=DEBUG

#Rocketmq consumer
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ConsumerGroup
rocketmq.consumer.topic=topic2020
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
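consumeThreadMin and consumeThreadMax bound the consumer's worker threads. The sketch below rests on an assumption that the original does not state: that the client backs these two values with a standard `ThreadPoolExecutor` fed by an unbounded queue. If that holds, extra work is queued rather than given new threads, so the pool would stay at the minimum size. The `ConsumeThreadSketch` class is hypothetical and uses only the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConsumeThreadSketch {

    // Submits blocking tasks and reports how many worker threads the pool created.
    static int poolSizeUnderLoad(int threadMin, int threadMax, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threadMin, threadMax, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the worker so later tasks have to queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize(); // core threads only: the unbounded queue never fills
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        // Same values as consumeThreadMin / consumeThreadMax above.
        System.out.println(poolSizeUnderLoad(20, 64, 200)); // prints 20
    }
}
```

Under that assumption, consumeThreadMin is the setting that determines effective concurrency, which is worth verifying against the client version in use.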

4.4 consumer service startup class

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

4.5 consumer service entity class, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
* @Description: User entity
* @CreateDate: 2020-02-04 19:25
* @Author: wuhao
*/
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {


@Id
@GeneratedValue
private Long id;

@Column(name = "username")
private String username;

@Column(name = "birthday")
private String birthday;

@Column(name = "sex")
private String sex;

@Column(name = "address")
private String address;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getBirthday() {
return birthday;
}

public void setBirthday(String birthday) {
this.birthday = birthday;
}

public String getSex() {
return sex;
}

public void setSex(String sex) {
this.sex = sex;
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", username='" + username + '\'' +
", birthday='" + birthday + '\'' +
", sex='" + sex + '\'' +
", address='" + address + '\'' +
'}';
}

}

4.6 consumer service listener, MessageListen

package com.wuhao.mq;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* @author wuhao
* @Title: MessageListen
* @Description: Message listener class
* @date 2020/4/17 17:28
*/
@Component
public class MessageListen implements MessageListenerConcurrently {

@Autowired
private MessageProcessor messageProcessor;

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// A batch may hold more than one message; ask for redelivery if any of them fails
for (MessageExt ext : list) {
if (!messageProcessor.handle(ext)) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

4.7 consumer service MQ message processing interface

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;

/**
* @author wuhao
* @Title: MessageProcessor
* @Description: MQ message processing interface
* @date 2020/4/17 17:24
*/
public interface MessageProcessor {

boolean handle(MessageExt messageExt);
}

4.8 consumer service message processing class

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

/**
* @author wuhao
* @Title: MessageProcessorImpl
* @Description: Message processing class
* @date 2020/4/17 17:27
*/
@Service
public class MessageProcessorImpl implements MessageProcessor {
@Override
public boolean handle(MessageExt messageExt) {
// The body arrives as bytes; decode it as UTF-8 text
String result = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
System.out.println("Message received: " + result);
return true;
}
}
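The processor above only prints the body. Since MqController builds the body as a greeting, a timestamp, the separator "---", and then the user JSON, the JSON part can be recovered by splitting on that separator. A minimal sketch follows; the `BodyParser` class and the sample user are hypothetical and not part of the demo.

```java
import java.nio.charset.StandardCharsets;

class BodyParser {
    // Separator the controller places between the greeting/timestamp and the JSON payload.
    private static final String DELIMITER = "---";

    // Decodes the raw body and returns everything after the first separator.
    static String extractJson(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        int idx = text.indexOf(DELIMITER);
        if (idx < 0) {
            throw new IllegalArgumentException("body does not contain the '---' separator");
        }
        return text.substring(idx + DELIMITER.length());
    }

    public static void main(String[] args) {
        // Hypothetical sample body in the same shape the controller produces.
        byte[] body = "hi RocketMQ, now is 2020-06-09 17:38:00---{\"id\":1,\"username\":\"demo\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(extractJson(body)); // prints {"id":1,"username":"demo"}
    }
}
```

The extracted string could then be handed to a JSON library to rebuild the consumer-side User object. Sending the JSON alone as the body would avoid the splitting step altogether.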

4.9 consumer service MQ consumer configuration class

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author wuhao
* @Title: RocketMQConsumer
* @Description: MQ consumer
* @date 2020/4/17 17:36
*/
@Configuration
@Slf4j
public class RocketMQConsumer {

@Autowired
private MessageListen messageListen;

@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;

@Value("${rocketmq.consumer.groupName}")
private String groupName;

@Value("${rocketmq.consumer.topic}")
private String topic;

@Value("${rocketmq.consumer.tag}")
private String tag;

@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;

@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer getRocketMQConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setVipChannelEnabled(false);
// Register our own listener implementation
consumer.registerMessageListener(messageListen);
try {
consumer.subscribe(topic, tag);
log.info("================>Consumer created, ConsumerGroupName: {}<================", groupName);
log.info("============>Consumer listening started, groupName:{},topic:{}<============", groupName, topic);
} catch (MQClientException e) {
log.error("Consumer startup failed", e);
}
return consumer;
}

}

 

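Demo results
Once the code is written, start the provider service and then the consumer service, in that order.
1. Open http://localhost:8082/testSend in a browser to send a message.
2. Check the RocketMQ visual console; the message is now in the queue.
3. Check the consumer service's console log; the message has been printed.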


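Problems encountered
When installing and starting RocketMQ on Windows, it kept reporting insufficient memory for the runtime environment, and changing the memory settings did not help. I am not sure whether this was specific to my machine. After switching to macOS, RocketMQ installed and started successfully.
----------------
Copyright notice: this is an original article by CSDN blogger 「吴free」, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.
Original link: https://blog.csdn.net/mikewuhao/article/details/106666109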

From: https://www.cnblogs.com/hd92/p/17265870.html
