首页 > 其他分享 >非Spring项目实现RabbitMq消息生产和消费

非Spring项目实现RabbitMq消息生产和消费

时间:2024-03-18 17:25:50浏览次数:34  
标签:消费 consumerTag Spring rabbitmq RabbitMq static private import public

问题:

​ 如果脱离了Spring要怎么实现一个RabbitMq生产者和消费者的客户端?

方案

资源

依赖

    <dependencies>
<!--  核心依赖-->
		<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.8.1</version>
        </dependency>
<!--  代码工具和日志  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j2-impl</artifactId>
            <version>2.23.1</version>
            <scope>optional</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.23.1</version>
            <scope>optional</scope>
        </dependency>
    </dependencies>
<!--  打包  -->
    <build>
        <resources>
            <resource>
                <!-- 设定主资源目录  -->
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.yml</include>
                    <include>README.md</include>
                    <include>log4j2.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

服务端配置

rabbitmq_host=localhost
rabbitmq_port=5672
rabbitmq_username=admin
rabbitmq_password=123456

日志配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout charset="UTF-8" pattern="[%-5p] %d %c - %m%n"/>
        </Console>

    </Appenders>
    <Loggers>
        <root level="info">
            <AppenderRef ref="CONSOLE"/>
        </root>
    </Loggers>
</configuration>

代码

工具类

服务器配置文件工具类


import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.InputStream;
import java.util.Objects;
import java.util.Properties;

/**
 * rabbitmq 配置工具类
 *
 * @author xuyuansheng
 */
@Data
@Slf4j
public class RabbitConfigUtil {

    private final static String CONFIG_PATH = "rabbitmq.properties";
    public static Properties CONFIG = null;
    private volatile static boolean IS_LOADED = false;

    public static boolean initConfig(String configPath) {
        try {
            InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(configPath);
            Properties properties = new Properties();
            properties.load(resourceAsStream);
            return Objects.nonNull(doInitConfig(properties));
        } catch (Throwable e) {
            log.error(e.toString(), e);
            throw new RuntimeException(e);
        }
    }

    public static boolean initConfig() {
        return initConfig(CONFIG_PATH);
    }

    public static boolean initConfig(Properties config) {
        return Objects.nonNull(doInitConfig(config));
    }

    private static synchronized Properties doInitConfig(Properties config) {
        if (IS_LOADED) {
            return CONFIG;
        }
        if (Objects.isNull(config)) {
            throw new RuntimeException("config is null");
        }
        boolean hostNull = Objects.isNull(config.getProperty("rabbitmq_host"));
        boolean portNull = Objects.isNull(config.getProperty("rabbitmq_port"));
        boolean usernameNull = Objects.isNull(config.getProperty("rabbitmq_username"));
        boolean passwordNull = Objects.isNull(config.getProperty("rabbitmq_password"));
        if (hostNull || portNull || usernameNull || passwordNull) {
            throw new RuntimeException("config properties is null");
        }
        IS_LOADED = true;
        RabbitConfigUtil.CONFIG = config;
        return config;
    }

    public static Properties getConfig() {
        if (IS_LOADED || initConfig()) {
            return CONFIG;
        }
        throw new RuntimeException("not found rabbitmq config");
    }

}

线程池


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author xuyuansheng
 */
public class ThreadPoolBuilder {
    private static final int corePoolSize = 10;
    private static final int maximumPoolSize = 100;
    private static final long keepAliveTime = 10;
    private static final TimeUnit unit = TimeUnit.SECONDS;
    private static ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(500);

    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadFactory() {
        private int threadCount = 0;

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("RabbitMqThread-" + (++threadCount));
            return thread;
        }
    });
}

RabbitConnectionPool

RabbitMq Connection(连接)的池化实现


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.jinko.util.rabbitmq.util.RabbitConfigUtil;

import java.util.Properties;

/**
 * @author xuyuansheng
 */
public class RabbitConnectionPool extends GenericObjectPool<Connection> {
    private static RabbitConnectionPool SINGLETON;

    public RabbitConnectionPool() {
        super(new PooledObjectFactory());
        this.setConfig(poolConfig());
    }

    private GenericObjectPoolConfig<Connection> poolConfig() {
        GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
        // 最大连接数
        poolConfig.setMaxTotal(1);
        // 最大空闲连接数
        poolConfig.setMaxIdle(1);
        // 最小空闲连接数
        poolConfig.setMinIdle(1);
        // 借用连接时是否进行有效性检查
        poolConfig.setTestOnBorrow(true);
        // 归还连接时是否进行有效性检查
        poolConfig.setTestOnReturn(true);
        // 空闲连接是否进行有效性检查
        poolConfig.setTestWhileIdle(true);
        // 空闲连接检查间隔时间
        poolConfig.setTimeBetweenEvictionRunsMillis(30000);
        return poolConfig;
    }

    private static class PooledObjectFactory extends BasePooledObjectFactory<Connection> {
        private static final String RABBITMQ_VIRTUAL_HOST = "/";
        private static final int RABBITMQ_CONNECTION_TIMEOUT = 30000;
        private static final int RABBITMQ_REQUESTED_HEARTBEAT = 60;

        private final ConnectionFactory connectionFactory;

        public PooledObjectFactory() {
            this.connectionFactory = new ConnectionFactory();
            initFactory();
        }

        private void initFactory() {
            Properties config = RabbitConfigUtil.getConfig();
            connectionFactory.setHost(config.getProperty("rabbitmq_host"));
            connectionFactory.setPort(Integer.parseInt(config.getProperty("rabbitmq_port")));
            connectionFactory.setUsername(config.getProperty("rabbitmq_username"));
            connectionFactory.setPassword(config.getProperty("rabbitmq_password"));
            connectionFactory.setConnectionTimeout(Integer.parseInt(config.getProperty("rabbitmq_connection_timeout", RABBITMQ_CONNECTION_TIMEOUT + "")));
            connectionFactory.setRequestedHeartbeat(Integer.parseInt(config.getProperty("rabbitmq_requested_heartbeat", RABBITMQ_REQUESTED_HEARTBEAT + "")));
            connectionFactory.setVirtualHost(config.getProperty("rabbitmq_virtual_host", RABBITMQ_VIRTUAL_HOST));
        }

        @Override
        public void destroyObject(PooledObject<Connection> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public Connection create() throws Exception {
            return connectionFactory.newConnection();
        }

        @Override
        public PooledObject<Connection> wrap(Connection obj) {
            return new DefaultPooledObject<>(obj);
        }
    }

}

RabbitChannelPool

RabbitMq Channel(通道)的池化实现


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.SneakyThrows;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.util.Objects;

/**
 * @author xuyuansheng
 */
public class RabbitChannelPool extends GenericObjectPool<Channel> {

    private static RabbitChannelPool SINGLETON = null;

    private RabbitChannelPool() {
        super(new PooledObjectFactory());
        this.setConfig(poolConfig());
    }

    @SneakyThrows
    public static Channel getChannel() {
        if (Objects.nonNull(SINGLETON)) {
            return SINGLETON.borrowObject();
        }
        synchronized (RabbitChannelPool.class) {
            SINGLETON = new RabbitChannelPool();
        }
        return SINGLETON.borrowObject();
    }

    private GenericObjectPoolConfig<Channel> poolConfig() {
        GenericObjectPoolConfig<Channel> poolConfig = new GenericObjectPoolConfig<>();
        // 最大连接数
        poolConfig.setMaxTotal(1000);
        // 最大空闲连接数
        poolConfig.setMaxIdle(10);
        // 最小空闲连接数
        poolConfig.setMinIdle(1);
        // 借用连接时是否进行有效性检查
        poolConfig.setTestOnBorrow(true);
        // 归还连接时是否进行有效性检查
        poolConfig.setTestOnReturn(true);
        // 空闲连接是否进行有效性检查
        poolConfig.setTestWhileIdle(true);
        // 空闲连接检查间隔时间
        poolConfig.setTimeBetweenEvictionRunsMillis(30000);
        return poolConfig;
    }

    private static class PooledObjectFactory extends BasePooledObjectFactory<Channel> {


        private final RabbitConnectionPool connectionPool;

        public PooledObjectFactory() {
            connectionPool = new RabbitConnectionPool();
        }

        @Override
        public void destroyObject(PooledObject<Channel> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public Channel create() throws Exception {
            Connection connection = connectionPool.borrowObject();
            connectionPool.returnObject(connection);
            return connection.createChannel();
        }

        @Override
        public PooledObject<Channel> wrap(Channel obj) {
            return new DefaultPooledObject<>(obj);
        }
    }

}

CommonConsumerWrapper

消费者实现


import com.rabbitmq.client.*;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.jinko.util.rabbitmq.pool.RabbitChannelPool;
import org.jinko.util.rabbitmq.pool.ThreadPoolBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.Random;
import java.util.function.BiConsumer;

/**
 * @author xuyuansheng
 */

@Accessors(chain = true)
@Slf4j
public class CommonConsumerWrapper extends DefaultConsumer {


    private final String queueName;
    private final boolean autoAck;
    private final String consumerTag;
    @Setter
    private DeliverCallback deliverCallback;
    @Setter
    private BiConsumer<DefaultConsumer, Delivery> deliverAckCallback;
    @Setter
    private BiConsumer<DefaultConsumer, String> handleConsumeOk;
    @Setter
    private BiConsumer<DefaultConsumer, String> handleCancelOk;
    @Setter
    private BiConsumer<DefaultConsumer, String> handleCancel;
    @Setter
    private BiConsumer<DefaultConsumer, Object[]> handleShutdownSignal;
    @Setter
    private BiConsumer<DefaultConsumer, String> handleRecoverOk;


    public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag) {
        this(queueName, autoAck, consumerTag, null, null, null, null, null, null);
    }

    public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag, DeliverCallback deliverCallback) {
        this(queueName, autoAck, consumerTag, deliverCallback, null, null, null, null, null);
    }

    public CommonConsumerWrapper(String queueName, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, BiConsumer<DefaultConsumer, String> handleConsumeOk, BiConsumer<DefaultConsumer, String> handleCancelOk, BiConsumer<DefaultConsumer, String> handleCancel, BiConsumer<DefaultConsumer, Object[]> handleShutdownSignal, BiConsumer<DefaultConsumer, String> handleRecoverOk) {
        super(RabbitChannelPool.getChannel());
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumerTag = queueName + ((Objects.isNull(consumerTag) ? String.valueOf(new Random().nextLong()) : consumerTag));
        this.deliverCallback = deliverCallback;
        this.handleConsumeOk = handleConsumeOk;
        this.handleCancelOk = handleCancelOk;
        this.handleCancel = handleCancel;
        this.handleShutdownSignal = handleShutdownSignal;
        this.handleRecoverOk = handleRecoverOk;
    }

    @SneakyThrows
    public void consumer() {
        ThreadPoolBuilder.executor.submit(() -> {
            try {
                getChannel().basicConsume(queueName, autoAck, consumerTag, this);
            } catch (IOException e) {
                log.error(e.toString(), e);
            }
        });
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        /*  当消费者通过调用任何Channel. basicConsum方法注册时调用。 */
        if (Objects.isNull(this.handleConsumeOk)) {
            super.handleConsumeOk(consumerTag);
        } else {
            this.handleConsumeOk.accept(this, consumerTag);
        }
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        /*  当消费者通过调用Channel. basicCanel取消时调用。 */
        if (Objects.isNull(this.handleCancelOk)) {
            super.handleConsumeOk(consumerTag);
        } else {
            this.handleCancelOk.accept(this, consumerTag);
        }
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        /* 当消费者因不是通过调用Channel. basicCanel的原因而被取消时调用。例如,队列已被删除。有关由于Channel.basicCanel导致消费者取消的通知,请参阅handleCUelOk。 */
        if (Objects.isNull(this.handleCancel)) {
            super.handleCancel(consumerTag);
        } else {
            this.handleCancel.accept(this, consumerTag);
        }
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        /*  当通道或基础连接已关闭时调用。  */
        if (Objects.isNull(this.handleShutdownSignal)) {
            super.handleShutdownSignal(consumerTag, sig);
        } else {
            this.handleShutdownSignal.accept(this, new Object[]{consumerTag, sig});
        }
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        /*  Called when a basic.recover-ok is received in reply to a basic.recover. All messages received before this is invoked that haven't been ack'ed will be re-delivered. All messages received afterwards won't be. */
        if (Objects.isNull(this.handleRecoverOk)) {
            super.handleRecoverOk(consumerTag);
        } else {
            this.handleRecoverOk.accept(this, consumerTag);
        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        /*  当收到此消费者的基本交付时调用 */
        Delivery delivery = new Delivery(envelope, properties, body);
        try {
            this.deliverCallback.handle(consumerTag, delivery);
        } catch (Exception e) {
            log.error("message handleDelivery Error ", e);
        } finally {
            if (!autoAck && Objects.nonNull(this.deliverAckCallback)) {
                this.deliverAckCallback.accept(this, delivery);
            } else {
                /*  当策略是自动提交时,如果发生异常直接拒绝消息,且重新放回队列中 */
                getChannel().basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        }

    }
}

案例

消费者


/**
 * @author xuyuansheng
 */
@Slf4j
public class RabbitConsumerDemo {


    static {
        /*  这块代码全局只需要执行一次 */
        boolean b = RabbitConfigUtil.initConfig("rabbitmq.properties");
    }

    public static void main(String[] args) {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                /*  消费者业务逻辑代码 */
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                log.info("consumerTag {} Received {}", consumerTag, message);
            } catch (Exception e) {
                log.error(" ", e);
            }

        };
        /*  声明一个消费者,参数:队列名称,是否自动提交,业务逻辑(DeliverCallback),消费者标签 */
        CommonConsumerWrapper consumer = new CommonConsumerWrapper("hello", false, "消费者Tag", deliverCallback)
                .setDeliverCallback(deliverCallback)
                .setDeliverAckCallback((ack, delivery) -> {
                    try {
                        /*  手动确认  拒绝 */
                        ack.getChannel().basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                        /*  手动确认  完成 */
//                        ack.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (Throwable e) {

                    }
                })
                .setHandleShutdownSignal((DefaultConsumer handleShutdownSignal, Object[] errorInfo) -> {
                    /*  业务逻辑异常处理 */
                    String consumerTag = (String) errorInfo[0];
                    ShutdownSignalException exception = (ShutdownSignalException) errorInfo[1];
                    log.error("[*] 消费者发生异常,consumerTag: {}", consumerTag, exception);
                    handleShutdownSignal.getChannel().getConnection();
                }).setHandleCancelOk((handleCancelOk, consumerTag) -> {
                    log.info("handleCancel ,consumerTag: {}", consumerTag);
                }).setHandleCancel((handleCancel, consumerTag) -> {
                    log.info("handleCancel ,consumerTag: {}", consumerTag);
                }).setHandleRecoverOk((handleRecoverOk, consumerTag) -> {
                    log.info("handleRecoverOk ,consumerTag: {}", consumerTag);
                });

        consumer.consumer();
        System.out.println("[*] Waiting for messages. To exit press CTRL+C");
    }
}

生产者

/**
 * @author xuyuansheng
 */
public class RabbitProducerDemo {


    static {
        /*  这块代码全局只需要执行一次 */
        boolean b = RabbitConfigUtil.initConfig("rabbitmq.properties");
    }

    @SneakyThrows
    public static void main(String[] args) {
        boolean b = RabbitConfigUtil.initConfig("rabbitmq.properties");
        try (Channel channel = RabbitChannelPool.getChannel()) {
            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

标签:消费,consumerTag,Spring,rabbitmq,RabbitMq,static,private,import,public
From: https://www.cnblogs.com/xysgo/p/18080973

相关文章

  • 面试官:SpringBoot如何优雅停机?
    优雅停机(GracefulShutdown)是指在服务器需要关闭或重启时,能够先处理完当前正在进行的请求,然后再停止服务的操作。优雅停机的实现步骤主要分为以下几步:停止接收新的请求:首先,系统会停止接受新的请求,这样就不会有新的任务被添加到任务队列中。处理当前请求:系统会继续处理当前已......
  • 基于springboot的在线教育系统的设计与实现
    基于springboot的在线教育系统的设计与实现文章目录基于springboot的在线教育系统的设计与实现引言功能演示视频开发环境系统功能介绍功能对照表功能截图编程框架SpringBoot框架SSM框架vue框架示例代码数据库操作示例源码获取引言博主介绍:✌专注于Java技术......
  • 基于springboot的古典舞在线交流平台的设计与实现
    基于springboot的古典舞在线交流平台的设计与实现文章目录基于springboot的古典舞在线交流平台的设计与实现引言功能演示视频开发环境系统功能介绍功能对照表功能截图编程框架SpringBoot框架SSM框架vue框架示例代码数据库操作示例源码获取引言博主介绍:✌专......
  • dea设置自动编译spring boot代码,idea代码修改后无须重启服务立即生效
    解决办法1:spring-boot-devtools<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><......
  • 鸿鹄电子招投标系统源码实现与立项流程:基于Spring Boot、Mybatis、Redis和Layui的企业
    随着企业的快速发展,招采管理逐渐成为企业运营中的重要环节。为了满足公司对内部招采管理提升的要求,建立一个公平、公开、公正的采购环境至关重要。在这个背景下,我们开发了一款电子招标采购软件,以最大限度地控制采购成本,提高招投标工作的公开性和透明性,并确保符合国家电子招投标......
  • java springboot 指定运行端口
    javaspringboot指定运行端口 方法1:修改源代码里的“\src\main\resources\application.properties”文件,增加或修改server.port=8081 方法2:如果是已经打包好的jar包,在运行时指定端口。可以将 “\src\main\resources\application.properties” 文件复制到jar包同......
  • springboot结合rocketmq的使用以及遇到的问题
    rocketmq是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。首先需要下载安装rocketmq:1.官网 https://rocketmq.apache.org/zh/do......
  • Spring6如此厉害的框架到底包含哪些内容
    源码下面无秘密,这是程序员的口头禅。对于强大而且设计优秀的Spring框架也是这样的,在基础代码层层堆叠之下,Spring成为了一个非常流行的框架。Spring6框架的开发者们通过层层设计和封装打造了一个功能如此之多而兼容性非常好的框架。这也是解构这个框架难点,而通过理解整个框架功能......
  • springboot有事务隔离级别
    springboot有五种隔离级别1、DEFAULT:spring默认的事务隔离级别,以连接的数据库事务隔离级别为准;2、READ_UNCOMMITTED:读未提交,该隔离级别事务可以看到其他事务中未提交的数据。因为可以读到别人未提交的数据,如果对方事务发生回滚,容易导致脏读。3、READ_COMMITTED:读已提交,该隔离级......
  • Vue.js+SpringBoot开发企业项目合同信息系统
    目录一、摘要1.1项目介绍1.2项目录屏二、功能模块2.1数据中心模块2.2合同审批模块2.3合同签订模块2.4合同预警模块2.5数据可视化模块三、系统设计3.1用例设计3.2数据库设计3.2.1合同审批表3.2.2合同签订表3.2.3合同预警表四、系统展示五、核心代码5.1......