从MySQL 8.0.23版本开始,CHANGE MASTER TO开始被替换为CHANGE REPLICATION SOURCE TO,下面使用MySQL 8.0.32的代码分析语句的具体执行流程。
从语句的入口函数mysql_execute_command开始,在命令执行之前首先会检查语句执行用户是否有REPLICATION_SLAVE_ADMIN或SUPER权限:
case SQLCOM_CHANGE_MASTER: { Security_context *sctx = thd->security_context(); if (!sctx->check_access(SUPER_ACL) && !sctx->has_global_grant(STRING_WITH_LEN("REPLICATION_SLAVE_ADMIN")) .first) { my_error(ER_SPECIFIC_ACCESS_DENIED_ERROR, MYF(0), "SUPER or REPLICATION_SLAVE_ADMIN"); goto error; } res = change_master_cmd(thd); break; }
随后进入change_master_cmd函数,该函数主要是检查channel name的有效性,将channel name添加到channel map中:
bool change_master_cmd(THD *thd) { DBUG_TRACE; Master_info *mi = nullptr; LEX *lex = thd->lex; bool res = false; channel_map.wrlock(); // 判断实例是否能够被初始化为从实例;比如server_id为0或在多源复制场景下,本来有多个channel并且repository为table,而当实例重启后且repository被修改为File时,实例就不能成功加载默认channel,此时实例就无法初始化为从实例 if (!is_slave_configured()) { my_error(ER_SLAVE_CONFIGURATION, MYF(0)); res = true; goto err; } if (channel_map.is_group_replication_channel_name(lex->mi.channel, true)) { // 判断指定的channel name是否为group_replication_applier channel
// 判断CHANGE REPLICATION SOURCE TO指定的参数对于group_replication_applier channel是否是有效的(只有PRIVILEGES_CHECKS_USER是有效的) LEX_MASTER_INFO *lex_mi = &thd->lex->mi; if (is_invalid_change_master_for_group_replication_applier(lex_mi)) { my_error(ER_SLAVE_CHANNEL_OPERATION_NOT_ALLOWED, MYF(0), "CHANGE MASTER with the given parameters", lex->mi.channel); res = true; goto err; } // 需要保证group replication处于停止状态 if (is_group_replication_running()) { my_error(ER_GRP_OPERATION_NOT_ALLOWED_GR_MUST_STOP, MYF(0)); res = true; goto err; } } // 如果指定的channel name为group_replication_recovery channel name,判断指定的change master选项是否是受支持的,只能修改MASTER_USER或MASTER_PASSWORD if (channel_map.is_group_replication_channel_name(lex->mi.channel) && !channel_map.is_group_replication_channel_name(lex->mi.channel, true)) { LEX_MASTER_INFO *lex_mi = &thd->lex->mi; if (is_invalid_change_master_for_group_replication_recovery(lex_mi)) { my_error(ER_SLAVE_CHANNEL_OPERATION_NOT_ALLOWED, MYF(0), "CHANGE MASTER with the given parameters", lex->mi.channel); res = true; goto err; } } /* Error out if number of replication channels are > 1 if FOR CHANNEL clause is not provided in the CHANGE MASTER command. */ if (!lex->mi.for_channel && channel_map.get_num_instances() > 1) { //在使用多源复制时不支持不指定channel name my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0)); res = true; goto err; } /* Get the Master_info of the channel */ mi = channel_map.get_mi(lex->mi.channel); /* create a new channel if doesn't exist */ if (!mi && strcmp(lex->mi.channel, channel_map.get_default_channel())) { /* The mi will be returned holding mi->channel_lock for writing */ if (add_new_channel(&mi, lex->mi.channel)) goto err; // 初始化maser info并将channel添加到channel map中 } if (mi) { bool configure_filters = !Master_info::is_configured(mi); if (!(res = change_master(thd, mi, &thd->lex->mi)))
随后进入change_master函数,change_master函数主要是检查CHANGE REPLICATION SOURCE TO指定的选项的有效性和选项之间是否冲突、更新Master Info和Relay Log Info信息并刷盘:
int change_master(THD *thd, Master_info *mi, LEX_MASTER_INFO *lex_mi, bool preserve_logs) { int error = 0; // 标记是否指定了和IO线程相关的选项 bool have_receive_option = false; // 标记是否指定了和SQL、worker线程相关的选项 bool have_execute_option = false; // 标记是否制定了同时会影响到IO线程和SQL、worker线程的选项 bool have_both_receive_execute_option = false; bool validation_error = false; // 如果不存在mts gaps,则会删除worker info表信息 bool mta_remove_worker_info = false; // 使用bit位标记正在运行的复制线程 int thread_mask; // 只有在执行CHANGE REPLICATION SOURCE TO时没有指定relay_log_file/relay_log_pos且复制线程都停止时才可能会清除relay log bool need_relay_log_purge = true; // 记录先前的Master信息,以便后面在error log中打印相关参数的变更 char saved_host[HOSTNAME_LENGTH + 1], saved_bind_addr[HOSTNAME_LENGTH + 1]; uint saved_port = 0; char saved_log_name[FN_REFLEN]; my_off_t saved_log_pos = 0; DBUG_TRACE; // 由于复制线程需要修改mysql.slave_master_info表所以忽略只读控制 thd->set_skip_readonly_check(); // 防止其他线程修改Master_Info信息 mi->channel_wrlock(); // 防止其他线程改变复制线程状态 lock_slave_threads(mi); // 返回正在运行运行的复制线程(IO/SQL) init_thread_mask(&thread_mask, mi, false); // 如果有正在运行的复制线程,为了防止数据丢失CHANGE REPLICATION SOURCE TO语句不会进行purge relay log操作; // 但relay log的purge操作依旧受relay_log_purge参数的影响 if (thread_mask) { need_relay_log_purge = false; } /* 判断语句是否设置或修改了任意影响IO线程的选项: - host - user - password - port - log_file_name - pos - connect_retry - ssl相关 ... */ have_receive_option = have_change_replication_source_receive_option(lex_mi); /* 判断语句是否设置或修改了任意影响SQL、worker线程的选项: - relay_log_name - relay_log_pos - sql_delay - privilege_checks_username */ have_execute_option = have_change_replication_source_execute_option( lex_mi, &need_relay_log_purge); /* 判断语句是否设置了任意同时影响IO和SQL、worker线程的选项 - assign_gtids_to_anonymous_transactions_type - auto_position - source_connection_auto_failover - gtid_only */ have_both_receive_execute_option = have_change_replication_source_applier_and_receive_option(lex_mi); // 当复制线程运行时,不允许设置与其对应的选项 if ((have_both_receive_execute_option && ((thread_mask & SLAVE_IO) || (thread_mask & SLAVE_SQL))) || (have_receive_option && have_execute_option && (thread_mask & SLAVE_IO) && (thread_mask & SLAVE_SQL))) { error = ER_SLAVE_CHANNEL_MUST_STOP; my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel()); goto err; } if (have_receive_option && (thread_mask & SLAVE_IO)) { error = ER_SLAVE_CHANNEL_IO_THREAD_MUST_STOP; my_error(ER_SLAVE_CHANNEL_IO_THREAD_MUST_STOP, MYF(0), mi->get_channel()); goto err; } if (have_execute_option && (thread_mask & SLAVE_SQL)) { error = ER_SLAVE_CHANNEL_SQL_THREAD_MUST_STOP; my_error(ER_SLAVE_CHANNEL_SQL_THREAD_MUST_STOP, MYF(0), mi->get_channel()); goto err; } /* 如果GTID_MODE != ON,验证指定的选项是否有效: - source_auto_position=1需要开启GTID - ASSIGN_GTIDS_TO_ANONYMOUS_TRANSACTIONS != OFF需要GTID_MODE = ON - GTID_ONLY= 1需要GTID_MODE = ON - SOURCE_CONNECTION_AUTO_FAILOVER = 1需要GTID_MODE = ON */ if (global_gtid_mode.get() != Gtid_mode::ON) { if ((error = validate_gtid_option_restrictions(lex_mi, mi))) { goto err; } } /* 判断选项的兼容性,主要有: - master log file/pos和relay log file/log与auto_position选项冲突 - assign_gtids_to_anonymous_transactions_info参数不为OFF时与auto_position冲突 - CHANGE REPLICATION SOURCE TO GTID_ONLY = 1需要SOURCE_AUTO_POSITION = 1、REQUIRE_ROW_FORMAT = 1 - CHANGE REPLICATION SOURCE TO SOURCE_CONNECTION_AUTO_FAILOVER = 1需要SOURCE_AUTO_POSITION = 1 - GTID_ONLY = 1开启时不能关闭SOURCE_AUTO_POSITION - GTID_ONLY开启时不能关闭REQUIRE_ROW_FORMAT ... */ if ((error = evaluate_inter_option_dependencies(lex_mi, mi))) { goto err; } // preserve_logs参数未指定(false) if (need_relay_log_purge && preserve_logs && mi->rli->inited) { need_relay_log_purge = false; } THD_STAGE_INFO(thd, stage_changing_source); int thread_mask_stopped_threads; // 返回停止的复制线程 init_thread_mask(&thread_mask_stopped_threads, mi, true); // 如果不存在repository则创建否则读取Master Info和Relay Log Info信息 if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads, need_relay_log_purge)) { error = ER_MASTER_INFO; my_error(ER_MASTER_INFO, MYF(0)); goto err; } // 检查PRIVILEGE_CHECKS_USER选项指定的username和hostname是否符合语法、用户是否有权限 std::tie(validation_error, mta_remove_worker_info) = validate_change_replication_source_options(thd, lex_mi, mi, thread_mask); if (validation_error) { error = 1; goto err; } // 保存原先的username、hostname、bind_addr if (have_receive_option) { strmake(saved_host, mi->host, HOSTNAME_LENGTH); strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH); saved_port = mi->port; strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1); saved_log_pos = mi->get_master_log_pos(); } /* - 更新指定的master info信息:username、hostname、password、port、auto_position、master_log_file、master_log_pos... - 更新指定的relay log info信息:Relay Log File、Relay Log Pos... */ if (update_change_replication_source_options( thd, lex_mi, mi, have_both_receive_execute_option, have_execute_option, have_receive_option)) { error = 1; goto err; } /* 如果需要purge relay log且没有指定host,port,log_file_name,log_file_position且relay log info有效,说明并没有修改复制源; 此时使用relay log info的master log file、master log pos初始化master info的master log file、master log pos来从新拉取Binlog */ if (need_relay_log_purge) { if (!lex_mi->host && !lex_mi->port && !lex_mi->log_file_name && !lex_mi->pos && !mi->rli->is_applier_source_position_info_invalid()) { mi->set_master_log_pos(max<ulonglong>( BIN_LOG_HEADER_SIZE, mi->rli->get_group_master_log_pos())); mi->set_master_log_name(mi->rli->get_group_master_log_name()); } } // 在error log中输出原先的source_host、source_port、source_log_file、source_log_pos和新的信息 if (have_receive_option) LogErr(SYSTEM_LEVEL, ER_SLAVE_CHANGE_MASTER_TO_EXECUTED, mi->get_for_channel_str(true), saved_host, saved_port, saved_log_name, (ulong)saved_log_pos, saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(), (ulong)mi->get_master_log_pos(), mi->bind_addr); // 刷盘Master Info if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true)) { error = ER_RELAY_LOG_INIT; my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file"); goto err; } if ((thread_mask & SLAVE_SQL) == 0) // SQL线程属于停止状态 { /* 如果需要purge relay log(没有指定relay log file、relay log pos),则进行purge relay log操作; purge relay log操作会破坏Relay Log Info中的复制位点,所以需要使用Master Info中的Master Log File和Master Log Pos初始化Relay Log Info; 否则就检查relay log name是否在relay log index中 */ if (need_relay_log_purge) { const char *errmsg = nullptr; THD_STAGE_INFO(thd, stage_purging_old_relay_logs); if (mi->rli->purge_relay_logs(thd, &errmsg)) { error = ER_RELAY_LOG_FAIL; my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg); goto err; } if (!mi->is_receiver_position_info_invalid()) { mi->rli->set_group_master_log_pos(mi->get_master_log_pos()); mi->rli->set_group_master_log_name(mi->get_master_log_name()); DBUG_PRINT("info", ("master_log_pos: %llu", mi->get_master_log_pos())); } } else { const char *errmsg = nullptr; // if (mi->rli->is_group_relay_log_name_invalid(&errmsg)) { error = ER_RELAY_LOG_INIT; my_error(ER_RELAY_LOG_INIT, MYF(0), errmsg); goto err; } } char *var_group_master_log_name = const_cast<char *>(mi->rli->get_group_master_log_name()); // 如果没有指定master log name就将Relay Log Info的Pos设置为0 if (!var_group_master_log_name[0] && !mi->rli->is_applier_source_position_info_invalid()) mi->rli->set_group_master_log_pos(0); // 中断SOURCE_POS_WAIT()操作 mi->rli->abort_pos_wait++; mi->rli->clear_error(); if (mi->rli->workers_array_initialized) { for (size_t i = 0; i < mi->rli->get_worker_count(); i++) { mi->rli->get_worker(i)->clear_error(); } } // 刷盘relay log info if (mi->rli->flush_info(Relay_log_info::RLI_FLUSH_IGNORE_SYNC_OPT | Relay_log_info::RLI_FLUSH_IGNORE_GTID_ONLY)) { error = ER_RELAY_LOG_INIT; my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file."); goto err; } } log_invalid_position_warning(thd, lex_mi, mi); // 如果不存在mts gaps,则清空worker info table if (mta_remove_worker_info) if (Rpl_info_factory::reset_workers(mi->rli)) { error = ER_MTS_RESET_WORKERS; my_error(ER_MTS_RESET_WORKERS, MYF(0)); goto err; } err: // 解锁复制线程和Master Info unlock_slave_threads(mi); mi->channel_unlock(); return error; }
标签:log,relay,mi,REPLICATION,源码,master,channel,error,CHANGE From: https://www.cnblogs.com/wagaga/p/17263227.html