首页 > 数据库 >通过Python实现MySQL和PG数据比对

通过Python实现MySQL和PG数据比对

时间:2022-11-23 14:01:39浏览次数:37  
标签:Python MySQL cursor pg chksum mysql configs id PG

生产上,有个需要从MySQL异构复制数据到PG中的需求。 数据同步组件用的是go-mysql-postgres (两位前同事基于社区开源的go-mysql-elasticsearch上做的PG功能补丁)。

目前测试环境异构数据复制已经跑起来了,但是还需要做下二者间的数据校验。 简单写了个python脚本,如下:


run.py  内容如下


# pip3 install psycopg2==2.9.4
# pip3 install mysql-connector-python==8.0.31

import mysql.connector
import psycopg2
import time
import configs
import hashlib

# TODO 待设计
# column_list = ["id","name","ctime","utime"] # 需要校验的列。 当前的设计方法是直接select * ,这需要确保2端数据源的列名一致


start_time = time.time()

mydb = mysql.connector.connect(
host=configs.mysql_host,
port=configs.mysql_port,
user=configs.mysql_user,
passwd=configs.mysql_pass,
)

mysql_cursor = mydb.cursor()

# 获取当前最小 最大的id,用于后续的循环比对
get_min_max_sql = 'SELECT min(id),max(id),count(*) from ' + configs.mysql_db + '.' + configs.mysql_tb + ';'
mysql_cursor.execute(get_min_max_sql)
pk_range_result = mysql_cursor.fetchall()

for x in pk_range_result:
min_id = x[0]
max_id = x[1]
count = x[2]

print(f"{configs.mysql_db}.{configs.mysql_tb} 最小id {min_id} , 最大id {max_id} , 总记录数 {count}")

start_id = min_id
stop_id = start_id + configs.step

# 记录差异行数量
diff_count = 0

while stop_id < max_id + configs.step: # 加一个步长进去,防止因为step过大,导致有遗漏的id
# 拼接出比对的SQL
chksum_sql_4mysql = 'SELECT * FROM ' + configs.mysql_db + '.' + configs.mysql_tb + ' WHERE id >=' + str(
start_id) + ' AND id < ' + str(stop_id) + ' ORDER BY id ASC;'

chksum_sql_4pg = 'SELECT * FROM ' + configs.pg_schema + '.' + configs.pg_tb + ' WHERE id >=' + str(
start_id) + ' AND id < ' + str(stop_id) + ' ORDER BY id ASC;'

mysql_cursor.execute(chksum_sql_4mysql)
mysql_chksum_result = mysql_cursor.fetchall()

mysql_chksum = dict()
for x in mysql_chksum_result:
id = x[0]
chk_sum = hashlib.md5(str(x).replace(' ', '').encode()).hexdigest()
mysql_chksum[id] = chk_sum
# print(f"MySQL校验和 {mysql_chksum}")

# 连接PG进行数据校验
pg_conn = psycopg2.connect(host=configs.pg_host,
port=configs.pg_port,
user=configs.pg_user,
password=configs.pg_pass,
database=configs.pg_db
)
pg_cursor = pg_conn.cursor()
pg_cursor.execute(chksum_sql_4pg)

pg_chksum_result = pg_cursor.fetchall()

pg_chksum = dict()
for x in pg_chksum_result:
id = x[0]
chk_sum = hashlib.md5(str(x).replace(' ', '').encode()).hexdigest()
pg_chksum[id] = chk_sum
# print(f"PG校验和 {pg_chksum}")

# 通过集合的比较,快速找出不一致的主键id
if mysql_chksum != pg_chksum:
differ = set(mysql_chksum.items()) ^ set(pg_chksum.items())
s1 = set()
for i in differ:
s1.add(i[0])
print(s1)
with open('checksum_diff.log', 'a+') as f:
f.write(str(s1, ) + '\n')

diff_count = diff_count + len(s1)

start_id = stop_id
stop_id = stop_id + configs.step

stop_time = time.time()
time_dur = stop_time - start_time
print(f"比对 {count}条记录,总差异条数 {diff_count},耗时 {time_dur} 秒")


configs.py  内容如下

# MySQL数据源的信息
mysql_host = '192.168.31.181'
mysql_port = '3306'
mysql_user = 'dts'
mysql_pass = 'dts'
mysql_db = 'sbtest'
mysql_tb = 'sbtest1'

# PostgreSQL目标库的信息
pg_host = '192.168.31.182'
pg_port = '5432'
pg_user = 'dts'
pg_pass = 'dts'
pg_db = 'sbtest'
pg_schema = 'public'
pg_tb = 'sbtest1'


# 每次遍历的记录数
step = 2000 # 步长。 1表示逐条检测。这个值建议在1000-2000之间。

 

运行效果

通过Python实现MySQL和PG数据比对_mysql


9k记录数,在不同step下的耗时比对:

step = 100   18.5s
step = 500 5s
step = 1000 3.7s
step = 2000 3.3s




标签:Python,MySQL,cursor,pg,chksum,mysql,configs,id,PG
From: https://blog.51cto.com/lee90/5881189

相关文章

  • mysql破解root密码
    #设置免密登录echoskip-grant-tables>>/etc/my.cnfsystemctlrestartmysqld#设置密码为空mysql-e'usemysql;updateusersetauthentication_string=""where......
  • python编程(改进的线程同步方式)
      在实际代码开发中,gui的代码并不好写。因为不管是mvc、还是mvp都有一定的局限性。那么,这个时候,我就在想,是不是可以用mvp+reactor的方法进行gui的改进操作呢?首先app编写......
  • python编程(orm原理和实践)
    就给出自己对orm的理解。之前廖雪峰给出的code,大家可以通过​​地址​​下载的到。1、orm的使用方法    一般我们使用orm都是这么写代码的,classUser(Model):id=I......
  • python编程(gevent入门)
        大家都知道python脚本执行的时候不是很快,特别是python下面的多线程机制,长久以来一直被大家所诟病。所以,很多同学都在思考python下面有没有什么方法可以让python执行......
  • python编程(巧用装饰器)
        以前没有用过装饰器,也不知道它有什么用。直到最近写了一个log函数,在直到原来python的装饰器可以这么方便。1、原来debug消息的写法    假设有一个process函数,......
  • python编程(类变量和实例变量)
        关于类变量和实例变量,一直不是很清楚。所以,想做几个实验,彻底解决这个问题。为此,我们设计了三个实验。1、直接引用类变量importosimportsysclassA():data=1......
  • python编程(python和c相互调用)
      通常为了扩展python的功能,我们需要将c库移植到python上面。python和c调用一般分成两种情况,一种是python调用c,这种情况最为普遍,也比较简单。另外一种就是c调用python,这......
  • 安装mysql服务添加到systemctl服务当中
    1、https://www.cnblogs.com/dahuo/p/16014689.html把mysqld添加至systemctl进行管理 #复制过去后,会sytemctl会自动识别的 cp/usr/local/mysql/support-f......
  • autojs git判断差异python发送的手机上
    #!usr/bin/python#-*-coding:utf-8-*-#根据git变化,将变化的文件推送到手机上importosimportsysprint("文件编码格式:"+sys.getdefaultencoding())#############......
  • 阿瑟的爱心,python+django+html
    最近女朋友过生日,非要一个阿瑟的爱心,不知道这所谓的浪漫到底浪漫到哪里了,还是被脑残剧看坏了脑子(嘘!!)反正就是一顿操作吧,使用python+django的框架,开发了一个网页,最主要的还......