首页 > 数据库 >PostgreSQL流复制主从监控和自动故障转移的轻量级实现

PostgreSQL流复制主从监控和自动故障转移的轻量级实现

时间:2024-11-11 18:57:26浏览次数:5  
标签:node PostgreSQL standby db primary 节点 config 主从 轻量级

如何实现PostgreSQL的高可用,之前研究过repmgr以及pg_auto_failover,起作用都是起到主节点故障时,实现“自动故障转移”的目的。
但是repmgr以及pg_auto_failover得缺点是对数据库侵入过多,需要在监控的数据库内部进行一系列的配置操作,同时需要启动第三方服务实现节点的可用性监控,这又引入了额外的不确定性因素。
如果是单纯地为了故障转移,repmgr以及pg_auto_failover都显得过于沉重,于是快速尝试了一个轻量级的实现方案

实现思路

主要思路如下:

1,自动故障转移:该方式在正常情况下,作为一个服务持续监控主从复制状态
while 1:
	轮训PostgreSQL流复制的状态:
	if 如果主节点不可达:
  	  if 判断从节点是否可达:
    	if 从节点状态是是in_recovery:
			promote 从节点,从节点接管读写服务
            


	
2,手动故障转移:如果是主从节点都正常的情况下,为了演练灾备切换,人工故障转移
if 判断从节点是否可达:
    if 从节点状态是是in_recovery:
	    promote 从节点,从节点接管读写服务
#把原始主节点作为从节点加入集群中运行
if 如果主节点不可达:
	if 从节点状态是不是in_recovery:
		1,关闭原始主库
		2,尝试修复时间线
		3,尝试以standby的模式启动主库

优点

快速尝试了一个轻量级的实现方案,优点如下:
1,不引入或者依赖于任何第三方中间件,可以实现多组流复制集群的监控和故障转移。
2,不对现有PostgreSQL的复制环境做任何修改和侵入,以“第三方”的视角来监控现有PostgreSQL流复制,可以自行实现PostgreSQL流复制状态监控和故障转移。
3,支持在主节点故障时实现自动故障转移,或者手动故障转移视线灾备切换演练。
4,简单轻量,可以自定义故障转移的模式以及自定义日志等信息,通知信息等  

详细实现

详细实现如下,目前在两台EC2上测试环境上,一台腾讯云,一台阿里云上搭建PostgreSQL流复制。经测试:可以一键快速实现主从切换,或者连续故障转移(A节点切换到B节点,B节点切换到A节点,需要主从连接信息)
import psycopg2
import time
import paramiko


# 连接测试
def conn_to_postgres(db_config):
    try:
        conn = psycopg2.connect(**db_config)
        conn.close()
        return True
    except Exception as e:
        print(f"Error connecting to master: {e}")
        return False


# 判断节点身份
def is_postgresql_recovery(db_config):
    try:
        with psycopg2.connect(**db_config) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT pg_is_in_recovery();")
                in_recovery = cur.fetchone()[0]
        return in_recovery
    except Exception as e:
        print(f"Error connecting to master: {e}")
        return False


# 探测节点是否可达
def is_postgresql_reachable(db_config,retry_times):
    # 这里仅仅判断主节点是否可连通
    # 判断节点是否可达的时候,作为仲裁方,可以增加其他判断,比如当前节点是否能ping的通DNS服务器等,标明当前节点的判断是没有问题的
    while retry_times > 0:
        if not conn_to_postgres(db_config):
            print('the postgres cannot reachable,retrying......')
            time.sleep(10)
            retry_times = retry_times -1
        else:
            return True
    else:
        return False


def ssh_conn(host, username, password, port):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=host, port=port, username=username, password=password)
        return ssh
    except Exception as e:
        return None, str(e)


def create_replication_slot_if_not_exists(db_config, slot_name):
    try:
        with psycopg2.connect(**db_config) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT slot_name FROM pg_replication_slots WHERE slot_name = '{0}';".format(slot_name))
                var_slot_name = cur.fetchall()
                if not var_slot_name:
                    cur.execute("SELECT * FROM pg_create_physical_replication_slot('{0}');".format(slot_name))
                    print(f"Replication slot '{slot_name}' created.")
                else:
                    print(f"Replication slot '{slot_name}' already exists.")
    except Exception as e:
        print(f"Error connecting to master: {e}")
        return False


def promote_standby_to_primary(db_config):
    try:
        with psycopg2.connect(**db_config) as conn:
            conn.autocommit = True 
            with conn.cursor() as cur:
                cur.execute("SELECT pg_promote(true);")
        print("{0} Standby promoted to primary successfully.".format(db_config['host']))
    except Exception as e:
        print("{0} Standby promoted to primary failed : {1}".format(db_config[0],e))


# 时间线修复
def run_pg_rewind(primary_node, standby_node):
    # 新的主服务器的连接信息(pg_rewind需要这些信息)
    primary_db_config = "host={0} port={1} user={2} password={3}".format(primary_node['db_config']['host'],
                                                                         primary_node['db_config']['port'],
                                                                         primary_node['db_config']['user'],
                                                                         primary_node['db_config']['password'])
    # 构建pg_rewind命令
    pg_rewind_cmd = ("sudo -u postgres {0}/pg_rewind --target-pgdata={1} --source-server='host={2} port={3} user={4} dbname='postgres' password={5}'"
                     .format(standby_node['pg_base_dir'],
                             standby_node['pg_data_dir'],
                             primary_node['db_config']['host'],
                             primary_node['db_config']['port'],
                             primary_node['db_config']['user'],
                             primary_node['db_config']['password'] ))
    ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])

    try:
        stdin, stdout, stderr = ssh.exec_command('sudo systemctl stop postgresql9000')
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        if stderr.channel.recv_exit_status() != 0:
            raise Exception(f"Failed to stop postgresql9000")
            print(error)

        print('######begin to rewind standby node')
        print(pg_rewind_cmd)
        stdin, stdout, stderr = ssh.exec_command(pg_rewind_cmd)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        if stderr.channel.recv_exit_status() != 0:
            print('standby node pg_rewind failed')
            raise Exception(f"{0} Failed to rewind commend".format(standby_node['db_config']['host']))
        print('######standby node pg_rewind successfully')
        return True
    except Exception as err:
        print('pg_rewind failed ' + str(err))
        return False
    finally:
        ssh.close()


def startup_as_replica(primary_node,standby_node):
    standby_auto_conf_path = '{0}/postgresql.auto.conf'.format(standby_node['pg_data_dir'])
    standby_signal_path = '{0}/standby.signal'.format(standby_node['pg_data_dir'])

    ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])

    # 要写入postgresql.auto.conf的内容(模版)
    auto_conf_content = """
        primary_conninfo = 'user={2} password=''{3}'' channel_binding=prefer host={0} port={1} sslmode=prefer sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=postgres gssdelegation=0 target_session_attrs=any load_balance_hosts=disable'
        primary_slot_name = '{4}'
        """.format(primary_node['db_config']['host'],
                   primary_node['db_config']['port'],
                   primary_node['repl_user'],
                   primary_node['repl_password'],
                   primary_node['slot_name']).lstrip()

    # 要创建的standby.signal文件(空文件即可,表示该节点为备用节点)
    try:

        stdin, stdout, stderr = ssh.exec_command('sudo systemctl stop postgresql9000')
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        if stderr.channel.recv_exit_status() != 0:
            raise Exception(f"Failed to stop postgresql9000")
            print(error)

        try:
            # 创建standby.signal文件(如果尚不存在)
            print('###### {0} touch {1}'.format(standby_node['host'],standby_signal_path))
            stdin, stdout, stderr = ssh.exec_command(f'sudo -u postgres touch {standby_signal_path}')
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            # 检查命令是否成功执行
            if stderr.channel.recv_exit_status() != 0:
                raise Exception(f"Failed to create {standby_signal_path}: {stderr.read().decode()}")

            print('###### {0} touch {1}'.format(standby_node['host'], standby_auto_conf_path))
            stdin, stdout, stderr = ssh.exec_command(f'sudo -u postgres touch {standby_auto_conf_path}')
            stdin, stdout, stderr = ssh.exec_command('''sudo  echo "{0}"  > {1}'''.format(auto_conf_content,standby_auto_conf_path))
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            # 检查命令是否成功执行
            if stderr.channel.recv_exit_status() != 0:
                print(error)
                raise Exception(f"Failed to create {standby_signal_path}: {stderr.read().decode()}")
        except Exception as err:
            print(err)

        stdin, stdout, stderr = ssh.exec_command('sudo systemctl restart postgresql9000')
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        # 检查命令是否成功执行
        if stderr.channel.recv_exit_status() != 0:
            print(error)
            raise Exception(f"{0} restart postgresql failed".format(standby_node['host']))
    finally:
        ssh.close()


# 人工故障转移:在主节点可达的情况,把主节点作为从节点运行,可以是灾备切换的预演
def execute_failover(primary_node,standby_node):
    print('promote 从节点为主节点')
    # 判断standby节点是否可达,如果不可达,重试5次
    if is_postgresql_reachable(standby_node['db_config'],retry_times=5):
        # 判断standby节点是否处于recovery模式
        if is_postgresql_recovery(standby_node['db_config']):
            # 创建复制槽,为从节点复制做准备
            create_replication_slot_if_not_exists(standby_node['db_config'], standby_node['slot_name'])
            # promote standby节点
            promote_standby_to_primary(standby_node['db_config'])
        else:
            print('当前节点非recovery模式,不可执行promote')

    # 将旧的主节点作为从节点加入新的主节点复制
    # 判断原主节点是否可达
    print('将原始主节点,重新以从节点身份加入复制集群')
    if is_postgresql_reachable(primary_node['db_config'],retry_times=5):
        # 执行pg_rewind修复原始主节点的时间线,请注意:此时从节点提升为主节点,主节点已经变为从节点,所以需要注意节点身份的变化和参数的变化
        pg_rewind_result = run_pg_rewind(primary_node=standby_node,standby_node=primary_node)
        if pg_rewind_result:
            # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
            startup_as_replica(primary_node=standby_node, standby_node=primary_node)


# 自动检测和故障转移
def auto_failover(primary_node,standby_node):
    while True:
        if not is_postgresql_reachable(primary_node['db_config'], retry_times=5):
            # 告警通知,说明当前配置的主节点故障,即将开始主从切换
            execute_failover(primary_node,standby_node)
            # 执行完切换之后,进行写日志,邮件/短信告警通知等
            # 因为主从身份发生了变化,不再对当前的主从配置进行监控,应该第一时间人工介入:1,确认原始主节点是否正常,2,原始主节点是否以从节点的身份加入集群
            exit(0)
        # 定期检查间隔
        time.sleep(15)


if __name__ == "__main__":
    
    # 数据库连接配置
    primary_node  = {
        'host':'***.***.***.***',                                #主机名
        'user_name':"root",                                      #系统用户名
        'password': "******",                                    #系统密码
        'port':22,                                               #ssh端口号
        'pg_base_dir':'/usr/local/pgsql16/server/bin',           #PostgreSQL Server路径
        'pg_data_dir':'/usr/local/pgsql16/pg9000/data',          #PostgreSQL 数据路径
        'repl_user':'replica_user',                              #流复制用户名
        'repl_password':'******',                                #流复制用户密码
        'slot_name': 'pgstandby_slave01',                        #流复制slot名字
        'db_config': {                                           #数据库配置
            'host': '***.***.***.***',
            'user': 'postgres',
            'password': '******',
            'dbname': 'postgres',
            'port': '******'
        },
    }

    standby_node1 = {
        'host':'***.***.***.***',
        'user_name':"root",
        'password': "******",
        'port':22,
        'pg_base_dir':'/usr/local/pgsql16/server/bin',
        'pg_data_dir':'/usr/local/pgsql16/pg9000/data',
        'repl_user':'replica_user',
        'repl_password':'******',
        'slot_name': 'pgstandby_slave01',
        'db_config': {
            'host': '***.***.***.***',
            'user': 'postgres',
            'password': '******',
            'dbname': 'postgres',
            'port': '******'
        },
    }

    # 手动故障转移,主从节点以及复制均正常的情况下,实现参数里的主从节点交换身份
    execute_failover(primary_node=primary_node, standby_node=standby_node1)

    # 单独的时间线修复操作
    #pg_rewind_result = run_pg_rewind(primary_node=standby_node1, standby_node=primary_node)

    # 单独的讲一个节点作为从节点运行
    # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
    # startup_as_replica(primary_node=standby_node1, standby_node=primary_node)

 

待改进

1,用户名密码均已明文方式写在主机信息中
2,pg_rewind的时候需要重启服务器,这里把PostgreSQL的服务写死了,笔者的测试环境PostgreSQL服务名是postgresql9000
3,如果是自动切换模式,自动故障转移后,尚未修改主从配置信息,此时节点的身份信息还是切换前的
4,判断节点是否正常的逻辑中,仅仅通过是否可以连接来实现,如果监控服务本身无法与被监控节点通信,可能会出现误判的情况

 

标签:node,PostgreSQL,standby,db,primary,节点,config,主从,轻量级
From: https://www.cnblogs.com/wy123/p/18540311

相关文章

  • MySQL主从复制
    MySQL主从复制  概要  随着业务的增长,一台数据服务器已经满足不了需求了,负载过重。这个时候就需要减压了,实现负载均衡读写分离,一主一丛或一主多从。  主服务器只负责写,而从服务器只负责读,从而提高了效率减轻压力。  主从复制可以分为:  主从同步:当用户写数据......
  • Rocky9系统安装PostgreSQL
    官网https://www.postgresql.org/环境查看安装登录官网根据平台选择帮助文档sudodnfinstall-yhttps://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpmsudodnf-qymoduledisablepostgresqlsudodnfinst......
  • 因为mysql 8新的认证插件导致主从复制的IO线程失败
    1、错误信息Last_IO_Error:errorconnectingtomaster'[email protected]:3306'-retry-time:60retries:1message:Authenticationplugin'caching_sha2_password'reportederror:Authenticationrequiressecureconnection.2、在主库检查复制用户的pl......
  • PostgreSQL configure: error: readline library not found
    前言安装PostgreSQL时报错,以下复制代码configure:error:readlinelibrarynotfoundIfyouhavereadlinealreadyinstalled,seeconfig.logfordetailsonthefailure.Itispossiblethecompilerisn'tlookingintheproperdirectory.Use--without-readline......
  • postgresql事务与oracle中的事务差异
    事务事务ID及回卷参见postgresql中的事务回卷原理及预防措施。子事务(事务处理:概念与技术4.7)  子事务具有ACI特性,但是不具有D特性。只会在主事务提交时,才会提交,无法单独提交。pg不支持子事务。xact保存点保存点是不支持子事务/嵌套事务时的折中实现,但它是ANSISQL......
  • PostgreSQL 安装 POSTGRES_FDW
    PostgreSQL安装POSTGRES_FDW插件postgres_fdw模块提供外部数据包装器postgres_fdw它可以用于访问存储在外部PostgreSQL服务器中的数据。使用postgres_fdw访问外部数据需要做以下几点准备:1、使用CREATEextension安装postgres_fdw扩展2、使用createserver......
  • 【数据库系列】postgresql链接详解
    ......
  • YoloV10改进策略:上采样改进|CARAFE,轻量级上采样|即插即用|附改进方法+代码
    论文介绍CARAFE模块概述:本文介绍了一种名为CARAFE(Content-AwareReAssemblyofFEatures)的模块,它是一种用于特征上采样的新方法。应用场景:CARAFE模块旨在改进图像处理和计算机视觉任务中的上采样过程,特别适用于目标检测、实例分割、语义分割和图像修复等任务。目标:通过......
  • 有哪些免费的轻量级在线 CRM 系统?6款CRM系统盘点
    对于许多中小型企业和初创公司而言,如何选择一款既能满足日常需求又具性价比的CRM系统很重要。通常来说,系统的成本是企业选型过程中不可忽视的因素,但在这么多的CRM系统中,各类CRM系统的功能与价格对比真的很容易让人眼花缭乱。本文将盘6款值得关注的免费轻量级在线CRM系统,包括简道......
  • PostgreSQL技术大讲堂 - 第71讲:PostgreSQL 17 版本升级
     PostgreSQL技术大讲堂-第71讲,主题:PostgreSQL17版本升级讲课内容:PostgreSQL17版本升级  0、升级前准备工作  1、介绍小版本升级方式(pg12.2-to-pg12.20)  2、介绍大版本升级方式(pg12-to-pg17)  3、升级后验证   PostgreSQL版本更新很快,几乎......