Patroni中对pg的重启、停止和启动详解
pg的重启、停止和启动操作对应的函数都定义在patroni/postgresql/__init__.py
文件中,Patroni通过调用这些函数来完成相应的操作。
1. 停止
1.1 stop函数
停止的函数为stop
。其代码如下所示:
def stop(self, mode: str = 'fast', block_callbacks: bool = False, checkpoint: Optional[bool] = None,
         on_safepoint: Optional[Callable[..., Any]] = None, on_shutdown: Optional[Callable[[int, int], Any]] = None,
         before_shutdown: Optional[Callable[..., Any]] = None, stop_timeout: Optional[int] = None) -> bool:
    """Stop PostgreSQL and record the outcome in the instance state.

    The actual work is delegated to ``_do_stop``; this wrapper only resolves the
    default for *checkpoint*, updates the state and schedules the stop callback.

    :param mode: shutdown mode handed over to ``_do_stop``.
    :param block_callbacks: when true, neither the ``'stopped'`` state nor the
        ``ON_STOP`` callback is set/fired after a successful stop.
    :param checkpoint: whether a checkpoint is requested before stopping;
        ``None`` means "yes, unless *mode* is ``'immediate'``".
    :param on_safepoint: callback forwarded to ``_do_stop``.
    :param on_shutdown: callback forwarded to ``_do_stop``.
    :param before_shutdown: callback forwarded to ``_do_stop``.
    :param stop_timeout: timeout forwarded to ``_do_stop``.
    :returns: the success flag reported by ``_do_stop``.
    """
    if checkpoint is None:
        # By default only an 'immediate' shutdown skips the checkpoint.
        checkpoint = mode != 'immediate'

    stopped, signaled = self._do_stop(mode, block_callbacks, checkpoint, on_safepoint,
                                      on_shutdown, before_shutdown, stop_timeout)

    if not stopped:
        logger.warning('pg_ctl stop failed')
        self.set_state('stop failed')
        return stopped

    if not block_callbacks:
        self.set_state('stopped')
        # The ON_STOP callback is fired only when a stop signal was really sent.
        if signaled:
            self.call_nowait(CallbackAction.ON_STOP)
    return stopped
这个函数调用内部函数_do_stop
来执行停止操作,如果成功停止则异步执行停止后的回调操作,失败也会进行错误处理,最后返回停止结果。
mode
:停止模式。'fast'
:快速停止,通常会强制关闭连接。'immediate'
:立即停止,意味着数据库进程会被立即终止,不会进行正常的清理操作。
block_callbacks
:控制是否在停止过程中阻止回调函数的执行。如果设置为True
,则在停止过程中不会执行回调函数。on_safepoint
:当没有用户连接的情况下(即所有用户的数据库会话都结束),该回调函数会被调用。(安全停止)on_shutdown
:当数据库已经进入关闭状态并且pg_controldata
开始报告数据库集群已经关闭时,该回调函数会被调用。before_shutdown
:在数据库执行CHECKPOINT
操作后、调用pg_ctl stop
之前,执行此回调。
success
:
True
:表示 PostgreSQL 主进程已经不存在,即已停止或退出。False
:表示发送停止信号失败(如权限问题或其他错误)。None
:表示停止信号已经成功发送,等待进程停止或退出。
1.2 _do_stop函数
_do_stop
函数如下所示:
def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
             on_safepoint: Optional[Callable[..., Any]], on_shutdown: Optional[Callable[[int, int], Any]],
             before_shutdown: Optional[Callable[..., Any]], stop_timeout: Optional[int]) -> Tuple[bool, bool]:
    """Stop the running postmaster and run the stop-related callbacks.

    :param mode: shutdown mode passed to ``postmaster.signal_stop``.
    :param block_callbacks: when true, the ``'stopping'`` state is not set.
    :param checkpoint: run ``self.checkpoint`` first, unless the instance is still starting.
    :param on_safepoint: called when the postmaster is not running, when the stop
        signal reports immediate success, or after user backends have closed.
    :param on_shutdown: called with the checkpoint locations once ``controldata``
        reports ``'shut down'``; only considered for the ``'fast'`` and ``'smart'`` modes.
    :param before_shutdown: called after the before-stop script, right before signalling.
    :param stop_timeout: used for the checkpoint, the wait for user backends,
        the control-data polling loop and the final wait on the postmaster.
    :returns: ``(success, pg_signaled)``: ``(True, False)`` when no postmaster was
        running, ``(success, True)`` when ``signal_stop`` gave a final answer,
        ``(True, True)`` after the postmaster has been waited for.
    """
    postmaster = self.is_running()
    if not postmaster:
        # Nothing to stop: report success without having signalled anything.
        if on_safepoint:
            on_safepoint()
        return True, False

    if checkpoint and not self.is_starting():
        self.checkpoint(timeout=stop_timeout)

    if not block_callbacks:
        self.set_state('stopping')

    # invoke user-directed before stop script
    self._before_stop()

    if before_shutdown:
        before_shutdown()

    # Send signal to postmaster to stop
    success = postmaster.signal_stop(mode, self.pgcommand('pg_ctl'))
    # A non-None result is final; None means we still have to wait below.
    if success is not None:
        if success and on_safepoint:
            on_safepoint()
        return success, True

    # We can skip safepoint detection if we don't have a callback
    if on_safepoint:
        # Wait for our connection to terminate so we can be sure that no new connections are being initiated
        self._wait_for_connection_close(postmaster)
        postmaster.wait_for_user_backends_to_close(stop_timeout)
        on_safepoint()

    if on_shutdown and mode in ('fast', 'smart'):
        i = 0
        # Wait for pg_controldata `Database cluster state:` to change to "shut down"
        while postmaster.is_running():
            data = self.controldata()
            if data.get('Database cluster state', '') == 'shut down':
                checkpoint_locations = self._checkpoint_locations_from_controldata(data)
                if checkpoint_locations:
                    on_shutdown(*checkpoint_locations)
                break
            elif data.get('Database cluster state', '').startswith('shut down'):  # shut down in recovery
                break
            elif stop_timeout and i >= stop_timeout:
                # Polling used up the whole budget: the final wait below gets a zero timeout.
                stop_timeout = 0
                break
            time.sleep(STOP_POLLING_INTERVAL)
            i += STOP_POLLING_INTERVAL

    try:
        postmaster.wait(timeout=stop_timeout)
    except TimeoutExpired:
        logger.warning("Timeout during postmaster stop, aborting Postgres.")
        # If terminating the postmaster does not succeed, wait without a timeout.
        if not self.terminate_postmaster(postmaster, mode, stop_timeout):
            postmaster.wait()

    return True, True
在这个函数中具体停止是调用signal_stop
函数来停止,在这个函数中会根据操作系统的不同来使用不同的方式停止,unix
系统会发送停止信号,其他系统会使用pg_ctl
命令。如果停止信号发送成功,signal_stop会返回None,这时候就会继续往后执行:先等待用户连接关闭并执行on_safepoint,再轮询pg_controldata,在集群关闭后执行on_shutdown
回调函数。
success
:
True
:表示 PostgreSQL 主进程已经不存在,即已停止或退出。False
:表示发送停止信号失败(如权限问题或其他错误)。None
:表示停止信号已经成功发送,等待进程停止或退出。
pg_signaled
:
True
:发送了停止信号。False
:没有发送停止信号。
1.3 停止流程图
数据库的停止流程如下所示:
2. 启动
2.1 start函数
启动的函数为start
。其代码如下所示:
def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None,
          block_callbacks: bool = False, role: Optional[str] = None,
          after_start: Optional[Callable[..., Any]] = None) -> Optional[bool]:
    """Write the configuration files and start the postmaster.

    :param timeout: how long to wait for the port to open and for startup; when
        falsy, the ``pg_ctl_timeout`` configuration value (default 60) is used.
    :param task: optional critical task; used as a context manager around the
        launch, checked for cancellation and completed with the postmaster process.
    :param block_callbacks: when true, no ``ON_START`` callback is left pending.
    :param role: role to set; falls back to the role derived from the data directory.
    :param after_start: called when ``wait_for_startup`` reports success.
    :returns: ``True`` if PostgreSQL was already running or has started,
        ``False`` if the start was cancelled, the port did not open, startup
        failed, or an explicit *timeout* ran out, ``None`` if the major version
        or configuration could not be determined, or startup is still in
        progress and no explicit *timeout* was given.
    """
    self.connection_pool.close()

    if self.is_running():
        logger.error('Cannot start PostgreSQL because one is already running.')
        self.set_state('starting')
        return True

    if not block_callbacks:
        self.__cb_pending = CallbackAction.ON_START

    self.set_role(role or self.get_postgres_role_from_data_directory())

    self.set_state('starting')
    self.set_pending_restart_reason(CaseInsensitiveDict())

    try:
        if not self.ensure_major_version_is_known():
            return None
        configuration = self.config.effective_configuration
    except Exception:
        return None

    # Refresh the on-disk configuration before launching the postmaster.
    self.config.check_directories()
    self.config.write_postgresql_conf(configuration)
    self.config.resolve_connection_addresses()
    self.config.replace_pg_hba()
    self.config.replace_pg_ident()

    # Command-line options for the postmaster; the WAL-keep settings are never passed this way.
    options = ['--{0}={1}'.format(p, configuration[p]) for p in self.config.CMDLINE_OPTIONS
               if p in configuration and p not in ('wal_keep_segments', 'wal_keep_size')]

    if self.cancellable.is_cancelled:
        return False

    with task or null_context():
        if task and task.is_cancelled:
            logger.info("PostgreSQL start cancelled.")
            return False

        self._postmaster_proc = PostmasterProcess.start(self.pgcommand('postgres'),
                                                        self._data_dir,
                                                        self.config.postgresql_conf,
                                                        options)

        if task:
            task.complete(self._postmaster_proc)

    start_timeout = timeout
    if not start_timeout:
        try:
            start_timeout = float(self.config.get('pg_ctl_timeout', 60) or 0)
        except ValueError:
            start_timeout = 60

    # We want postmaster to open ports before we continue
    if not self._postmaster_proc or not self.wait_for_port_open(self._postmaster_proc, start_timeout):
        return False

    ret = self.wait_for_startup(start_timeout)
    if ret is not None:
        if ret and after_start:
            after_start()
        return ret
    elif timeout is not None:
        # The caller gave an explicit timeout and startup did not finish within it.
        return False
    else:
        return None
block_callbacks
: 一个布尔值,用于控制是否阻止回调的执行。在重启过程中,通常会阻止启动或停止回调的执行。after_start
: 启动后执行的回调函数,用于在数据库启动完成后进行一些额外操作。
返回值:
- 返回
True
如果启动已成功并且 PostgreSQL 端口已打开。 - 返回
False
如果启动失败。 - 返回
None
如果 PostgreSQL 仍在启动过程中。
在这个函数中调用PostmasterProcess.start
函数来实现命令行启动数据库。实现在一个指定的环境中启动 PostgreSQL 数据库,并确保能够正确处理与现有进程、postmaster.pid
文件以及多进程环境相关的复杂性。通过使用 multiprocessing
来启动 PostgreSQL,可以避免直接使用 subprocess.Popen
,从而更好地管理 PostgreSQL 进程的生命周期。构造的启动命令:
/usr/local/fbase/13/bin/postgres -D /data/fbase/fbdata --config-file=/data/fbase/fbdata/postgresql.conf --listen_addresses=0.0.0.0 --port=8432 --cluster_name=demo --wal_level=logical --hot_standby=on --max_connections=100 --max_wal_senders=10 --max_prepared_transactions=200 --max_locks_per_transaction=64 --track_commit_timestamp=off --max_replication_slots=10 --max_worker_processes=8 --wal_log_hints=on
2.2 启动流程图
数据库的启动流程如下所示:
3. 重启
3.1 restart函数
重启的函数为restart
。其代码如下所示:
def restart(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None,
            block_callbacks: bool = False, role: Optional[str] = None,
            before_shutdown: Optional[Callable[..., Any]] = None,
            after_start: Optional[Callable[..., Any]] = None) -> Optional[bool]:
    """Restart PostgreSQL by stopping it and, if that succeeded, starting it again.

    :param timeout: forwarded to ``start``.
    :param task: forwarded to ``start``.
    :param block_callbacks: when true, no ``ON_RESTART`` callback is left pending.
    :param role: forwarded to ``start``.
    :param before_shutdown: forwarded to ``stop``.
    :param after_start: forwarded to ``start``.
    :returns: the falsy result of ``stop`` if stopping failed, otherwise the
        result of ``start``.
    """
    self.set_state('restarting')
    if not block_callbacks:
        self.__cb_pending = CallbackAction.ON_RESTART

    # Both calls run with their own callbacks blocked.
    ret = self.stop(block_callbacks=True, before_shutdown=before_shutdown)
    if ret:
        ret = self.start(timeout, task, True, role, after_start)

    # A falsy result is not treated as a failure while PostgreSQL is still starting.
    if not (ret or self.is_starting()):
        self.set_state('restart failed ({0})'.format(self.state))
    return ret
对于重启来说,即调用了数据库停止(stop)和数据库启动(start)函数来完成重启操作。并且在调用stop
函数时传递停止前回调函数
before_shutdown
,在调用start
函数时传递启动成功后回调函数after_start
。
-
block_callbacks:控制是否阻止回调函数的执行。
- False:在重启完成后,系统会触发与重启相关的回调操作。
- True:回调动作不会被触发,直到重启过程完成且不再阻止回调。
-
before_shutdown
:停止前执行的回调函数。 -
after_start
:启动完成后执行的回调函数。
3.2 重启流程图
重启的具体流程如下(stop和start的流程见上两章):
3.3 重启日志
2024-11-23 00:38:10,217 INFO: no action. I am (pgsql1), the leader with the lock
2024-11-23 00:38:20,243 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:38:20,305 INFO: updated leader lock during restart
2024-11-23 00:39:08.488 CST [32328] LOG: received fast shutdown request
2024-11-23 00:39:08.490 CST [32328] LOG: aborting any active transactions
2024-11-23 00:39:08.491 CST [32396] FATAL: terminating connection due to administrator command
2024-11-23 00:39:08.494 CST [32328] LOG: background worker "logical replication launcher" (PID 32408) exited with exit code 1
2024-11-23 00:39:08.495 CST [32330] LOG: shutting down
2024-11-23 00:39:08.531 CST [32328] LOG: database system is shut down
2024-11-23 00:39:10,244 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:39:10,246 INFO: updated leader lock during restart
2024-11-23 00:39:20,222 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:39:20,224 INFO: updated leader lock during restart
2024-11-23 00:40:25.928 CST [32699] LOG: starting PostgreSQL 13.16 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit
2024-11-23 00:40:25.928 CST [32699] LOG: listening on IPv4 address "0.0.0.0", port 8432
2024-11-23 00:40:25.930 CST [32699] LOG: listening on Unix socket "/tmp/.s.PGSQL.8432"
2024-11-23 00:40:25.934 CST [32700] LOG: database system was shut down at 2024-11-23 00:39:08 CST
2024-11-23 00:40:25.938 CST [32699] LOG: database system is ready to accept connections
2024-11-23 00:40:26,193 INFO: postmaster pid=32699
localhost:8432 - accepting connections
2024-11-23 00:40:30,240 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:30,242 INFO: updated leader lock during restart
localhost:8432 - accepting connections
2024-11-23 00:40:40,227 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:40,236 INFO: updated leader lock during restart
2024-11-23 00:40:47,738 INFO: Lock owner: pgsql1; I am pgsql1
2024-11-23 00:40:47,739 INFO: establishing a new patroni heartbeat connection to postgres
2024-11-23 00:40:47,789 INFO: no action. I am (pgsql1), the leader with the lock
从日志可以分析,pg 在重启时不会丢失自己的主节点状态,会一直去更新自己持有的主节点锁。
分析代码可以得知patroni
在 pg 启动、重启和停止的时候,都会调用一个回调函数来处理不同的回调类型。
# stop: unless callbacks are blocked, mark the instance stopped and fire ON_STOP if a signal was sent
if not block_callbacks:
    self.set_state('stopped')
    if pg_signaled:
        self.call_nowait(CallbackAction.ON_STOP)
# start: unless callbacks are blocked, leave an ON_START callback pending
if not block_callbacks:
    self.__cb_pending = CallbackAction.ON_START
# restart: unless callbacks are blocked, leave an ON_RESTART callback pending
if not block_callbacks:
    self.__cb_pending = CallbackAction.ON_RESTART
其中call_nowait
函数会选取对应的回调命令并调用它,但不等待命令执行完成:
def call_nowait(self, cb_type: CallbackAction) -> None:
    """Pick the callback command configured for *cb_type* and launch it without waiting.

    Nothing happens while bootstrapping. For the start/stop/restart/role-change
    actions the "callback called" flag is raised even when no command is
    configured. A failure to launch the command is logged, never raised.

    :param cb_type: the callback action to run.
    """
    if self.bootstrapping:
        return

    tracked_actions = (CallbackAction.ON_START, CallbackAction.ON_STOP,
                       CallbackAction.ON_RESTART, CallbackAction.ON_ROLE_CHANGE)
    if cb_type in tracked_actions:
        self.__cb_called = True

    if not self.callback or cb_type not in self.callback:
        return

    # Keep the raw command string so the error message is usable even if splitting fails.
    cmd = self.callback[cb_type]
    role = 'primary' if self.role == 'promoted' else self.role
    try:
        cmd = shlex.split(self.callback[cb_type]) + [cb_type, role, self.scope]
        self._callback_executor.call(cmd)
    except Exception:
        logger.exception('callback %s %r %s %s failed', cmd, cb_type, role, self.scope)