问题:
如果脱离了 Spring,要怎么实现一个 RabbitMQ 生产者和消费者的客户端?
方案
资源
依赖
<dependencies>
    <!-- Core dependencies: RabbitMQ Java client and the object pool used for connections/channels -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.8.1</version>
    </dependency>
    <!-- Code generation and logging -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <!-- Lombok is an annotation processor: needed at compile time only -->
        <scope>provided</scope>
    </dependency>
    <!-- "optional" is not a Maven scope; an optional dependency is declared with <optional>true</optional>
         so that consumers of this artifact can choose their own logging backend -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j2-impl</artifactId>
        <version>2.23.1</version>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.23.1</version>
        <optional>true</optional>
    </dependency>
</dependencies>
<!-- Packaging -->
<build>
<resources>
<resource>
<!-- Main resources directory -->
<directory>src/main/resources</directory>
<!-- Only the files matched below are copied into the artifact -->
<includes>
<include>**/*.properties</include>
<include>**/*.yml</include>
<include>README.md</include>
<include>log4j2.xml</include>
</includes>
<!-- Resource filtering is enabled for the included files -->
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<!-- Runs the source plugin's "jar" goal under the execution id attach-sources -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
服务端配置
# RabbitMQ broker settings read by RabbitConfigUtil.
# The four keys below are mandatory: initialisation fails if any of them is missing.
# Optional keys read by the connection factory (defaults in parentheses):
#   rabbitmq_connection_timeout (30000), rabbitmq_requested_heartbeat (60), rabbitmq_virtual_host (/)
rabbitmq_host=localhost
rabbitmq_port=5672
rabbitmq_username=admin
rabbitmq_password=123456
日志配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j 2 configuration: everything at INFO and above goes to the console -->
<configuration>
<Appenders>
<!-- Console appender writing to standard output -->
<Console name="CONSOLE" target="SYSTEM_OUT">
<!-- Pattern: [level] timestamp logger-name - message -->
<PatternLayout charset="UTF-8" pattern="[%-5p] %d %c - %m%n"/>
</Console>
</Appenders>
<Loggers>
<!-- Root logger: INFO level, routed to the CONSOLE appender -->
<root level="info">
<AppenderRef ref="CONSOLE"/>
</root>
</Loggers>
</configuration>
代码
工具类
服务器配置文件工具类
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.InputStream;
import java.util.Objects;
import java.util.Properties;
/**
 * Loads, validates and caches the RabbitMQ connection settings.
 *
 * <p>The configuration is initialised at most once: the first successful call to one of the
 * {@code initConfig} overloads wins and later calls keep the already cached properties.
 *
 * @author xuyuansheng
 */
@Data
@Slf4j
public class RabbitConfigUtil {
    /** Classpath resource used by {@link #initConfig()}. */
    private final static String CONFIG_PATH = "rabbitmq.properties";
    /** The cached configuration; {@code null} until initialisation succeeded. */
    public static Properties CONFIG = null;
    /** Set to {@code true} only after {@link #CONFIG} has been assigned. */
    private volatile static boolean IS_LOADED = false;

    /**
     * Initialises the configuration from a classpath resource.
     *
     * @param configPath classpath location of the properties file
     * @return {@code true} when a configuration is available afterwards
     * @throws RuntimeException if the resource is missing, unreadable or incomplete
     */
    public static boolean initConfig(String configPath) {
        // try-with-resources closes the stream; a null resource is reported explicitly instead of
        // surfacing as a NullPointerException from Properties.load.
        try (InputStream resourceAsStream =
                     Thread.currentThread().getContextClassLoader().getResourceAsStream(configPath)) {
            if (Objects.isNull(resourceAsStream)) {
                throw new IllegalStateException("rabbitmq config resource not found on classpath: " + configPath);
            }
            Properties properties = new Properties();
            properties.load(resourceAsStream);
            return Objects.nonNull(doInitConfig(properties));
        } catch (Exception e) {
            log.error(e.toString(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Initialises the configuration from the default resource {@code rabbitmq.properties}.
     *
     * @return {@code true} when a configuration is available afterwards
     */
    public static boolean initConfig() {
        return initConfig(CONFIG_PATH);
    }

    /**
     * Initialises the configuration from an already populated {@link Properties} object.
     *
     * @param config the settings to use; must contain host, port, username and password
     * @return {@code true} when a configuration is available afterwards
     * @throws RuntimeException if {@code config} is {@code null} or a required key is missing
     */
    public static boolean initConfig(Properties config) {
        return Objects.nonNull(doInitConfig(config));
    }

    /**
     * Validates and caches the configuration; a no-op once a configuration has been loaded.
     *
     * @param config candidate configuration
     * @return the cached configuration
     */
    private static synchronized Properties doInitConfig(Properties config) {
        if (IS_LOADED) {
            return CONFIG;
        }
        if (Objects.isNull(config)) {
            throw new RuntimeException("config is null");
        }
        boolean hostNull = Objects.isNull(config.getProperty("rabbitmq_host"));
        boolean portNull = Objects.isNull(config.getProperty("rabbitmq_port"));
        boolean usernameNull = Objects.isNull(config.getProperty("rabbitmq_username"));
        boolean passwordNull = Objects.isNull(config.getProperty("rabbitmq_password"));
        if (hostNull || portNull || usernameNull || passwordNull) {
            throw new RuntimeException("config properties is null");
        }
        // Publish CONFIG before flipping the volatile flag: getConfig() reads the flag without
        // holding the lock and must never observe IS_LOADED == true with CONFIG still null.
        RabbitConfigUtil.CONFIG = config;
        IS_LOADED = true;
        return config;
    }

    /**
     * Returns the cached configuration, loading the default resource on first use.
     *
     * @return the RabbitMQ configuration
     * @throws RuntimeException if no configuration could be loaded
     */
    public static Properties getConfig() {
        if (IS_LOADED || initConfig()) {
            return CONFIG;
        }
        throw new RuntimeException("not found rabbitmq config");
    }
}
线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Holds the shared thread pool used by the RabbitMQ client.
 *
 * <p>The pool keeps 10 core threads, grows to at most 100 threads, buffers up to 500 queued tasks
 * and names its threads {@code RabbitMqThread-<n>}.
 *
 * @author xuyuansheng
 */
public class ThreadPoolBuilder {
    private static final int corePoolSize = 10;
    private static final int maximumPoolSize = 100;
    private static final long keepAliveTime = 10;
    private static final TimeUnit unit = TimeUnit.SECONDS;
    private static final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(500);
    /** The shared executor. */
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadFactory() {
        // Threads can be created from several submitting threads at once, so the sequence
        // number must be incremented atomically to keep thread names unique.
        private final AtomicInteger threadCount = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("RabbitMqThread-" + threadCount.incrementAndGet());
            return thread;
        }
    });
}
RabbitConnectionPool
RabbitMq Connection(连接)的池化实现
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.jinko.util.rabbitmq.util.RabbitConfigUtil;
import java.util.Properties;
/**
 * Commons-pool based pool of RabbitMQ {@link Connection}s.
 *
 * <p>The pool is capped at a single connection; connection parameters come from
 * {@link RabbitConfigUtil#getConfig()}.
 *
 * @author xuyuansheng
 */
public class RabbitConnectionPool extends GenericObjectPool<Connection> {

    /** Creates the pool with the settings from {@link #poolConfig()}. */
    public RabbitConnectionPool() {
        super(new PooledObjectFactory(), poolConfig());
    }

    /** Builds the pool settings. */
    private static GenericObjectPoolConfig<Connection> poolConfig() {
        GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
        // Maximum number of connections
        poolConfig.setMaxTotal(1);
        // Maximum number of idle connections
        poolConfig.setMaxIdle(1);
        // Minimum number of idle connections
        poolConfig.setMinIdle(1);
        // Validate a connection when it is borrowed
        poolConfig.setTestOnBorrow(true);
        // Validate a connection when it is returned
        poolConfig.setTestOnReturn(true);
        // Validate idle connections
        poolConfig.setTestWhileIdle(true);
        // Interval between idle-connection checks
        poolConfig.setTimeBetweenEvictionRunsMillis(30000);
        // Never evict on idle time alone. The channel pool returns the connection right after
        // creating a channel, so the connection sits "idle" here while its channels are still in
        // use; the default idle eviction would close it underneath them. Broken connections are
        // still removed through validation.
        poolConfig.setMinEvictableIdleTimeMillis(-1);
        return poolConfig;
    }

    /** Factory that opens, validates and closes the pooled connections. */
    private static class PooledObjectFactory extends BasePooledObjectFactory<Connection> {
        private static final String RABBITMQ_VIRTUAL_HOST = "/";
        private static final int RABBITMQ_CONNECTION_TIMEOUT = 30000;
        private static final int RABBITMQ_REQUESTED_HEARTBEAT = 60;
        private final ConnectionFactory connectionFactory;

        public PooledObjectFactory() {
            this.connectionFactory = new ConnectionFactory();
            initFactory();
        }

        /** Applies host, port, credentials and the optional tuning keys to the connection factory. */
        private void initFactory() {
            Properties config = RabbitConfigUtil.getConfig();
            connectionFactory.setHost(config.getProperty("rabbitmq_host"));
            connectionFactory.setPort(Integer.parseInt(config.getProperty("rabbitmq_port")));
            connectionFactory.setUsername(config.getProperty("rabbitmq_username"));
            connectionFactory.setPassword(config.getProperty("rabbitmq_password"));
            connectionFactory.setConnectionTimeout(Integer.parseInt(config.getProperty("rabbitmq_connection_timeout", RABBITMQ_CONNECTION_TIMEOUT + "")));
            connectionFactory.setRequestedHeartbeat(Integer.parseInt(config.getProperty("rabbitmq_requested_heartbeat", RABBITMQ_REQUESTED_HEARTBEAT + "")));
            connectionFactory.setVirtualHost(config.getProperty("rabbitmq_virtual_host", RABBITMQ_VIRTUAL_HOST));
        }

        /**
         * Reports whether the connection is still open. Without this override the base factory
         * accepts every object, which makes the testOnBorrow/testOnReturn/testWhileIdle settings
         * ineffective.
         */
        @Override
        public boolean validateObject(PooledObject<Connection> p) {
            return p.getObject().isOpen();
        }

        @Override
        public void destroyObject(PooledObject<Connection> p) throws Exception {
            Connection connection = p.getObject();
            // A connection that failed validation is usually closed already; closing it again
            // would throw.
            if (connection.isOpen()) {
                connection.close();
            }
        }

        @Override
        public Connection create() throws Exception {
            return connectionFactory.newConnection();
        }

        @Override
        public PooledObject<Connection> wrap(Connection obj) {
            return new DefaultPooledObject<>(obj);
        }
    }
}
RabbitChannelPool
RabbitMq Channel(通道)的池化实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.SneakyThrows;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.util.Objects;
/**
 * Commons-pool based pool of RabbitMQ {@link Channel}s, exposed as a lazily created singleton.
 *
 * <p>Channels are obtained with {@link #getChannel()}. Short-lived users (for example a publisher)
 * should hand the channel back with {@link #returnChannel(Channel)}; a channel that is bound to a
 * long-running consumer simply stays borrowed.
 *
 * @author xuyuansheng
 */
public class RabbitChannelPool extends GenericObjectPool<Channel> {
    // volatile + re-check inside the lock: exactly one pool (and one underlying connection pool)
    // is created even when several threads request their first channel concurrently.
    private static volatile RabbitChannelPool SINGLETON = null;

    private RabbitChannelPool() {
        super(new PooledObjectFactory(), poolConfig());
    }

    /**
     * Borrows a channel from the shared pool, creating the pool on first use.
     *
     * @return an open channel
     */
    @SneakyThrows
    public static Channel getChannel() {
        return instance().borrowObject();
    }

    /**
     * Hands a channel obtained from {@link #getChannel()} back to the pool. Does nothing when
     * {@code channel} is {@code null} or the pool has not been created yet.
     *
     * @param channel the channel to return
     * @throws IllegalStateException if the channel is not currently borrowed from this pool
     */
    public static void returnChannel(Channel channel) {
        RabbitChannelPool pool = SINGLETON;
        if (Objects.isNull(channel) || Objects.isNull(pool)) {
            return;
        }
        pool.returnObject(channel);
    }

    /** Returns the singleton pool, creating it under the class lock if necessary. */
    private static RabbitChannelPool instance() {
        RabbitChannelPool pool = SINGLETON;
        if (Objects.isNull(pool)) {
            synchronized (RabbitChannelPool.class) {
                pool = SINGLETON;
                if (Objects.isNull(pool)) {
                    pool = new RabbitChannelPool();
                    SINGLETON = pool;
                }
            }
        }
        return pool;
    }

    /** Builds the pool settings. */
    private static GenericObjectPoolConfig<Channel> poolConfig() {
        GenericObjectPoolConfig<Channel> poolConfig = new GenericObjectPoolConfig<>();
        // Maximum number of channels
        poolConfig.setMaxTotal(1000);
        // Maximum number of idle channels
        poolConfig.setMaxIdle(10);
        // Minimum number of idle channels
        poolConfig.setMinIdle(1);
        // Validate a channel when it is borrowed
        poolConfig.setTestOnBorrow(true);
        // Validate a channel when it is returned
        poolConfig.setTestOnReturn(true);
        // Validate idle channels
        poolConfig.setTestWhileIdle(true);
        // Interval between idle-channel checks
        poolConfig.setTimeBetweenEvictionRunsMillis(30000);
        return poolConfig;
    }

    /** Factory that opens channels on a pooled connection, validates them and closes them. */
    private static class PooledObjectFactory extends BasePooledObjectFactory<Channel> {
        private final RabbitConnectionPool connectionPool;

        public PooledObjectFactory() {
            connectionPool = new RabbitConnectionPool();
        }

        /**
         * Reports whether the channel is still open. Without this override the base factory
         * accepts every object, which makes the testOnBorrow/testOnReturn/testWhileIdle settings
         * ineffective.
         */
        @Override
        public boolean validateObject(PooledObject<Channel> p) {
            return p.getObject().isOpen();
        }

        @Override
        public void destroyObject(PooledObject<Channel> p) throws Exception {
            Channel channel = p.getObject();
            // Closing an already closed channel throws, so only close channels that are open.
            if (channel.isOpen()) {
                channel.close();
            }
        }

        @Override
        public Channel create() throws Exception {
            Connection connection = connectionPool.borrowObject();
            try {
                Channel channel = connection.createChannel();
                // createChannel() returns null when the connection has no channel number left.
                if (Objects.isNull(channel)) {
                    throw new IllegalStateException("no channel available on the RabbitMQ connection");
                }
                return channel;
            } finally {
                // Give the connection back only after it has been used.
                connectionPool.returnObject(connection);
            }
        }

        @Override
        public PooledObject<Channel> wrap(Channel obj) {
            return new DefaultPooledObject<>(obj);
        }
    }
}
CommonConsumerWrapper
消费者实现
import com.rabbitmq.client.*;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.jinko.util.rabbitmq.pool.RabbitChannelPool;
import org.jinko.util.rabbitmq.pool.ThreadPoolBuilder;
import java.io.IOException;
import java.util.Objects;
import java.util.Random;
import java.util.function.BiConsumer;
/**
* @author xuyuansheng
*/
@Accessors(chain = true)
@Slf4j
public class CommonConsumerWrapper extends DefaultConsumer {
private final String queueName;
private final boolean autoAck;
private final String consumerTag;
@Setter
private DeliverCallback deliverCallback;
@Setter
private BiConsumer<DefaultConsumer, Delivery> deliverAckCallback;
@Setter
private BiConsumer<DefaultConsumer, String> handleConsumeOk;
@Setter
private BiConsumer<DefaultConsumer, String> handleCancelOk;
@Setter
private BiConsumer<DefaultConsumer, String> handleCancel;
@Setter
private BiConsumer<DefaultConsumer, Object[]> handleShutdownSignal;
@Setter
private BiConsumer<DefaultConsumer, String> handleRecoverOk;
public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag) {
this(queueName, autoAck, consumerTag, null, null, null, null, null, null);
}
public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag, DeliverCallback deliverCallback) {
this(queueName, autoAck, consumerTag, deliverCallback, null, null, null, null, null);
}
public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, BiConsumer<DefaultConsumer, String> handleConsumeOk, BiConsumer<DefaultConsumer, String> handleCancelOk, BiConsumer<DefaultConsumer, String> handleCancel, BiConsumer<DefaultConsumer, Object[]> handleShutdownSignal, BiConsumer<DefaultConsumer, String> handleRecoverOk) {
super(RabbitChannelPool.getChannel());
this.queueName = queueName;
this.autoAck = autoAck;
this.consumerTag = queueName + ((Objects.isNull(consumerTag) ? String.valueOf(new Random().nextLong()) : consumerTag));
this.deliverCallback = deliverCallback;
this.handleConsumeOk = handleConsumeOk;
this.handleCancelOk = handleCancelOk;
this.handleCancel = handleCancel;
this.handleShutdownSignal = handleShutdownSignal;
this.handleRecoverOk = handleRecoverOk;
}
@SneakyThrows
public void consumer() {
ThreadPoolBuilder.executor.submit(() -> {
try {
getChannel().basicConsume(queueName, autoAck, consumerTag, this);
} catch (IOException e) {
log.error(e.toString(), e);
}
});
}
@Override
public void handleConsumeOk(String consumerTag) {
/* 当消费者通过调用任何Channel. basicConsum方法注册时调用。 */
if (Objects.isNull(this.handleConsumeOk)) {
super.handleConsumeOk(consumerTag);
} else {
this.handleConsumeOk.accept(this, consumerTag);
}
}
@Override
public void handleCancelOk(String consumerTag) {
/* 当消费者通过调用Channel. basicCanel取消时调用。 */
if (Objects.isNull(this.handleCancelOk)) {
super.handleConsumeOk(consumerTag);
} else {
this.handleCancelOk.accept(this, consumerTag);
}
}
@Override
public void handleCancel(String consumerTag) throws IOException {
/* 当消费者因不是通过调用Channel. basicCanel的原因而被取消时调用。例如,队列已被删除。有关由于Channel.basicCanel导致消费者取消的通知,请参阅handleCUelOk。 */
if (Objects.isNull(this.handleCancel)) {
super.handleCancel(consumerTag);
} else {
this.handleCancel.accept(this, consumerTag);
}
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
/* 当通道或基础连接已关闭时调用。 */
if (Objects.isNull(this.handleShutdownSignal)) {
super.handleShutdownSignal(consumerTag, sig);
} else {
this.handleShutdownSignal.accept(this, new Object[]{consumerTag, sig});
}
}
@Override
public void handleRecoverOk(String consumerTag) {
/* Called when a basic.recover-ok is received in reply to a basic.recover. All messages received before this is invoked that haven't been ack'ed will be re-delivered. All messages received afterwards won't be. */
if (Objects.isNull(this.handleRecoverOk)) {
super.handleRecoverOk(consumerTag);
} else {
this.handleRecoverOk.accept(this, consumerTag);
}
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* 当收到此消费者的基本交付时调用 */
Delivery delivery = new Delivery(envelope, properties, body);
try {
this.deliverCallback.handle(consumerTag, delivery);
} catch (Exception e) {
log.error("message handleDelivery Error ", e);
} finally {
if (!autoAck && Objects.nonNull(this.deliverAckCallback)) {
this.deliverAckCallback.accept(this, delivery);
} else {
/* 当策略是自动提交时,如果发生异常直接拒绝消息,且重新放回队列中 */
getChannel().basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}
}
}
案例
消费者
/**
* @author xuyuansheng
*/
@Slf4j
public class RabbitConsumerDemo {
static {
/* 这块代码全局只需要执行一次 */
boolean b = RabbitConfigUtil.initConfig("rabbitmq.properties");
}
public static void main(String[] args) {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
/* 消费者业务逻辑代码 */
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("consumerTag {} Received {}", consumerTag, message);
} catch (Exception e) {
log.error(" ", e);
}
};
/* 声明一个消费者,参数:队列名称,是否自动提交,业务逻辑(DeliverCallback),消费者标签 */
CommonConsumerWrapper consumer = new CommonConsumerWrapper("hello", false, "消费者Tag", deliverCallback)
.setDeliverCallback(deliverCallback)
.setDeliverAckCallback((ack, delivery) -> {
try {
/* 手动确认 拒绝 */
ack.getChannel().basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
/* 手动确认 完成 */
// ack.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Throwable e) {
}
})
.setHandleShutdownSignal((DefaultConsumer handleShutdownSignal, Object[] errorInfo) -> {
/* 业务逻辑异常处理 */
String consumerTag = (String) errorInfo[0];
ShutdownSignalException exception = (ShutdownSignalException) errorInfo[1];
log.error("[*] 消费者发生异常,consumerTag: {}", consumerTag, exception);
handleShutdownSignal.getChannel().getConnection();
}).setHandleCancelOk((handleCancelOk, consumerTag) -> {
log.info("handleCancel ,consumerTag: {}", consumerTag);
}).setHandleCancel((handleCancel, consumerTag) -> {
log.info("handleCancel ,consumerTag: {}", consumerTag);
}).setHandleRecoverOk((handleRecoverOk, consumerTag) -> {
log.info("handleRecoverOk ,consumerTag: {}", consumerTag);
});
consumer.consumer();
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
}
}
生产者
/**
 * Example producer: publishes one text message to the {@code hello} queue through the default
 * exchange.
 *
 * @author xuyuansheng
 */
public class RabbitProducerDemo {
    static {
        /* The configuration only needs to be initialised once for the whole application. */
        RabbitConfigUtil.initConfig("rabbitmq.properties");
    }

    @SneakyThrows
    public static void main(String[] args) {
        // try-with-resources closes the channel after publishing; the channel is closed here
        // rather than handed back to the pool.
        try (Channel channel = RabbitChannelPool.getChannel()) {
            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
标签:消费,consumerTag,Spring,rabbitmq,RabbitMq,static,private,import,public
From: https://www.cnblogs.com/xysgo/p/18080973