上期是讲逻辑复制,本期是通过PYTHON 来对逻辑复制中的配置参数,publication 定义, 打印不适合进行逻辑复制的表,打印没有在使用的复制槽,另外包含当前发布端和接收端两边的LSN对比。
以下是代码,对于逻辑复制中主要的监控点有
1 是不是存在复制槽不使用的情况
2 是不是存在主库和从库之间的复制延迟(异步)
3 当前库是不是存在不适合进行逻辑复制的表
4 当前库是不是有设置发布
#!/usr/bin/python3
import os
import sys
import psycopg2
import re
import subprocess
#检测当前PG是否具备进行逻辑复制的参数配置
def parameter():
conn = None
conn = psycopg2.connect(database="postgres",user="postgres",password="",host="localhost",port="5432")
conn_sub = psycopg2.connect(database="postgres",user="admin",password="admin",host="192.168.198.101",port="5432")
cur = conn.cursor()
cur.execute("""show wal_level;""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制wal_level必须为logical")
print("________________")
print(row)
cur = conn.cursor()
cur.execute("""show max_worker_processes;""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制,请注意max_worker_process的数字")
print("________________")
print(row)
cur = conn.cursor()
cur.execute("""show max_replication_slots""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制,请注意复制槽的数字")
print("________________")
print(row)
cur = conn.cursor()
cur.execute("""show max_wal_senders;""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制,请注意最大max_wal_sender数字")
print("________________")
print(row)
cur = conn.cursor()
cur.execute("""show max_logical_replication_workers;""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制,请注意最大max_logical_replication_workers数字")
print("________________")
print(row) cur = conn.cursor()
cur.execute("""show max_sync_workers_per_subscription;""")
rows = cur.fetchall()
for row in rows:
print("启用逻辑复制,请注意最大max_sync_workers_per_subscription数字")
print("________________")
print(row)
cur = conn.cursor()
cur.execute("""select pubname,puballtables,pubinsert,pubupdate,pubdelete,pubtruncate from pg_publication;""")
rows = cur.fetchmany(100)
print("当前库publication定义发布信息")
print("-----------------------------")
print("发布定义名 是否对全库表进行发布定义 追踪插入 追踪更新 追踪删除 追踪truncate")
for row in rows:
print(row) print("----------------------------------------------")
cur = conn.cursor()
cur.execute("""select pubname,tablename from pg_publication_tables;""")
rows = cur.fetchmany(100)
print("当前库发布的数据表")
print("------------------")
print("发布定义名 发布表名")
for row in rows:
print(row)
print("----------------------------------------------")
cur = conn.cursor()
cur.execute("""select relname,relhasindex
from pg_catalog.pg_class as c
inner join pg_namespace as n on
(c.relnamespace = n.oid and n.nspname NOT IN ('information_schema', 'pg_catalog') and c.relkind='r') where c.relhasindex = 'f';""")
rows = cur.fetchmany(100)
print("当前库不适合建立逻辑复制表list")
print("_______________________________")
print("表名 无主键")
for row in rows:
print(row)
print("----------------------------------------------")
cur = conn.cursor()
cur.execute("""SELECT slot_name,slot_type,active FROM pg_replication_slots where active= 'f';""")
rows = cur.fetchmany(100)
print("注意当前没有被使用的复制槽,确认不使用请立即删除")
print("_______________________________")
print("复制槽名 复制槽类型 目前不在使用")
for row in rows:
print(row)
print("删除复制槽语句 select pg_drop_replication('复制槽名')")
print("------------------------------------------------------")
cur = conn.cursor()
cur.execute("""select sent_lsn,replay_lsn from pg_stat_replication;""")
rows = cur.fetchmany(100)
print("主库lsn信息")
print("_______________________________")
print("发送LSN 回执LSN")
for row in rows:
print(row)
print("------------------------------------------------------")
cur = conn_sub.cursor()
cur.execute("""select subname,received_lsn,latest_end_time::varchar(20) from pg_stat_subscription;""")
rows = cur.fetchmany(100)
print("查看目的端数据接收状态")
print("-----------------------")
print("接收端名 接收到的LSN 最后接收到LSN时间")
for row in rows:
print(row)
conn.commit()
conn_sub.commit()
conn_sub.close
conn.close
if __name__ == "__main__":
parameter()
下图为执行后的效果
下面有两个地方需要注意
1 设定subscription 的连接,需要在接收端的位置设置用户密码
2 可以将连接设置为及时输入的模式,会更方便,这里没有写死了。
另逻辑复制中最怕的是接收端数据出现问题,导致复制停止,目前需要通过日志来查询出现的问题。程序里面并未有及时分析日志的部分。
标签:rows,PostgreSQL,cur,python,复制,print,conn,row From: https://blog.51cto.com/u_14150796/6528070