首页 > 数据库 >使用Mysql实现消息队列

使用Mysql实现消息队列

时间:2023-04-10 23:02:37浏览次数:31  
标签:status 队列 ret subprocess 版本号 version mq 消息 Mysql

1.gif

实现起来就是 消息状态版本号 字段。

2.png

更新时用 版本号 做乐观锁。操作逻辑就是个状态机。

UPDATE mq SET mq.status=new_status mq.version = mq.version + 1 WHERE mq.version = old_version

实现

mysql mq 表结构设计

CREATE TABLE `mq` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `msg` varchar(1024) DEFAULT NULL,
  `status` varchar(100) DEFAULT 'ready',
  `version` bigint(20) unsigned DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

生产者

测试,向队列插入两条消息。

insert into mq(msg) values('第一条消息 hello world! 1024');
insert into mq(msg) values('第二条消息 欢迎访问 https://spaceack.com');

3.png

消费者

  • 获取队列中的消息, 此时不会改变队列(mq表)中的数据。
    select * from mq where status='ready' limit 1;
    

4.png

  • 消息确认

    update mq set mq.status = 'ack', mq.version = mq.version + 1 WHERE mq.version = {query_version} and id = {query_id}
    

    确认后的状态:

    5.png

再次获取数据仅能获取第二条数据。

6.png


这样的一个好处就是消息都是可见的。 可以直接查数据库。

不用加悲观锁,因为update成功后就带锁。其它同版本的更新都会失败。

还有个好处就是减少组件依赖。 简单的服务数据库就能搞定。 不用再起个rabbitmq服务啥的。 节约运维成本。

版本号 另一个小作用:InnoDB如果更新语句没有改变任何字段值时,影响行数会返回0.那么是没找到记录还是没改变值的0呢?前者是bug, 后者是正常情况但区分不了。加版本号后每次修改+1,影响行数一定不为0。


隐藏福利:生产者自动批量测试脚本:

import random
import subprocess

def runcmd(command):
    ret = subprocess.run(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,encoding="utf-8",timeout=1)
    if ret.returncode == 0:
        # print("success:",ret, ret.stdout)
        return ret.stdout
    else:
        print("error:",ret)


def producer():
    sql = "\"insert into mq(msg) values('%s');\"" % (str(random.randint(1,99999)))
    cmd = """mysql -uroot -ppassword test -e %s """ % (sql)
    runcmd(cmd)

标签:status,队列,ret,subprocess,版本号,version,mq,消息,Mysql
From: https://blog.51cto.com/spaceack/6181592

相关文章

  • mysql数据库的登录脚本
    ######################## ku脚本:可以使用以下ku脚本,它可以根据提供的参数登录到MySQL数据库:#!/bin/bash#Checkforcorrectnumberofargumentsif[$#-lt1];thenecho"Usage:$0<ip>[<port>][<mysqloptions>]"exit1fi#SettheIPaddress......
  • chatpgt-flinkcdc从mysql到kafka再到mysql
    flinkcdcmysql到kafkaimportorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apach......
  • 学习笔记394—Windows 10 MySQL 数据库安装
    Windows10MySQL数据库安装1、MySQL的安装方式MySQL的社区版(MySQLCommunity)是免费的、开源的,像企业版这些是收费的,学习阶段使用社区版的即可。MySQL社区版在Windows10的安装方式可以分为两种,一种是使用安装程序安装,另一种是使用压缩包安装。个人倾向于使用压缩包......
  • 小程序入门4—钉钉群机器人消息通知和钉钉工作通知
    前言在消息通知这块,钉钉可谓是玩出了花,比如工作通知、群机器人通知,还有那万恶的Ding一下。钉钉的通知不仅花样多,而且大部分渠道都支持自定义,也即可以自定义设置发送时间、发送内容,并且还支持多种样式的消息如文本、卡片、Markdown等。这篇文章我主要介绍一下常用的两类:钉钉群机......
  • Mysql tinyint长度为1时在java中被转化成boolean型(踩坑)
    资料参考链接1:https://www.cnblogs.com/joeylee/p/3878223.html资料参考链接2:https://blog.csdn.net/HD243608836/article/details/118197811目录背景线上事故1污染数据2类型转换异常原因解决方法.背景踩过两次tinyint的坑线上事故1污染数据问题背景tinyint(1)在j......
  • 线程和队列应用--消费者和生产者
    1、用一个队列存储商品2、创建一个专门生产商品的线程类,当商品数量少于50时,开始生产商品,每次生产200个商品,每生产一轮,暂停1s3、创建一个专门消费商品的线程类,当商品数量大于10时就开始消费,循环消费,每次消费3个,当商品数量少于10的时候,暂停2s    ......
  • 【HMS Core】Health Kit关于订阅消息的资讯
    【问题描述】1.数据跨N天同步,怎么去区分每一天的数据,开始时间和结束时间可以区分吗。会出现一天时间内有多段数据的情况出现吗2.华为健康APP自动同步的频率是多少,凌晨0点0分会把前一天的数据上云吗3.假设我可以在APP自动同步的瞬间点击同步,会不会出现数据重复问题4.针对运动数......
  • python操作mysql数据库
    Python操作mysql库python操作mysql数据库,需要使用第三库:pymysql一、mysql安装官网:https://www.mysql.com/二、安装pymysqlpipinstallPyMySql-ihttp://pypi.douban.com/simple/--trusted-hostpypi.douban.com三、使用代码演示:importpymysqlconn=pymysql.connec......
  • mysql 2023-04-09 23:59:59 999 为什么会展示为第二天
    在MySQL中,日期时间类型包括DATE、TIME、DATETIME、TIMESTAMP等。其中,DATETIME和TIMESTAMP类型可以表示具体的日期和时间,包含年、月、日、时、分、秒等信息。当使用DATETIME或TIMESTAMP存储日期时间值时,如果精度达到了秒级别以上,MySQL会进行四舍五入,将精度保留到秒级别......
  • ubuntu 修改mysql的大小写不敏感
    1.进入目录径:/etc/mysql/mysql.conf.d2.修改文件mysqld.cnf文件  [mysqld]  标签下方添加  lower_case_table_names=1注意:该配置会强制将表名改为小写,如果当前存在大写的表,请将大写的表改为小写再改配置,否则原来大写的表无法删除,小写的表名也无法创建或者修改完成后......