首页 > 数据库 >MySQL 关闭源码解析

MySQL 关闭源码解析

时间:2022-10-08 11:34:45浏览次数:74  
标签:binlog thd thread killing 源码 kill MySQL 解析 wait

前几天看大佬的公众号,发现一个有意思的问题,即平时应该如何关闭MySQL,如下几种方式,平时应该使用哪种呢:

1. mysqladmin shutdown

2. service mysqld stop

3. kill 'pidof mysqld' 

4. kill -9 'pidof mysqld'

这里不考虑mysqld_safe。

最终大佬说应该选用方式4 kill -9 的方式来关闭MySQL,其他方式会导致主从数据不一致,虽然平时我也是使用 kill -9的方式来关闭MySQL,但是对于其他方式会导致主从数据不一致这个结果还是难以置信,因此对这个结果我进行了测试,基于MySQL5.7.36版本,测试如下:

1. 配置MySQL半同步复制集群,主库配置如下:

 

2. 在从库上 stop slave io_thread,模拟 after_sync() 延迟

3. 在主库执行 insert into test1(id) values(1) 的操作,而后在另一个客户端通过 shutdown 的方式关闭MySQL服务

可以发现,insert into test1 values(1); 这条操作竟然执行成功了。

4. 重启MySQL服务,而后发现在数据库中确实存在 id = 1 的数据;

5. 使用  go 驱动,在测试程序中重复以上操作,发现表现类似,这就很奇怪了。

以下通过阅读源码来分析原因,其中删除了部分代码:

extern "C" void *signal_hand(void *arg MY_ATTRIBUTE((unused)))
{
  ....for (;;)
  {
    int sig;
    while (sigwait(&set, &sig) == EINTR)
    {
    }
    if (cleanup_done)
    {
      my_thread_end();
      my_thread_exit(0); // Safety
      return NULL;       // Avoid compiler warnings
    }
    switch (sig)
    {
    case SIGTERM:
    case SIGQUIT:
      // Switch to the file log message processing.
      query_logger.set_handlers((log_output_options != LOG_NONE) ? LOG_FILE : LOG_NONE);
      DBUG_PRINT("info", ("Got signal: %d  abort_loop: %d", sig, abort_loop));
// !abort_loop if (!abort_loop) {
// 将 abort_loop 设置为 true abort_loop = true; // Mark abort for threads. #ifdef HAVE_PSI_THREAD_INTERFACE // Delete the instrumentation for the signal thread. PSI_THREAD_CALL(delete_current_thread) (); #endif /* Kill the socket listener. kill socket listener,来停止接收新的连接。 The main thread will then set socket_listener_active= false, and wait for us to finish all the cleanup below. */ mysql_mutex_lock(&LOCK_socket_listener_active); while (socket_listener_active) { DBUG_PRINT("info", ("Killing socket listener")); if (pthread_kill(main_thread_id, SIGUSR1)) { assert(false); break; } mysql_cond_wait(&COND_socket_listener_active, &LOCK_socket_listener_active); } mysql_mutex_unlock(&LOCK_socket_listener_active); // 关闭现有连接 close_connections(); } my_thread_end(); my_thread_exit(0); return NULL; // Avoid compiler warnings break; case SIGHUP: if (!abort_loop) { int not_used; mysql_print_status(); // Print some debug info reload_acl_and_cache(NULL, (REFRESH_LOG | REFRESH_TABLES | REFRESH_FAST | REFRESH_GRANT | REFRESH_THREADS | REFRESH_HOSTS), NULL, &not_used); // Flush logs // Reenable query logs after the options were reloaded. query_logger.set_handlers(log_output_options); } break; default: break; /* purecov: tested */ } } return NULL; /* purecov: deadcode */ }

 

static void close_connections(void)
{
  DBUG_ENTER("close_connections");
  (void)RUN_HOOK(server_state, before_server_shutdown, (NULL));
  // One-Thread-Per-Connection: kill 阻塞的 threads 
  Per_thread_connection_handler::kill_blocked_pthreads();

  uint dump_thread_count = 0;
  uint dump_thread_kill_retries = 8;

  // Close listeners.  关闭 socket listeners
  if (mysqld_socket_acceptor != NULL)
    mysqld_socket_acceptor->close_listener();
#ifdef _WIN32
  if (named_pipe_acceptor != NULL)
    named_pipe_acceptor->close_listener();

  if (shared_mem_acceptor != NULL)
    shared_mem_acceptor->close_listener();
#endif

  /*
    First signal all threads that it's time to die
    This will give the threads some time to gracefully abort their
    statements and inform their clients that the server is about to die.
  */
  // 获取 thd_manager 
  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
  sql_print_information("Giving %d client threads a chance to die gracefully",
                        static_cast<int>(thd_manager->get_thd_count()));
  // thd_manager 针对所有的 conn 设置 KILL_CONNECTION FLAG 
  Set_kill_conn set_kill_conn;
  thd_manager->do_for_all_thd(&set_kill_conn);
  sql_print_information("Shutting down slave threads");
  // 释放 slave 线程
  end_slave();
  // 处理 dump 线程
  if (set_kill_conn.get_dump_thread_count())
  {
    /*
      Replication dump thread should be terminated after the clients are
      terminated. Wait for few more seconds for other sessions to end.
      dump 线程应在客户端终止后终止。再等待几秒钟,等待其他会话结束。
     */
    while (thd_manager->get_thd_count() > dump_thread_count &&
           dump_thread_kill_retries)
    {
      sleep(1);
      dump_thread_kill_retries--;
    }
    set_kill_conn.set_dump_thread_flag();
    thd_manager->do_for_all_thd(&set_kill_conn);
  }
  if (thd_manager->get_thd_count() > 0)
    sleep(2); // Give threads time to die

  /*
    Force remaining threads to die by closing the connection to the client
    This will ensure that threads that are waiting for a command from the
    client on a blocking read call are aborted.
    通过关闭客户端的连接, 来强制剩余线程终止。
  */

  sql_print_information("Forcefully disconnecting %d remaining clients",
                        static_cast<int>(thd_manager->get_thd_count()));
  // 关闭客户端连接。
  Call_close_conn call_close_conn(true);
  thd_manager->do_for_all_thd(&call_close_conn);

  (void)RUN_HOOK(server_state, after_server_shutdown, (NULL));

  /*
    All threads have now been aborted. Stop event scheduler thread
    after aborting all client connections, otherwise user may
    start/stop event scheduler after Events::deinit() deallocates
    scheduler object(static member in Events class)
  */
  Events::deinit();
  DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)",
                      thd_manager->get_thd_count()));
  thd_manager->wait_till_no_thd();

  /*
    Connection threads might take a little while to go down after removing from
    global thread list. Give it some time.
  */
  Connection_handler_manager::wait_till_no_connection();

  delete_slave_info_objects();
  DBUG_PRINT("quit", ("close_connections thread"));

  DBUG_VOID_RETURN;
}

接下来看 Set_kill_conn 做了什么:

class Set_kill_conn : public Do_THD_Impl
{
  ......virtual void operator()(THD *killing_thd)
  {
    DBUG_PRINT("quit", ("Informing thread %u that it's time to die",
                        killing_thd->thread_id()));
    if (!m_kill_dump_threads_flag)
    {
// 这里在 first Loop 时跳过 slave threads & scheduler 线程 // We skip slave threads & scheduler on this first loop through.
if (killing_thd->slave_thread) return; if (killing_thd->get_command() == COM_BINLOG_DUMP || killing_thd->get_command() == COM_BINLOG_DUMP_GTID) { ++m_dump_thread_count; return; } DBUG_EXECUTE_IF("Check_dump_thread_is_alive", { assert(killing_thd->get_command() != COM_BINLOG_DUMP && killing_thd->get_command() != COM_BINLOG_DUMP_GTID); };); } mysql_mutex_lock(&killing_thd->LOCK_thd_data); killing_thd->killed = THD::KILL_CONNECTION; MYSQL_CALLBACK(Connection_handler_manager::event_functions, post_kill_notification, (killing_thd)); if (killing_thd->is_killable) { mysql_mutex_lock(&killing_thd->LOCK_current_cond); if (killing_thd->current_cond) { mysql_mutex_lock(killing_thd->current_mutex);
// 关键在这里,唤醒所有等待 thd -> current_cond 的线程并释放 thd-> current_mutex mysql_cond_broadcast(killing_thd->current_cond); mysql_mutex_unlock(killing_thd->current_mutex); } mysql_mutex_unlock(&killing_thd->LOCK_current_cond); } mysql_mutex_unlock(&killing_thd->LOCK_thd_data); } };

 之前解析过半同步复制的源码,这里简单看一下 wait_ack 部分,之间会省略部分代

// 执行事务的线程等待从库的回复, 即等待 ACK 的实现函数
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,                 
my_off_t trx_wait_binlog_pos)

{ ...... // 打开了半同步 while (is_on()) { // 如果有从库回复 if (reply_file_name_inited_) { // 比较从库回复的日志坐标(filename & fileops)和固化的 commit 队列中最新的事务的 binlog filename & pos int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, trx_wait_binlog_name, trx_wait_binlog_pos); // 如果回复的日志坐标大于当前的日志坐标 if (cmp >= 0) { /* We have already sent the relevant binlog to the slave: no need to * wait here. 我们已经确认将相应的 binlog 发送给了从库: 无需在此等待。 */ if (trace_level_ & kTraceDetail) sql_print_information("%s: Binlog reply is ahead (%s, %lu),", kWho, reply_file_name_, (unsigned long)reply_file_pos_); // 退出循环 break; } } /* When code reaches here an Entry object may not be present in the following scenario. 当代码到了这里, 在一下场景中可能不存在 entry。 Semi sync was not enabled when transaction entered into ordered_commit process. During flush stage, semi sync was not enabled and there was no 'Entry' object created for the transaction being committed and at a later stage it was enabled. In this case trx_wait_binlog_name and trx_wait_binlog_pos are set but the 'Entry' object is not present. Hence dump thread will not wait for reply from slave and it will not update reply_file_name. In such case the committing transaction should not wait for an ack from slave and it should be considered as an async transaction. 事务进入 ordered_commit 时未启用半同步。 在 flush 阶段, 没有启用半同步, 没有为提交的事务创建 entry 对象, 但是在之后的节点启用了半同步。 在这种情况下, 设置了 trx_wait_binlog_name 和 trx_wait_binlog_pos, 但是 entry 对象并不存在。 此时, dump 线程将不会等待 slave 节点的 reply, 并且不会更新 reply_file_name。 在这种情况下, 提交的事务不应等待来自 slave 节点的 ack, 而应被视为异步事务。 */ if (!entry) { is_semi_sync_trans= false; goto l_end; } /* Let us update the info about the minimum binlog position of waiting * threads. * 这里更新等待线程等待的 minimum binlog pos 。 */ if (wait_file_name_inited_) { // 对比当前 commit 队列最后的binlog点位 和 wait_file_name_ & wait_file_pos_ 大小 int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, wait_file_name_, wait_file_pos_); if (cmp <= 0) { /* This thd has a lower position, let's update the minimum info. 这里更新 wait_file_name_ & wait_file_pos_。 */ strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1); wait_file_name_[sizeof(wait_file_name_) - 1]= '\0'; wait_file_pos_ = trx_wait_binlog_pos; rpl_semi_sync_master_wait_pos_backtraverse++; if (trace_level_ & kTraceDetail) sql_print_information("%s: move back wait position (%s, %lu),", kWho, wait_file_name_, (unsigned long)wait_file_pos_); } } else { strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1); wait_file_name_[sizeof(wait_file_name_) - 1]= '\0'; wait_file_pos_ = trx_wait_binlog_pos; wait_file_name_inited_ = true; if (trace_level_ & kTraceDetail) sql_print_information("%s: init wait position (%s, %lu),", kWho, wait_file_name_, (unsigned long)wait_file_pos_); } /* In semi-synchronous replication, we wait until the binlog-dump * thread has received the reply on the relevant binlog segment from the * replication slave. * 在半同步复制中, 我们等待直到 binlog dump 线程收到相关 binlog 的 reply 信息。 * * Let us suspend this thread to wait on the condition; * when replication has progressed far enough, we will release * these waiting threads. * 让我们暂停这个线程以等待这个条件; * 当复制进展足够时, 我们将释放等待的线程。 */ // 判断 slave 个数和半同步是否正常 // 当前 slave 节点的数量 == rpl_semi_sync_master_wait_for_slave_count -1 && 半同步复制正开启
// 在这里 abort_loop 为 true, rpl_semi_sync_master_clients 是 0,rpl_semi_sync_master_wait_for_slave_count 是 1,因此
// 半同步复制被关闭,循环中断,原事务提交成功。 if (abort_loop && (rpl_semi_sync_master_clients == rpl_semi_sync_master_wait_for_slave_count - 1) && is_on())
{ sql_print_warning("In commitTrx : SEMISYNC: Forced shutdown. Some updates might " "not be replicated."); // 关闭半同步, 中断循环 switch_off(); break; } //正式进入等待binlog同步的步骤,将rpl_semi_sync_master_wait_sessions+1 //然后发起等待信号,进入信号等待后,只有2种情况可以退出等待。1是被其他线程唤醒(binlog dump) //2是等待超时时间。如果是被唤醒则返回值是0,否则是其他值 rpl_semi_sync_master_wait_sessions++; if (trace_level_ & kTraceDetail) sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", kWho, wait_timeout_, wait_file_name_, (unsigned long)wait_file_pos_); /* wait for the position to be ACK'ed back 实现 ACK 等待 */ assert(entry); entry->n_waiters++; /** 第一个参数为条件量,第二个为等待中释放LOCK_binlog_互斥锁,第三个为未来的超时绝对时间
当 Set_kill_conn唤醒了current_cond 的线程并释放 current_mutex 时,mysql_cond_timedwait() 函数返回,
wait_result = 0; 但是当前回复的日志坐标仍然小于所需的日志坐标,因此仍然在循环中。
*/
wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
      entry->n_waiters--;
      /*
        在等待时释放 LOCK_binlog_ 互斥锁, 有可能其他客户端执行 RESET MASTER 命令, 这将把 rpl_semi_sync_master_wait_sessions 重置为 0。
        因此, 在递减前需要检查该值。
      */
      if (rpl_semi_sync_master_wait_sessions > 0)
        rpl_semi_sync_master_wait_sessions--;
      // wait_result != 0, 这里表示等待超时
      if (wait_result != 0)
      {
       .....
      }
      else
      // 等待 ACK 成功
      {
        ......
      }
    }

l_end:
     .....
  }
  .....
}

 通过以上代码可知,在半同步复制的情况下,当主节点的 rpl_semi_sync_master_clients == rpl_semi_sync_master_wait_for_slave_count - 1 的情况下,使用 'mysqladmin shutdown' | 'shutdown' | 'kill {pid of mysqld}' 关闭主节点时确实会造成主从数据不一致。当我们使用了高可用组件且 rpl_semi_sync_master_wait_for_slave_count = 1 时尤其危险。以后还是继续使用 kill -9 来关闭MySQL吧。

标签:binlog,thd,thread,killing,源码,kill,MySQL,解析,wait
From: https://www.cnblogs.com/juanmaofeifei/p/16742583.html

相关文章

  • web期末网站设计大作业(中华传统文化主题学生网页设计源码)
    ......
  • mac通过docker一键部署MySQL8
    目录mac通过docker一键部署MySQL8一、前言二、系统配置三、安装步骤Dockerhub查看镜像地址1、一键安装1.1、克隆脚本1.2、安装程序1.2.1、程序安装详情1.3、初始化用户1.3.......
  • mysql忘记密码的解决方案
    1.停止mysql服务;2.找到mysql的配置文件在最后一行添加skip-grant-tables=13.重新启用mysql并且用无账号模式进入mysql-uroot-p4.重新设置密码usemysql;up......
  • Ubuntu迁移mysql数据库到新的目录下
    1.先使用下面命令将mysql数据库服务停止:sudosystemctlstopmysql2.迁移到挂载新盘/mnt/data/方式一:sudomv/var/lib/mysql/mnt/data/方式二:sudocp-a/var/l......
  • MySQL
    MySQ基础知识与基本SQL语句MySQL字段类型、字符编码与配置文件主键与外键、自增、表关系之一对多、多对多、一对一MySQL查询关键字where、groupby、having、distinct、......
  • KingbaseES V8R6运维案例之---wal日志解析DML操作
    案例说明:通过sys_waldump解析DML操作,获取DML操作的日志条目具体内容。适用版本:KingbaseESV8R3/R6一、DML事务操作对应的wal日志文件#查看当前online的wal日志文件p......
  • KingbaseES V8R6运维案例之---sys_waldump解析wal日志
    案例说明:wal日志文件记录了,事务操作的redo日志信息,由于wal日志文件是二进制文件,无法直接读取其文件内容。sys_waldump可以解决这个问题,通过sys_waldump来解析wal日志来......
  • 列表解析式+九九乘法表
    #numbers=[]#fornumberinrange(10):#numbers.append(number)#print(numbers)#[0,1,2,3,4,5,6,7,8,9]#VAT_PERCENT=0.1##defadd_vat(price):#return......
  • 从SpringBoot启动,阅读源码设计
    目录一、背景说明二、SpringBoot工程三、应用上下文四、资源加载五、应用环境六、Bean对象七、Tomcat服务八、事件模型九、配置加载十、数据库集成十一、参考源码服务启......
  • CentOS7环境源码安装freeswitch1.10
    操作系统:CentOS7.6_x64   freeswitch版本:1.10.7一、安装步骤1、下载freeswitch源代码wgethttp://files.freeswitch.org/releases/freeswitch/freeswitch-1.1......