首页 > 数据库 >Java中使用redis作为消息队列

Java中使用redis作为消息队列

时间:2024-12-26 16:55:07浏览次数:3  
标签:Java String 队列 redis jedis 消息 Jedis message public

Java中使用redis作为消息队列

使用redis作为消息队列

在Java中使用Redis作为消息队列,可以通过Redis的List​数据结构或者Pub/Sub​模式来实现。以下是一个简单的示例,展示了如何使用Redis的List​作为消息队列。

1. 使用Redis的List作为消息队列

Redis的List​数据结构非常适合用来实现消息队列,因为它支持在列表的两端进行操作。

1.1 生产者(发送消息)

生产者将消息推入列表的头部:

import redis.clients.jedis.Jedis;

public class RedisProducer {
    private Jedis jedis;

    public RedisProducer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public void produce(String queueName, String message) {
        jedis.lpush(queueName, message);
    }

    public void close() {
        jedis.close();
    }
}

1.2 消费者(接收消息)

消费者从列表的尾部弹出消息:

import redis.clients.jedis.Jedis;

public class RedisConsumer {
    private Jedis jedis;

    public RedisConsumer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public String consume(String queueName) {
        return jedis.rpop(queueName);
    }

    public void close() {
        jedis.close();
    }
}

1.3 阻塞式消费(BLPOP)

为了提高效率,可以使用BLPOP​方法,它会在没有消息时阻塞,直到有新消息到达:

import redis.clients.jedis.Jedis;

public class RedisBlockingConsumer {
    private Jedis jedis;

    public RedisBlockingConsumer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public String blockingConsume(String queueName, int timeout) {
        String message = jedis.blpop(timeout, queueName).get(1);
        return message;
    }

    public void close() {
        jedis.close();
    }
}

2. 使用Redis的Pub/Sub模式

Redis的发布订阅模式适用于一对多的消息分发。

2.1 发布者(发布消息)

import redis.clients.jedis.Jedis;

public class RedisPublisher {
    private Jedis jedis;

    public RedisPublisher(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public void publish(String channel, String message) {
        jedis.publish(channel, message);
    }

    public void close() {
        jedis.close();
    }
}

2.2 订阅者(接收消息)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber {
    private Jedis jedis;

    public RedisSubscriber(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public void subscribe(String channel, JedisPubSub jedisPubSub) {
        jedis.subscribe(jedisPubSub, channel);
    }

    public void close() {
        jedis.close();
    }
}

3. 使用Redisson库

Redisson是一个功能丰富的Redis Java客户端,提供了更高层次的抽象,包括消息队列、分布式锁等。

3.1 添加依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.5</version>
</dependency>

运行 HTML

3.2 生产者和消费者

import org.redisson.Redisson;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonQueueExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        RQueue<String> queue = redisson.getQueue("myQueue");

        // 生产者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.offer("Message " + i);
                System.out.println("Produced: Message " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费者
        new Thread(() -> {
            while (true) {
                String message = queue.poll();
                if (message != null) {
                    System.out.println("Consumed: " + message);
                } else {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        redisson.shutdown();
    }
}

4. 总结

  • List:适用于简单的队列场景,支持阻塞式操作。
  • Pub/Sub:适用于发布订阅场景,适合一对多的消息分发。
  • Redisson:提供了更高级的抽象,适合复杂的分布式应用场景。

选择哪种方式取决于你的具体需求。如果需要可靠的消息传递和队列功能,建议使用List​或Redisson的队列实现。

但是还是存在缺点,redis不存在消息确认机制,所以还得再优化一下。

Redis 作为消息队列+消息确认机制

为了在使用 Redis 作为消息队列的基础上加上消息确认机制,我们可以采用以下方案:

方案概述

  1. 生产者:将消息放入 queue:待处理​ 列表。
  2. 消费者:使用 Lua 脚本原子地将消息从 queue:待处理​ 移动到 queue:正在处理​,并记录处理开始时间。
  3. 消息处理:消费者处理消息,处理完成后从 queue:正在处理​ 删除消息,并删除处理记录。
  4. 故障处理:如果处理失败,可以选择不删除消息,或者将其重新放入 queue:待处理​。
  5. 定时检查:使用定时任务检查 queue:正在处理​ 中的消息,如果超过阈值时间未处理完成,则重新放入 queue:待处理​。

实现步骤

1. 生产者代码

import redis.clients.jedis.Jedis;

public class RedisProducer {
    private Jedis jedis;

    public RedisProducer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public void produce(String queueName, String message) {
        jedis.lpush(queueName, message);
    }

    public void close() {
        jedis.close();
    }
}

2. 消费者代码

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Scripting;

public class RedisConsumer {
    private Jedis jedis;
    private String moveScript = "local message = redis.call('RPOP', KEYS[1]); if message then redis.call('LPUSH', KEYS[2], message); end; return message;";

    public RedisConsumer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public String consume(String待处理Queue, String正在处理Queue) {
        String message = (String) jedis.eval(moveScript, 2,待处理Queue,正在处理Queue);
        if (message != null) {
            jedis.hset("hash:正在处理", message, String.valueOf(System.currentTimeMillis()));
        }
        return message;
    }

    public void confirm(String正在处理Queue, String message) {
        jedis.lrem(正在处理Queue, 0, message);
        jedis.hdel("hash:正在处理", message);
    }

    public void close() {
        jedis.close();
    }
}

3. 定时检查任务

import redis.clients.jedis.Jedis;
import java.util.Set;
import java.util.Date;

public class MessageMonitor {
    private Jedis jedis;
    private long threshold = 60000; // 1分钟

    public MessageMonitor(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    public void checkMessages() {
        Set<String> messages = jedis.hkeys("hash:正在处理");
        for (String message : messages) {
            long startTime = Long.parseLong(jedis.hget("hash:正在处理", message));
            if (new Date().getTime() - startTime > threshold) {
                jedis.lpush("queue:待处理", message);
                jedis.hdel("hash:正在处理", message);
            }
        }
    }

    public void scheduleCheck() {
        new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(() -> checkMessages(), 0, 30, TimeUnit.SECONDS);
    }

    public void close() {
        jedis.close();
    }
}

代码说明

  • 生产者:将消息放入 queue:待处理​ 列表。

  • 消费者

    • 使用 Lua 脚本原子地将消息从 queue:待处理​ 移动到 queue:正在处理​。
    • 记录消息处理开始时间到 hash:正在处理​。
    • 处理消息后,调用 confirm​ 方法删除消息和处理记录。
  • 定时检查任务

    • 每 30 秒检查一次 hash:正在处理​ 中的消息。
    • 如果消息处理时间超过阈值(例如 1 分钟),将其重新放入 queue:待处理​。

优点

  • 原子操作:使用 Lua 脚本确保消息移动的原子性。
  • 故障 tolerance:通过定时检查任务,确保消息不会永久卡在 queue:正在处理​ 中。
  • 消息不丢失:处理失败的消息会重新进入待处理队列。

缺点

  • 复杂性:需要额外的逻辑来处理消息确认和故障恢复。
  • 性能开销:定时任务和哈希表操作可能增加系统负载。

结论

通过上述方案,我们实现了基于 Redis 的消息队列的消息确认机制,确保消息的可靠传递和处理。虽然实现较为复杂,但在 Redis 作为消息队列的场景下,这是一个可行的解决方案。

标签:Java,String,队列,redis,jedis,消息,Jedis,message,public
From: https://www.cnblogs.com/createBUG/p/18633518/use-redis-as-a-message-queue-in-java-zniajv

相关文章

  • 纯JS加密+时间校验(没引用任何框架),Java解密
    1、JS加密functioncaesarEncrypt(text,shift){ letstr="G7bCk1Xf2A9qLmJ0pVrTz5D4oS6wEhN"+Date.now().toString(); returnstr.split('') .map(char=>String.fromCharCode(char.charCodeAt(0)+3)) .join('');}2、Java校验......
  • Java重要面试名词整理(五):Redis
    文章目录Redis高级命令Redis持久化RDB快照(snapshot)**AOF(append-onlyfile)****Redis4.0混合持久化**管道(Pipeline)**StringRedisTemplate与RedisTemplate详解**Redis集群方案gossip脑裂RedisLuaRedisMultiLockRedis红锁缓存相关问题**缓存穿透****缓存失效(击穿)****......
  • 农家乐系统|Java|SSM|VUE| 前后端分离
                 【技术栈】1⃣️:架构:B/S、MVC2⃣️:系统环境:Windowsh/Mac3⃣️:开发环境:IDEA、JDK1.8、Maven、Mysql5.7+4⃣️:技术栈:Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html5⃣️数据库可视化工具:navicat6⃣️服务器:SpringBoot自带apachetomca......
  • 企业销售人员培训系统|Java|SSM|VUE| 前后端分离
                  【技术栈】1⃣️:架构:B/S、MVC2⃣️:系统环境:Windowsh/Mac3⃣️:开发环境:IDEA、JDK1.8、Maven、Mysql5.7+4⃣️:技术栈:Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html5⃣️数据库可视化工具:navicat6⃣️服务器:SpringBoot自带apacheto......
  • 想到了个童年小游戏,2个人4只手就能玩,简单用JavaScript实现一下
    /** *规则:双方各有左右2个数,初始值为1。每回合,可以将自身的一个数与对方的一个数相加,然后模10。 *如,第一回合你操作:你(11)机器人(11)-->你(12)机器人(11) *下回合机器人操作:你(12)机器人(11)-->你(12)机器人(13) *第三回合你操作:你(12)机器人(13)-->你(15)机器人(13) *......
  • 大数据学习之Redis 缓存数据库二,Scala分布式语言一
    一.Redis缓存数据库二26.Redis数据安全_AOF持久化机制27.Redis数据安全_企业中该如何选择持久化机制28.Redis集群_主从复制概念29.Redis集群_主从复制搭建30.Redis集群_主从复制原理剖析31.Redis集群_哨兵监控概述32.Redis集群_配置哨兵监控33......
  • 【Java并发】读写锁ReadWriteLock
    目录什么是读写锁什么时候使用读写锁怎么使用读写锁 使用步骤1.创建 ReentrantReadWriteLock实例2.获取读锁和写锁3.锁和解锁完整代码使用读写锁的注意事项什么是读写锁ReadWriteLock是Java并发包(java.util.concurrent.locks)中提供的一种锁机制,旨在......
  • 第1章 初识Java与面向对象程序设计
    2024021504831.1Java概述1.1.1计算机编程语言发展史历经了三个发展历程:第一代计算机编程语言:机器语言:是微处理器理解和使用的语言,用于控制它的操作二进制代码。第二代计算机编程语言:汇编语言:采用英文单词作为指令来编写程序,这意味着不去记忆那些枯燥的0和1也可以写出一......
  • Java学习,文件写入
    Java中文件写入是一个常见的任务,可以使用java.io包中的类来实现这一点。需要注意,写入文件需要写入文件的权限,需要指定文件位置,绝对路径或相对路径来指定。使用FileWriter与BufferedWriter写入文件:importjava.io.BufferedWriter;importjava.io.FileWriter;importjava.io......
  • Java学习,continue关键字
    Javacontinue语句用来结束当前循环,并进入下一次循环,不是所有循环结束了。Java中continue关键字用于跳过,当前循环迭代中的剩余代码,并立即开始下一次迭代。它通常与循环结构(如 for、while 或 do-while)一起使用,不与switch语句一起使用。for循环使用continuepublicclassCo......