Restarting, Stopping, and Starting PostgreSQL in Patroni: A Detailed Walkthrough

Date: 2024-12-13 09:01:44

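The functions that restart, stop, and start PostgreSQL are all defined in patroni/postgresql/__init__.py; the rest of Patroni performs these operations by calling them.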

1. Stop

1.1 The stop function

PostgreSQL is stopped by the stop function. Its code is shown below:

    def stop(self, mode: str = 'fast', block_callbacks: bool = False, checkpoint: Optional[bool] = None,
             on_safepoint: Optional[Callable[..., Any]] = None, on_shutdown: Optional[Callable[[int, int], Any]] = None,
             before_shutdown: Optional[Callable[..., Any]] = None, stop_timeout: Optional[int] = None) -> bool:
        if checkpoint is None:
            checkpoint = False if mode == 'immediate' else True

        success, pg_signaled = self._do_stop(mode, block_callbacks, checkpoint, on_safepoint,
                                             on_shutdown, before_shutdown, stop_timeout)
        if success:
            if not block_callbacks:
                self.set_state('stopped')
                if pg_signaled:
                    self.call_nowait(CallbackAction.ON_STOP)
        else:
            logger.warning('pg_ctl stop failed')
            self.set_state('stop failed')
        return success

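This function calls the internal function _do_stop to perform the stop. On success, and only when callbacks are not blocked, it sets the state to 'stopped' and fires the ON_STOP callback asynchronously if a stop signal was actually sent. On failure it logs a warning and sets the state to 'stop failed'. In both cases it returns the result of the stop.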

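  • mode: the shutdown mode.
    • 'fast': fast shutdown; active transactions are aborted and client connections are forcibly closed.
    • 'immediate': immediate shutdown; the database processes are terminated at once without the normal cleanup.
  • block_callbacks: whether callbacks are blocked during the stop. If True, the ON_STOP callback is not executed and stop does not change the state to 'stopping' or 'stopped'.
  • checkpoint: whether to run a CHECKPOINT before stopping. If not given, it defaults to True for every mode except 'immediate'.
  • on_safepoint: called once no user backends remain connected (all user sessions have ended), i.e. at a safe point.
  • on_shutdown: called when the database has entered shutdown and pg_controldata starts reporting the cluster as shut down.
  • before_shutdown: called after the CHECKPOINT and before the stop request is sent to the postmaster.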

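Return value (success)

  • True: the PostgreSQL postmaster no longer exists, i.e. it has stopped or exited.
  • False: sending the stop signal failed (for example, because of a permission problem or another error); the state is set to 'stop failed'.

A third value, None (stop signal sent, process not yet gone), appears only inside _do_stop as the result of signal_stop. In that case _do_stop waits for the postmaster to exit and then returns True.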
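The default for checkpoint and the bookkeeping that stop performs after _do_stop returns can be restated as a small standalone sketch. The helper names default_checkpoint and state_after_stop are hypothetical and exist only for illustration; they are not part of Patroni.

```python
from typing import Optional, Tuple


def default_checkpoint(mode: str, checkpoint: Optional[bool] = None) -> bool:
    """Mirror stop(): run a CHECKPOINT unless the mode is 'immediate'."""
    if checkpoint is None:
        return mode != 'immediate'
    return checkpoint


def state_after_stop(success: bool, pg_signaled: bool,
                     block_callbacks: bool) -> Tuple[Optional[str], bool]:
    """Return (new_state, fire_on_stop) the way stop() decides them.

    new_state is None when stop() leaves the state untouched.
    """
    if not success:
        return 'stop failed', False
    if block_callbacks:
        return None, False
    # The ON_STOP callback only fires when a signal was really sent.
    return 'stopped', pg_signaled
```

For example, state_after_stop(True, False, False) yields ('stopped', False): PostgreSQL was already down, so the state is updated but no ON_STOP callback runs.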

1.2 The _do_stop function

The _do_stop function is shown below:

    def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
                 on_safepoint: Optional[Callable[..., Any]], on_shutdown: Optional[Callable[[int, int], Any]],
                 before_shutdown: Optional[Callable[..., Any]], stop_timeout: Optional[int]) -> Tuple[bool, bool]:
        postmaster = self.is_running()
        if not postmaster:
            if on_safepoint:
                on_safepoint()
            return True, False

        if checkpoint and not self.is_starting():
            self.checkpoint(timeout=stop_timeout)

        if not block_callbacks:
            self.set_state('stopping')

        # invoke user-directed before stop script
        self._before_stop()

        if before_shutdown:
            before_shutdown()

        # Send signal to postmaster to stop
        success = postmaster.signal_stop(mode, self.pgcommand('pg_ctl'))
        if success is not None:
            if success and on_safepoint:
                on_safepoint()
            return success, True

        # We can skip safepoint detection if we don't have a callback
        if on_safepoint:
            # Wait for our connection to terminate so we can be sure that no new connections are being initiated
            self._wait_for_connection_close(postmaster)
            postmaster.wait_for_user_backends_to_close(stop_timeout)
            on_safepoint()

        if on_shutdown and mode in ('fast', 'smart'):
            i = 0
            # Wait for pg_controldata `Database cluster state:` to change to "shut down"
            while postmaster.is_running():
                data = self.controldata()
                if data.get('Database cluster state', '') == 'shut down':
                    checkpoint_locations = self._checkpoint_locations_from_controldata(data)
                    if checkpoint_locations:
                        on_shutdown(*checkpoint_locations)
                    break
                elif data.get('Database cluster state', '').startswith('shut down'):  # shut down in recovery
                    break
                elif stop_timeout and i >= stop_timeout:
                    stop_timeout = 0
                    break
                time.sleep(STOP_POLLING_INTERVAL)
                i += STOP_POLLING_INTERVAL

        try:
            postmaster.wait(timeout=stop_timeout)
        except TimeoutExpired:
            logger.warning("Timeout during postmaster stop, aborting Postgres.")
            if not self.terminate_postmaster(postmaster, mode, stop_timeout):
                postmaster.wait()

        return True, True

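Inside this function the actual stop is performed by signal_stop, which chooses its method by operating system: on Unix-like systems it sends a stop signal to the postmaster, and on other systems it uses the pg_ctl command. When the signal is sent successfully, signal_stop returns None, and _do_stop continues with the safepoint wait (if on_safepoint is given), the on_shutdown callback, and finally the wait for the postmaster to exit.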

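success (the return value of signal_stop)

  • True: the PostgreSQL postmaster no longer exists, i.e. it has stopped or exited.
  • False: sending the stop signal failed (for example, because of a permission problem or another error).
  • None: the stop signal was sent successfully; the process has yet to stop or exit.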
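The three-valued result above can be sketched for POSIX systems as follows. This is an illustration under assumptions, not Patroni's implementation: it uses os.kill from the standard library, the name signal_stop_sketch and the injectable send parameter are hypothetical, and the mode-to-signal table follows the signals PostgreSQL documents for smart, fast, and immediate shutdown.

```python
import os
import signal
from typing import Callable, Optional

# PostgreSQL shutdown modes and the signals the postmaster reacts to (POSIX only).
STOP_SIGNALS = {
    'smart': signal.SIGTERM,
    'fast': signal.SIGINT,
    'immediate': signal.SIGQUIT,
}


def signal_stop_sketch(pid: int, mode: str,
                       send: Callable[[int, int], None] = os.kill) -> Optional[bool]:
    """Return None if signaled, True if the process is already gone, False on error."""
    try:
        send(pid, STOP_SIGNALS[mode])
    except ProcessLookupError:
        return True   # nothing left to stop
    except PermissionError:
        return False  # signal could not be delivered
    return None       # signal sent; the caller still has to wait for the exit
```

Returning None rather than True for "signal sent" lets the caller tell "already stopped" apart from "stop in progress", which is what _do_stop relies on when it checks `success is not None`.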

pg_signaled

  • True: a stop request was issued to the postmaster.
  • False: no stop request was issued, because PostgreSQL was not running.
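The polling loop that _do_stop runs before calling on_shutdown can be isolated into a standalone sketch. The function name wait_for_shut_down, its injectable parameters, and the one-second value of STOP_POLLING_INTERVAL are assumptions made for illustration; the control data is passed in as a plain dictionary instead of being read from pg_controldata.

```python
import time
from typing import Callable, Dict, Optional

STOP_POLLING_INTERVAL = 1  # seconds; assumed value for this sketch


def wait_for_shut_down(read_controldata: Callable[[], Dict[str, str]],
                       is_running: Callable[[], bool],
                       stop_timeout: Optional[int] = None,
                       sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the cluster state reports a shutdown, the process exits, or time runs out.

    Returns 'shut down', 'shut down in recovery', 'timeout' or 'exited'.
    """
    waited = 0
    while is_running():
        state = read_controldata().get('Database cluster state', '')
        if state == 'shut down':
            return 'shut down'              # clean shutdown of a primary
        if state.startswith('shut down'):
            return 'shut down in recovery'  # replica shutdown
        if stop_timeout and waited >= stop_timeout:
            return 'timeout'
        sleep(STOP_POLLING_INTERVAL)
        waited += STOP_POLLING_INTERVAL
    return 'exited'
```

Only the exact 'shut down' state leads to on_shutdown being called in the original code, since checkpoint locations are taken from the control data of a cleanly stopped primary.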

1.3 Stop flowchart

[Figure: flowchart of the database stop procedure]

2. Start

2.1 The start function

PostgreSQL is started by the start function. Its code is shown below:

    def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None,
              block_callbacks: bool = False, role: Optional[str] = None,
              after_start: Optional[Callable[..., Any]] = None) -> Optional[bool]:
        self.connection_pool.close()

        if self.is_running():
            logger.error('Cannot start PostgreSQL because one is already running.')
            self.set_state('starting')
            return True

        if not block_callbacks:
            self.__cb_pending = CallbackAction.ON_START

        self.set_role(role or self.get_postgres_role_from_data_directory())

        self.set_state('starting')
        self.set_pending_restart_reason(CaseInsensitiveDict())

        try:
            if not self.ensure_major_version_is_known():
                return None
            configuration = self.config.effective_configuration
        except Exception:
            return None

        self.config.check_directories()
        self.config.write_postgresql_conf(configuration)
        self.config.resolve_connection_addresses()
        self.config.replace_pg_hba()
        self.config.replace_pg_ident()

        options = ['--{0}={1}'.format(p, configuration[p]) for p in self.config.CMDLINE_OPTIONS
                   if p in configuration and p not in ('wal_keep_segments', 'wal_keep_size')]

        if self.cancellable.is_cancelled:
            return False

        with task or null_context():
            if task and task.is_cancelled:
                logger.info("PostgreSQL start cancelled.")
                return False

            self._postmaster_proc = PostmasterProcess.start(self.pgcommand('postgres'),
                                                            self._data_dir,
                                                            self.config.postgresql_conf,
                                                            options)

            if task:
                task.complete(self._postmaster_proc)

        start_timeout = timeout
        if not start_timeout:
            try:
                start_timeout = float(self.config.get('pg_ctl_timeout', 60) or 0)
            except ValueError:
                start_timeout = 60

        # We want postmaster to open ports before we continue
        if not self._postmaster_proc or not self.wait_for_port_open(self._postmaster_proc, start_timeout):
            return False

        ret = self.wait_for_startup(start_timeout)
        if ret is not None:
            if ret and after_start:
                after_start()
            return ret
        elif timeout is not None:
            return False
        else:
            return None
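
  • block_callbacks: a boolean that controls whether callbacks are blocked. During a restart, the start and stop callbacks are blocked.
  • after_start: a callback executed after a successful start, for any extra work once the database is up.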

Return value:

  • True: the start succeeded and the PostgreSQL port is open.
  • False: the start failed.
  • None: PostgreSQL is still starting up.
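The timeout fallback and the final three-way return of start can be restated as two small helpers. The names resolve_start_timeout and start_result are hypothetical, and the configuration is modelled as a plain mapping rather than Patroni's config object.

```python
from typing import Any, Mapping, Optional


def resolve_start_timeout(timeout: Optional[float], config: Mapping[str, Any]) -> float:
    """Use the explicit timeout if given, else pg_ctl_timeout from config, else 60 seconds."""
    if timeout:
        return timeout
    try:
        return float(config.get('pg_ctl_timeout', 60) or 0)
    except ValueError:
        return 60.0


def start_result(startup: Optional[bool], timeout: Optional[float]) -> Optional[bool]:
    """Map the outcome of waiting for startup to the value start() returns."""
    if startup is not None:
        return startup  # True: started, False: failed
    if timeout is not None:
        return False    # the caller gave a timeout and startup did not finish
    return None         # still starting; the caller can check again later
```

The distinction matters to callers: with an explicit timeout, "still starting" is reported as a failure, while without one it is reported as None so that the start can be re-checked.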

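In this function, PostmasterProcess.start is called to launch the database from the command line. It starts PostgreSQL in a controlled environment and handles the complications around already-running processes, the postmaster.pid file, and multi-process environments. PostgreSQL is launched through multiprocessing rather than by calling subprocess.Popen directly, which gives better control over the lifecycle of the PostgreSQL process. The constructed start command: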

/usr/local/fbase/13/bin/postgres -D /data/fbase/fbdata --config-file=/data/fbase/fbdata/postgresql.conf --listen_addresses=0.0.0.0 --port=8432 --cluster_name=demo --wal_level=logical --hot_standby=on --max_connections=100 --max_wal_senders=10 --max_prepared_transactions=200 --max_locks_per_transaction=64 --track_commit_timestamp=off --max_replication_slots=10 --max_worker_processes=8 --wal_log_hints=on
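A command line of this shape can be assembled from the configuration with the same list comprehension that start uses for options. The function build_postgres_command is a hypothetical helper, and the position of -D and --config-file is inferred from the command shown above rather than taken from Patroni's code.

```python
from typing import Dict, List, Sequence


def build_postgres_command(pgcommand: str, data_dir: str, conf_file: str,
                           configuration: Dict[str, str],
                           cmdline_options: Sequence[str]) -> List[str]:
    """Assemble a postgres command line similar to the one shown above."""
    skipped = ('wal_keep_segments', 'wal_keep_size')
    # Only parameters that are both whitelisted and configured become --name=value options.
    options = ['--{0}={1}'.format(p, configuration[p]) for p in cmdline_options
               if p in configuration and p not in skipped]
    return [pgcommand, '-D', data_dir, '--config-file={0}'.format(conf_file)] + options


cmd = build_postgres_command(
    '/usr/local/fbase/13/bin/postgres', '/data/fbase/fbdata',
    '/data/fbase/fbdata/postgresql.conf',
    {'listen_addresses': '0.0.0.0', 'port': '8432', 'wal_keep_size': '128MB'},
    ('listen_addresses', 'port', 'wal_keep_size', 'cluster_name'))
print(' '.join(cmd))
```

In this usage, wal_keep_size is dropped because it is explicitly skipped, and cluster_name is dropped because it is not present in the configuration.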

2.2 Start flowchart

[Figure: flowchart of the database start procedure]

3. Restart

3.1 The restart function

PostgreSQL is restarted by the restart function. Its code is shown below:

    def restart(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None,
                block_callbacks: bool = False, role: Optional[str] = None,
                before_shutdown: Optional[Callable[..., Any]] = None,
                after_start: Optional[Callable[..., Any]] = None) -> Optional[bool]:
        self.set_state('restarting')
        if not block_callbacks:
            self.__cb_pending = CallbackAction.ON_RESTART
        ret = self.stop(block_callbacks=True, before_shutdown=before_shutdown)\
            and self.start(timeout, task, True, role, after_start)
        if not ret and not self.is_starting():
            self.set_state('restart failed ({0})'.format(self.state))
        return ret

A restart simply calls the stop function and then the start function. The before_shutdown callback is passed to stop, and the after_start callback, which runs after a successful start, is passed to start.

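  • block_callbacks: whether callbacks are blocked.
    • False: the ON_RESTART callback is marked as pending and is triggered once the restart has completed.
    • True: no ON_RESTART callback is scheduled by this call.
  • before_shutdown: a callback executed before the stop.
  • after_start: a callback executed after the start has completed.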
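The way restart combines the two calls with `and` can be shown in isolation. The function restart_sketch is hypothetical and takes the stop and start steps as plain callables.

```python
from typing import Callable, Optional


def restart_sketch(stop: Callable[[], bool],
                   start: Callable[[], Optional[bool]]) -> Optional[bool]:
    """Combine stop and start the way restart() does: start runs only if stop succeeded."""
    return stop() and start()
```

Because `and` short-circuits, a failed stop returns False without ever calling start. A start that is still in progress returns None, which is also falsy; this is why restart additionally checks is_starting() before it sets the state to 'restart failed'.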

3.2 Restart flowchart

[Figure: flowchart of the restart procedure; the stop and start flows are covered in the two previous chapters]

3.3 Restart log

2024-11-23 00:38:10,217 INFO: no action. I am (pgsql1), the leader with the lock
2024-11-23 00:38:20,243 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:38:20,305 INFO: updated leader lock during restart
2024-11-23 00:39:08.488 CST [32328] LOG:  received fast shutdown request
2024-11-23 00:39:08.490 CST [32328] LOG:  aborting any active transactions
2024-11-23 00:39:08.491 CST [32396] FATAL:  terminating connection due to administrator command
2024-11-23 00:39:08.494 CST [32328] LOG:  background worker "logical replication launcher" (PID 32408) exited with exit code 1
2024-11-23 00:39:08.495 CST [32330] LOG:  shutting down
2024-11-23 00:39:08.531 CST [32328] LOG:  database system is shut down
2024-11-23 00:39:10,244 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:39:10,246 INFO: updated leader lock during restart
2024-11-23 00:39:20,222 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:39:20,224 INFO: updated leader lock during restart
2024-11-23 00:40:25.928 CST [32699] LOG:  starting PostgreSQL 13.16 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit
2024-11-23 00:40:25.928 CST [32699] LOG:  listening on IPv4 address "0.0.0.0", port 8432
2024-11-23 00:40:25.930 CST [32699] LOG:  listening on Unix socket "/tmp/.s.PGSQL.8432"
2024-11-23 00:40:25.934 CST [32700] LOG:  database system was shut down at 2024-11-23 00:39:08 CST
2024-11-23 00:40:25.938 CST [32699] LOG:  database system is ready to accept connections
2024-11-23 00:40:26,193 INFO: postmaster pid=32699
localhost:8432 - accepting connections
2024-11-23 00:40:30,240 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:30,242 INFO: updated leader lock during restart
localhost:8432 - accepting connections
2024-11-23 00:40:40,227 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:40,236 INFO: updated leader lock during restart
2024-11-23 00:40:47,738 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:47,739 INFO: establishing a new patroni heartbeat connection to postgres
2024-11-23 00:40:47,789 INFO: no action. I am (pgsql1), the leader with the lock

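The log shows that the node does not lose its leader status while PostgreSQL restarts: Patroni keeps updating the leader lock it holds throughout the restart.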
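The length of the restart can be read off the PostgreSQL log lines above by subtracting their timestamps. The helper seconds_between is a hypothetical name, and the sketch assumes the log prefix always starts with a 23-character timestamp in the format shown.

```python
from datetime import datetime

LOG_FORMAT = '%Y-%m-%d %H:%M:%S.%f'


def seconds_between(start_line: str, end_line: str) -> float:
    """Return the seconds between two log lines that start with 'YYYY-MM-DD HH:MM:SS.mmm'."""
    start = datetime.strptime(start_line[:23], LOG_FORMAT)
    end = datetime.strptime(end_line[:23], LOG_FORMAT)
    return (end - start).total_seconds()


shutdown_request = '2024-11-23 00:39:08.488 CST [32328] LOG:  received fast shutdown request'
ready = '2024-11-23 00:40:25.938 CST [32699] LOG:  database system is ready to accept connections'
print(seconds_between(shutdown_request, ready))
```

For this log the gap between the shutdown request and the server accepting connections again is a little over 77 seconds, during which the leader lock was refreshed several times.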

The code shows that whenever Patroni starts, restarts, or stops PostgreSQL, it invokes or registers a callback of the corresponding type.

    # stop
    if not block_callbacks:
        self.set_state('stopped')
        if pg_signaled:
            self.call_nowait(CallbackAction.ON_STOP)

    # start
    if not block_callbacks:
        self.__cb_pending = CallbackAction.ON_START

    # restart
    if not block_callbacks:
        self.__cb_pending = CallbackAction.ON_RESTART

The call_nowait function invokes the configured callback command without waiting for it to finish:

    def call_nowait(self, cb_type: CallbackAction) -> None:
        """pick a callback command and call it without waiting for it to finish """
        if self.bootstrapping:
            return
        if cb_type in (CallbackAction.ON_START, CallbackAction.ON_STOP,
                       CallbackAction.ON_RESTART, CallbackAction.ON_ROLE_CHANGE):
            self.__cb_called = True

        if self.callback and cb_type in self.callback:
            cmd = self.callback[cb_type]
            role = 'primary' if self.role == 'promoted' else self.role
            try:
                cmd = shlex.split(self.callback[cb_type]) + [cb_type, role, self.scope]
                self._callback_executor.call(cmd)
            except Exception:
                logger.exception('callback %s %r %s %s failed', cmd, cb_type, role, self.scope)
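The argument list that call_nowait hands to the callback executor can be reproduced with shlex alone. The function build_callback_command and the script path in the usage line are hypothetical; the role mapping and argument order follow the code above.

```python
import shlex
from typing import List


def build_callback_command(command: str, cb_type: str, role: str, scope: str) -> List[str]:
    """Build the argv list for a callback: the configured command plus action, role and cluster name."""
    # A node that was just promoted is reported to the callback as 'primary'.
    reported_role = 'primary' if role == 'promoted' else role
    return shlex.split(command) + [cb_type, reported_role, scope]


print(build_callback_command('/bin/bash /etc/patroni/on_event.sh',
                             'on_restart', 'promoted', 'demo'))
```

A callback script therefore receives three positional arguments after its own configured arguments: the action, the role, and the cluster name (scope).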

3.4 Method call graph

From: https://www.cnblogs.com/zreo2home/p/18604052