前几天看大佬的公众号,发现一个有意思的问题,即平时应该如何关闭MySQL,如下几种方式,平时应该使用哪种呢:
1. mysqladmin shutdown
2. service mysqld stop
3. kill 'pidof mysqld'
4. kill -9 'pidof mysqld'
这里不考虑mysqld_safe。
最终大佬说应该选用方式4 kill -9 的方式来关闭MySQL,其他方式会导致主从数据不一致,虽然平时我也是使用 kill -9的方式来关闭MySQL,但是对于其他方式会导致主从数据不一致这个结果还是难以置信,因此对这个结果我进行了测试,基于MySQL5.7.36版本,测试如下:
1. 配置MySQL半同步复制集群,主库配置如下:
2. 在从库上 stop slave io_thread,模拟 after_sync() 延迟
3. 在主库执行 insert into test1(id) values(1) 的操作,而后在另一个客户端通过 shutdown 的方式关闭MySQL服务
可以发现,insert into test1 values(1); 这条操作竟然执行成功了。
4. 重启MySQL服务,而后发现在数据库中确实存在 id = 1 的数据;
5. 使用 go 驱动,在测试程序中重复以上操作,发现表现类似,这就很奇怪了。
以下通过阅读源码来分析原因,其中删除了部分代码:
extern "C" void *signal_hand(void *arg MY_ATTRIBUTE((unused))) { ....for (;;) { int sig; while (sigwait(&set, &sig) == EINTR) { } if (cleanup_done) { my_thread_end(); my_thread_exit(0); // Safety return NULL; // Avoid compiler warnings } switch (sig) { case SIGTERM: case SIGQUIT: // Switch to the file log message processing. query_logger.set_handlers((log_output_options != LOG_NONE) ? LOG_FILE : LOG_NONE); DBUG_PRINT("info", ("Got signal: %d abort_loop: %d", sig, abort_loop));
// !abort_loop if (!abort_loop) {
// 将 abort_loop 设置为 true abort_loop = true; // Mark abort for threads. #ifdef HAVE_PSI_THREAD_INTERFACE // Delete the instrumentation for the signal thread. PSI_THREAD_CALL(delete_current_thread) (); #endif /* Kill the socket listener. kill socket listener,来停止接收新的连接。 The main thread will then set socket_listener_active= false, and wait for us to finish all the cleanup below. */ mysql_mutex_lock(&LOCK_socket_listener_active); while (socket_listener_active) { DBUG_PRINT("info", ("Killing socket listener")); if (pthread_kill(main_thread_id, SIGUSR1)) { assert(false); break; } mysql_cond_wait(&COND_socket_listener_active, &LOCK_socket_listener_active); } mysql_mutex_unlock(&LOCK_socket_listener_active); // 关闭现有连接 close_connections(); } my_thread_end(); my_thread_exit(0); return NULL; // Avoid compiler warnings break; case SIGHUP: if (!abort_loop) { int not_used; mysql_print_status(); // Print some debug info reload_acl_and_cache(NULL, (REFRESH_LOG | REFRESH_TABLES | REFRESH_FAST | REFRESH_GRANT | REFRESH_THREADS | REFRESH_HOSTS), NULL, ¬_used); // Flush logs // Reenable query logs after the options were reloaded. query_logger.set_handlers(log_output_options); } break; default: break; /* purecov: tested */ } } return NULL; /* purecov: deadcode */ }
static void close_connections(void) { DBUG_ENTER("close_connections"); (void)RUN_HOOK(server_state, before_server_shutdown, (NULL)); // One-Thread-Per-Connection: kill 阻塞的 threads Per_thread_connection_handler::kill_blocked_pthreads(); uint dump_thread_count = 0; uint dump_thread_kill_retries = 8; // Close listeners. 关闭 socket listeners if (mysqld_socket_acceptor != NULL) mysqld_socket_acceptor->close_listener(); #ifdef _WIN32 if (named_pipe_acceptor != NULL) named_pipe_acceptor->close_listener(); if (shared_mem_acceptor != NULL) shared_mem_acceptor->close_listener(); #endif /* First signal all threads that it's time to die This will give the threads some time to gracefully abort their statements and inform their clients that the server is about to die. */ // 获取 thd_manager Global_THD_manager *thd_manager = Global_THD_manager::get_instance(); sql_print_information("Giving %d client threads a chance to die gracefully", static_cast<int>(thd_manager->get_thd_count())); // thd_manager 针对所有的 conn 设置 KILL_CONNECTION FLAG Set_kill_conn set_kill_conn; thd_manager->do_for_all_thd(&set_kill_conn); sql_print_information("Shutting down slave threads"); // 释放 slave 线程 end_slave(); // 处理 dump 线程 if (set_kill_conn.get_dump_thread_count()) { /* Replication dump thread should be terminated after the clients are terminated. Wait for few more seconds for other sessions to end. dump 线程应在客户端终止后终止。再等待几秒钟,等待其他会话结束。 */ while (thd_manager->get_thd_count() > dump_thread_count && dump_thread_kill_retries) { sleep(1); dump_thread_kill_retries--; } set_kill_conn.set_dump_thread_flag(); thd_manager->do_for_all_thd(&set_kill_conn); } if (thd_manager->get_thd_count() > 0) sleep(2); // Give threads time to die /* Force remaining threads to die by closing the connection to the client This will ensure that threads that are waiting for a command from the client on a blocking read call are aborted. 通过关闭客户端的连接, 来强制剩余线程终止。 */ sql_print_information("Forcefully disconnecting %d remaining clients", static_cast<int>(thd_manager->get_thd_count())); // 关闭客户端连接。 Call_close_conn call_close_conn(true); thd_manager->do_for_all_thd(&call_close_conn); (void)RUN_HOOK(server_state, after_server_shutdown, (NULL)); /* All threads have now been aborted. Stop event scheduler thread after aborting all client connections, otherwise user may start/stop event scheduler after Events::deinit() deallocates scheduler object(static member in Events class) */ Events::deinit(); DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", thd_manager->get_thd_count())); thd_manager->wait_till_no_thd(); /* Connection threads might take a little while to go down after removing from global thread list. Give it some time. */ Connection_handler_manager::wait_till_no_connection(); delete_slave_info_objects(); DBUG_PRINT("quit", ("close_connections thread")); DBUG_VOID_RETURN; }
接下来看 Set_kill_conn 做了什么:
class Set_kill_conn : public Do_THD_Impl { ......virtual void operator()(THD *killing_thd) { DBUG_PRINT("quit", ("Informing thread %u that it's time to die", killing_thd->thread_id())); if (!m_kill_dump_threads_flag) {
// 这里在 first Loop 时跳过 slave threads & scheduler 线程 // We skip slave threads & scheduler on this first loop through.
if (killing_thd->slave_thread) return; if (killing_thd->get_command() == COM_BINLOG_DUMP || killing_thd->get_command() == COM_BINLOG_DUMP_GTID) { ++m_dump_thread_count; return; } DBUG_EXECUTE_IF("Check_dump_thread_is_alive", { assert(killing_thd->get_command() != COM_BINLOG_DUMP && killing_thd->get_command() != COM_BINLOG_DUMP_GTID); };); } mysql_mutex_lock(&killing_thd->LOCK_thd_data); killing_thd->killed = THD::KILL_CONNECTION; MYSQL_CALLBACK(Connection_handler_manager::event_functions, post_kill_notification, (killing_thd)); if (killing_thd->is_killable) { mysql_mutex_lock(&killing_thd->LOCK_current_cond); if (killing_thd->current_cond) { mysql_mutex_lock(killing_thd->current_mutex);
// 关键在这里,唤醒所有等待 thd -> current_cond 的线程并释放 thd-> current_mutex mysql_cond_broadcast(killing_thd->current_cond); mysql_mutex_unlock(killing_thd->current_mutex); } mysql_mutex_unlock(&killing_thd->LOCK_current_cond); } mysql_mutex_unlock(&killing_thd->LOCK_thd_data); } };
之前解析过半同步复制的源码,这里简单看一下 wait_ack 部分,之间会省略部分代
// 执行事务的线程等待从库的回复, 即等待 ACK 的实现函数
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{ ...... // 打开了半同步 while (is_on()) { // 如果有从库回复 if (reply_file_name_inited_) { // 比较从库回复的日志坐标(filename & fileops)和固化的 commit 队列中最新的事务的 binlog filename & pos int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, trx_wait_binlog_name, trx_wait_binlog_pos); // 如果回复的日志坐标大于当前的日志坐标 if (cmp >= 0) { /* We have already sent the relevant binlog to the slave: no need to * wait here. 我们已经确认将相应的 binlog 发送给了从库: 无需在此等待。 */ if (trace_level_ & kTraceDetail) sql_print_information("%s: Binlog reply is ahead (%s, %lu),", kWho, reply_file_name_, (unsigned long)reply_file_pos_); // 退出循环 break; } } /* When code reaches here an Entry object may not be present in the following scenario. 当代码到了这里, 在一下场景中可能不存在 entry。 Semi sync was not enabled when transaction entered into ordered_commit process. During flush stage, semi sync was not enabled and there was no 'Entry' object created for the transaction being committed and at a later stage it was enabled. In this case trx_wait_binlog_name and trx_wait_binlog_pos are set but the 'Entry' object is not present. Hence dump thread will not wait for reply from slave and it will not update reply_file_name. In such case the committing transaction should not wait for an ack from slave and it should be considered as an async transaction. 事务进入 ordered_commit 时未启用半同步。 在 flush 阶段, 没有启用半同步, 没有为提交的事务创建 entry 对象, 但是在之后的节点启用了半同步。 在这种情况下, 设置了 trx_wait_binlog_name 和 trx_wait_binlog_pos, 但是 entry 对象并不存在。 此时, dump 线程将不会等待 slave 节点的 reply, 并且不会更新 reply_file_name。 在这种情况下, 提交的事务不应等待来自 slave 节点的 ack, 而应被视为异步事务。 */ if (!entry) { is_semi_sync_trans= false; goto l_end; } /* Let us update the info about the minimum binlog position of waiting * threads. * 这里更新等待线程等待的 minimum binlog pos 。 */ if (wait_file_name_inited_) { // 对比当前 commit 队列最后的binlog点位 和 wait_file_name_ & wait_file_pos_ 大小 int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, wait_file_name_, wait_file_pos_); if (cmp <= 0) { /* This thd has a lower position, let's update the minimum info. 这里更新 wait_file_name_ & wait_file_pos_。 */ strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1); wait_file_name_[sizeof(wait_file_name_) - 1]= '\0'; wait_file_pos_ = trx_wait_binlog_pos; rpl_semi_sync_master_wait_pos_backtraverse++; if (trace_level_ & kTraceDetail) sql_print_information("%s: move back wait position (%s, %lu),", kWho, wait_file_name_, (unsigned long)wait_file_pos_); } } else { strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1); wait_file_name_[sizeof(wait_file_name_) - 1]= '\0'; wait_file_pos_ = trx_wait_binlog_pos; wait_file_name_inited_ = true; if (trace_level_ & kTraceDetail) sql_print_information("%s: init wait position (%s, %lu),", kWho, wait_file_name_, (unsigned long)wait_file_pos_); } /* In semi-synchronous replication, we wait until the binlog-dump * thread has received the reply on the relevant binlog segment from the * replication slave. * 在半同步复制中, 我们等待直到 binlog dump 线程收到相关 binlog 的 reply 信息。 * * Let us suspend this thread to wait on the condition; * when replication has progressed far enough, we will release * these waiting threads. * 让我们暂停这个线程以等待这个条件; * 当复制进展足够时, 我们将释放等待的线程。 */ // 判断 slave 个数和半同步是否正常 // 当前 slave 节点的数量 == rpl_semi_sync_master_wait_for_slave_count -1 && 半同步复制正开启
// 在这里 abort_loop 为 true, rpl_semi_sync_master_clients 是 0,rpl_semi_sync_master_wait_for_slave_count 是 1,因此
// 半同步复制被关闭,循环中断,原事务提交成功。 if (abort_loop && (rpl_semi_sync_master_clients == rpl_semi_sync_master_wait_for_slave_count - 1) && is_on())
{ sql_print_warning("In commitTrx : SEMISYNC: Forced shutdown. Some updates might " "not be replicated."); // 关闭半同步, 中断循环 switch_off(); break; } //正式进入等待binlog同步的步骤,将rpl_semi_sync_master_wait_sessions+1 //然后发起等待信号,进入信号等待后,只有2种情况可以退出等待。1是被其他线程唤醒(binlog dump) //2是等待超时时间。如果是被唤醒则返回值是0,否则是其他值 rpl_semi_sync_master_wait_sessions++; if (trace_level_ & kTraceDetail) sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", kWho, wait_timeout_, wait_file_name_, (unsigned long)wait_file_pos_); /* wait for the position to be ACK'ed back 实现 ACK 等待 */ assert(entry); entry->n_waiters++; /** 第一个参数为条件量,第二个为等待中释放LOCK_binlog_互斥锁,第三个为未来的超时绝对时间
当 Set_kill_conn唤醒了current_cond 的线程并释放 current_mutex 时,mysql_cond_timedwait() 函数返回,
wait_result = 0; 但是当前回复的日志坐标仍然小于所需的日志坐标,因此仍然在循环中。
*/
wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
entry->n_waiters--; /* 在等待时释放 LOCK_binlog_ 互斥锁, 有可能其他客户端执行 RESET MASTER 命令, 这将把 rpl_semi_sync_master_wait_sessions 重置为 0。 因此, 在递减前需要检查该值。 */ if (rpl_semi_sync_master_wait_sessions > 0) rpl_semi_sync_master_wait_sessions--; // wait_result != 0, 这里表示等待超时 if (wait_result != 0) { ..... } else // 等待 ACK 成功 { ...... } } l_end: ..... } ..... }
通过以上代码可知,在半同步复制的情况下,当主节点的 rpl_semi_sync_master_clients == rpl_semi_sync_master_wait_for_slave_count - 1 的情况下,使用 'mysqladmin shutdown' | 'shutdown' | 'kill {pid of mysqld}' 关闭主节点时确实会造成主从数据不一致。当我们使用了高可用组件且 rpl_semi_sync_master_wait_for_slave_count = 1 时尤其危险。以后还是继续使用 kill -9 来关闭MySQL吧。
标签:binlog,thd,thread,killing,源码,kill,MySQL,解析,wait From: https://www.cnblogs.com/juanmaofeifei/p/16742583.html