使用 mysql-replication python监听mysql binlog 实时同步数据
文章目录
- 使用 mysql-replication python监听mysql binlog 实时同步数据
- 前言
- 一、环境
- 二、安装与配置
- 1.首先安装mysql-replication
- 2.参数
- 3.配置数据库
- 4.读取binlog日志
- 总结
前言
数据库的基础信息需要频繁访问,需要存入redis 轮询存入需要占用资源,并且不是实时,使用mysql-replication可解决此问题。
一、环境
- mysql-replication0.23
- python3.7
- miniconda4.8.3 (https://blog.csdn.net/mtl1994/article/details/114968140)
二、安装与配置
1.首先安装mysql-replication
pip install mysql-replication
2.参数
BinLogStreamReader()参数
ctl_connection_settings:集群保存模式信息的连接设置
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:在流上读取被阻止
only_events:允许的事件数组
ignored_events:被忽略的事件数组
only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
ignored_tables:包含要跳过的表的数组
only_schemas:包含要观看的模式的数组
ignored_schemas:包含要跳过的模式的数组
freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中报告奴隶。
slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义
3.配置数据库
show variables like 'log_bin';
#显示on
show variables like 'binlog_format';
#显示row
#如果不是 修改下 my.cnf 重启一下
4.读取binlog日志
#m.get_s() 封装的数据库对象 格式
#self.mysql_settings = {
# 'host': '127.0.0.1',
# 'port': 3306,
# 'user': 'root',
# 'passwd': 'root'
#}
ef main(r):
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=m.get_s(),
server_id=1, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_schemas=['env'],
ignored_tables=['env_data_mon_log','env_data_his_log'],
freeze_schema=True,
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)
for binlogevent in stream:
try:
binlogevent.dump() # 打印所有信息
for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}
print(event)
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]
print(json.dumps(event, cls=DateEncoder))
sys.stdout.flush()
#存入redis队列 ,在创建一个消费者程序 把数据存入redis
r.lpush(config.sync.q_name,json.dumps(event, cls=DateEncoder))
except Exception as e:
traceback.print_exc()
# stream.close() # 如果使用阻塞模式,这行多余了
if __name__ == '__main__':
r = initRedis()
main(r)
"""
输出数据格式
{
"schema": "demo", # 数据库名
"table": "student", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 26,
"name": "haha",
"age": 34,
"update_time": "2019-06-06 16:59:06",
"display": 0
}
}
"""
总结
~~~