首页 > 编程语言 >魔兽世界服务端TrinityCore连接池源码剖析

魔兽世界服务端TrinityCore连接池源码剖析

时间:2024-08-24 15:52:19浏览次数:14  
标签:std DatabaseWorkerPool TrinityCore connection 源码 连接池 template sql 服务端

简介

  • 魔兽世界服务器中数据库使用mysql来存储,并且数据库模块是直接嵌入在serve中,并没有单独的DB server
  • 在魔兽连接池中有两种连接池,一种是同步连接池,还有异步连接池
  • 连接池相关源码目录 TrinityCore-master\src\server\database\Database
  • 连接池具体文件:DatabaseWorkerPool.h DatabaseWorkerPool.cpp

在连接池源码中涉及到了一些其他类,读者如有不懂,我已经列出清单,可以自行查看相关源码实现

TnWKluwRIc4zsfE

核心源码讲解

DatabaseWorkerPool是一个模板类,提供了四个类,分别代表魔兽世界四个数据库的连接池

/// Accessor to the world database
TC_DATABASE_API extern DatabaseWorkerPool<WorldDatabaseConnection> WorldDatabase;
/// Accessor to the character database
TC_DATABASE_API extern DatabaseWorkerPool<CharacterDatabaseConnection> CharacterDatabase;
/// Accessor to the realm/login database
TC_DATABASE_API extern DatabaseWorkerPool<LoginDatabaseConnection> LoginDatabase;
/// Accessor to the hotfix database
TC_DATABASE_API extern DatabaseWorkerPool<HotfixDatabaseConnection> HotfixDatabase;

数据库类型

  • IDX_ASYNC: 异步连接池类型
  • IDX_SYNCH: 同步连接池类型

IDX_SIZE主要用于元素数量使用

enum InternalIndex
        {
        	// 异步 0
            IDX_ASYNC,
            // 同步 1
            IDX_SYNCH,
            // 2 利用enum元素下标从零开始的性质,可以当作枚举长度使用
            IDX_SIZE
        };

核心成员

  • _queue: 线程池任务队列

  • _connections:连接池(同步连接池和异步连接池用这一个),异步连接池下标为0,同步连接池下标为1

    • EFY76IoJjPlHayx
  • _connectionInfo: 存储数据库信息

    •   struct TC_DATABASE_API MySQLConnectionInfo
        {
            explicit MySQLConnectionInfo(std::string const& infoString);
        
        	// 用户名
            std::string user;
        	// 密码
            std::string password;
        	// 数据库名
            std::string database;
        	// 主机地址
            std::string host;
        	// 端口
            std::string port_or_socket;
            std::string ssl;
        };
      
  • _preparedStatementSize:预加载sql语句数量,与业务有关,实现存储的sql语句

  • _async_threads:异步线程池的线程数量

  • _synch_threads:同步线程池的线程数量

        // SQL任务队列,生产者消费者模型
        std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
		// 连接池(同步连接池和异步连接池用这一个)
        std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
		// 存储数据库信息
        std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
		// 预加载sql语句数量
        std::vector<uint8> _preparedStatementSize;
		// 异步线程数量,同步线程数量
        uint8 _async_threads, _synch_threads;

核心源码讲解

初始化构造函数

创建连接池对象以及清零线程池数量

template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
    : _queue(new ProducerConsumerQueue<SQLOperation*>()),
      _async_threads(0), _synch_threads(0)
{
}

析构函数释放资源

将连接池中所有mysql连接对象释放

template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
    _queue->Cancel();
}

SetConnectionInfo 设置数据库信息以及连接池数量

template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
    uint8 const asyncThreads, uint8 const synchThreads)
{
    _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);

    _async_threads = asyncThreads;
    _synch_threads = synchThreads;
}

OpenConnections创建一条指定的连接

// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
    for (uint8 i = 0; i < numConnections; ++i)
    {
        // 创建一条MySQLConnection连接了类
        auto connection = [&] {
            switch (type)
            {
            case IDX_ASYNC:
                return std::make_unique<T>(_queue.get(), *_connectionInfo);
            case IDX_SYNCH:
                return std::make_unique<T>(*_connectionInfo);
            default:
                ABORT();
            }
        }();

		// 与mysql进行实际的连接
        if (uint32 error = connection->Open())
        {
            // Failed to open a connection or invalid version, abort and cleanup
            _connections[type].clear();
            return error;
        }
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADB
        else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#else
        else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif
        {
#ifndef LIBMARIADB
            TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#else
            TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endif

            return 1;
        }
        else
        {
        	// 将生成的mysql连接加入对应的mysql连接池
            _connections[type].push_back(std::move(connection));
        }
    }

    // Everything is fine
    return 0;
}

Open 初始化连接池

初始化连接池,生成指定数量的同步和异步mysql连接加入连接池

template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
    WPFatal(_connectionInfo.get(), "Connection info was not set!");

    TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
        "Asynchronous connections: {}, synchronous connections: {}.",
        GetDatabaseName(), _async_threads, _synch_threads);

	// 在异步连接池中添加_async_threads数量的异步连接
    uint32 error = OpenConnections(IDX_ASYNC, _async_threads);

    if (error)
        return error;

	// 在同步连接池中添加_sync_threads数量的同步连接
    error = OpenConnections(IDX_SYNCH, _synch_threads);

    if (!error)
    {
        TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
                    "{} total connections running.", GetDatabaseName(),
                    (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
    }

    return error;
}

close 关闭连接

关闭所有的同步连接以及异步连接

template <class T>
void DatabaseWorkerPool<T>::Close()
{
    TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());

    //! Closes the actualy MySQL connection.
    // 关闭所有的异步连接
    _connections[IDX_ASYNC].clear();

    TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
                "Proceeding with synchronous connections.",
        GetDatabaseName());

    //! Shut down the synchronous connections
    //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
    //! should only be called after any other thread tasks in the core have exited,
    //! meaning there can be no concurrent access at this point.
    // 关闭所有的同步连接
    _connections[IDX_SYNCH].clear();

    TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}

GetFreeConnection获取一条空闲连接

template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
    if (_warnSyncQueries)
    {
        std::ostringstream ss;
        ss << boost::stacktrace::stacktrace();
        TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
    }
#endif

    uint8 i = 0;
    auto const num_cons = _connections[IDX_SYNCH].size();
    T* connection = nullptr;
    //! Block forever until a connection is free
    // 循环遍历,获取一条空闲连接返回
    for (;;)
    {
        connection = _connections[IDX_SYNCH][i++ % num_cons].get();
        //! Must be matched with t->Unlock() or you will get deadlocks
        if (connection->LockIfReady())
            break;
    }

    return connection;
}

完整源码

DatabaseWorkerPool.h

#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H

#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "StringFormat.h"
#include <array>
#include <string>
#include <vector>

template <typename T>
class ProducerConsumerQueue;

class SQLOperation;
struct MySQLConnectionInfo;

template <class T>
class DatabaseWorkerPool
{
    private:
        enum InternalIndex
        {
        	// 异步 0
            IDX_ASYNC,
            // 同步 1
            IDX_SYNCH,
            // 利用enum元素下标从零开始的性质,可以当作枚举长度使用
            IDX_SIZE
        };

    public:
        /* Activity state */
        DatabaseWorkerPool();

        ~DatabaseWorkerPool();

		// 设置数据库信息以及异步/同步连接池线程数量大小
        void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);

		// 初始化连接池,生成指定数量的同步和异步mysql连接加入连接池
        uint32 Open();

		// 将同步连接池与异步连接池的mysql连接进行销毁
        void Close();

        //! Prepares all prepared statements
        bool PrepareStatements();

        inline MySQLConnectionInfo const* GetConnectionInfo() const
        {
            return _connectionInfo.get();
        }

        /**
				延迟单向语句方法。
        */

        // 以字符串格式对将异步执行的单向SQL操作进行排队。
		// 这个方法应该只用于只执行一次的查询,例如在启动期间。
		// 将sql语句加入任务队列
		void Execute(char const* sql);

        // !以字符串格式(带有可变参数)对将异步执行的单向SQL操作进行排队。
		// !这个方法应该只用于只执行一次的查询,例如在启动期间。
        template<typename... Args>
        void PExecute(Trinity::FormatString<Args...> sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return;

            Execute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
        }

        // !以准备好的语句格式对将异步执行的单向SQL操作进行排队。
		// !语句必须使用CONNECTION_ASYNC标志来准备。
        void Execute(PreparedStatement<T>* stmt);

        /**
            直接同步单向语句方法。
        */

        // !直接以字符串格式执行单向SQL操作,这将阻塞调用线程,直到完成。
		// !这个方法应该只用于只执行一次的查询,例如在启动期间。
        void DirectExecute(char const* sql);

        // !直接以字符串格式(带有可变参数)执行单向SQL操作,这将阻塞调用线程,直到完成。
		// !这个方法应该只用于只执行一次的查询,例如在启动期间。
        template<typename... Args>
        void DirectPExecute(Trinity::FormatString<Args...> sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return;

            DirectExecute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
        }

        // !直接以准备好的语句格式执行单向SQL操作,这将阻塞调用线程,直到完成。
		// !语句必须使用connection_sync标志来准备。
        void DirectExecute(PreparedStatement<T>* stmt);

        /**
            同步查询(带结果集)方法。
        */

        // !直接以字符串格式执行SQL查询,该查询将阻塞调用线程,直到完成。
		// 返回引用计数的自动指针,不需要在上层代码中手动内存管理。
        QueryResult Query(char const* sql, T* connection = nullptr);

        // !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。
		// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
        template<typename... Args>
        QueryResult PQuery(Trinity::FormatString<Args...> sql, T* conn, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return QueryResult(nullptr);

            return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str(), conn);
        }

        // !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。
		// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
        template<typename... Args>
        QueryResult PQuery(Trinity::FormatString<Args...> sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return QueryResult(nullptr);

            return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
        }

        // !直接以准备好的格式执行SQL查询,该查询将阻塞调用线程,直到完成。
		// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
		// !语句必须带有CONNECTION_SYNCH标志。
        PreparedQueryResult Query(PreparedStatement<T>* stmt);

        /**
            异步查询(带结果集)方法。
        */

        // !以字符串格式对查询进行排队,该查询将在查询执行后立即设置QueryResultFuture返回对象的值。
		// !然后在ProcessQueryCallback方法中处理返回值。
        QueryCallback AsyncQuery(char const* sql);

        // !以准备好的格式对查询进行队列,该格式将在查询执行后立即设置PreparedQueryResultFuture返回对象的值。
		// !然后在ProcessQueryCallback方法中处理返回值。
		// !语句必须使用CONNECTION_ASYNC标志来准备。
        QueryCallback AsyncQuery(PreparedStatement<T>* stmt);

        // !将设置QueryResultHolderFuture值的SQL操作向量(可以是临时的,也可以是准备好的)排队
		// !查询执行后立即返回对象。
		// !然后在ProcessQueryCallback方法中处理返回值。
		// !添加到这个holder中的任何预处理语句都需要使用CONNECTION_ASYNC标志进行预处理。
        SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder);

        /**
            事务上下文方法。
        */

        // !开始一个自动托管事务指针,如果未提交,该指针将自动回滚。(自动提交= 0)
        SQLTransaction<T> BeginTransaction();

        // !对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序
		// !在执行过程中,附加到事务中的内容将受到尊重。
        void CommitTransaction(SQLTransaction<T> transaction);

        //! 对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序
		// !在执行过程中,附加到事务中的内容将受到尊重。
        TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);

        // !直接执行一组单向SQL操作(可以是临时的,也可以是准备好的)。这些操作的顺序
		// !在执行过程中,附加到事务中的内容将受到尊重。
        void DirectCommitTransaction(SQLTransaction<T>& transaction);

        // !方法,用于在不同上下文中执行特别语句。
		// !如果存在有效对象,将封装在事务中,否则单独执行。
        void ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql);

        // !方法,用于在不同上下文中执行准备好的语句。
		// !如果存在有效对象,将封装在事务中,否则单独执行。
        void ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt);

        /**
            Other
        */

        typedef typename T::Statements PreparedStatementIndex;

        // !自动管理(内部)指针,指向一个准备好的语句对象,供上层代码使用。
		// !在this->DirectExecute(PreparedStatement*)、this->Query(PreparedStatement*)或PreparedStatementTask::~PreparedStatementTask中删除指针。
		// !在执行之前,这个对象不会绑定到MySQL上下文中准备好的语句。
        PreparedStatement<T>* GetPreparedStatement(PreparedStatementIndex index);

        // !为当前排序应用转义字符串。(use utf8)
        // 字符串转义
        void EscapeString(std::string& str);

        //!保持我们所有的MySQL连接存活,防止服务器断开我们的连接。
        void KeepAlive();

        void WarnAboutSyncQueries([[maybe_unused]] bool warn)
        {
#ifdef TRINITY_DEBUG
            _warnSyncQueries = warn;
#endif
        }

        size_t QueueSize() const;

    private:
		// 通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
        uint32 OpenConnections(InternalIndex type, uint8 numConnections);

        unsigned long EscapeString(char* to, char const* from, unsigned long length);

		// 将任务加入任务队列
        void Enqueue(SQLOperation* op);

        // 获取同步连接池中的空闲连接。
		// 调用者必须在触及MySQL上下文后调用t->Unlock()以防止死锁。
		T* GetFreeConnection();

		// 获取数据库名
        char const* GetDatabaseName() const;

        // SQL任务队列,生产者消费者模型
        std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
		// 连接池(同步连接池和异步连接池用这一个)
        std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
		// 存储数据库信息
        std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
		// 预加载sql语句数量
        std::vector<uint8> _preparedStatementSize;
		// 异步线程数量,同步线程数量
        uint8 _async_threads, _synch_threads;
#ifdef TRINITY_DEBUG
        static inline thread_local bool _warnSyncQueries = false;
#endif
};

#endif

DatabaseWorkerPool.cpp

#include "DatabaseWorkerPool.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/HotfixDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "SQLOperation.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
#include <mysqld_error.h>
#ifdef TRINITY_DEBUG
#include <sstream>
#include <boost/stacktrace.hpp>
#endif

#define MIN_MYSQL_SERVER_VERSION 50700u
#define MIN_MYSQL_SERVER_VERSION_STRING "5.7"
#define MIN_MYSQL_CLIENT_VERSION 50700u
#define MIN_MYSQL_CLIENT_VERSION_STRING "5.7"

#define MIN_MARIADB_SERVER_VERSION 100209u
#define MIN_MARIADB_SERVER_VERSION_STRING "10.2.9"
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"

class PingOperation : public SQLOperation
{
    //! Operation for idle delaythreads
    // 测试mysql连接是否可以ping通
    bool Execute() override
    {
        m_conn->Ping();
        return true;
    }
};

template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
    : _queue(new ProducerConsumerQueue<SQLOperation*>()),
      _async_threads(0), _synch_threads(0)
{
    WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");

#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
    WPFatal(mysql_get_client_version() >= MIN_MARIADB_CLIENT_VERSION, "TrinityCore does not support MariaDB versions below " MIN_MARIADB_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MariaDB client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MARIADB_CLIENT_VERSION);
    WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID, "Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
#else
    WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below " MIN_MYSQL_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MySQL client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MYSQL_CLIENT_VERSION);
    WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
#endif
}

template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
    _queue->Cancel();
}

template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
    uint8 const asyncThreads, uint8 const synchThreads)
{
    _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);

    _async_threads = asyncThreads;
    _synch_threads = synchThreads;
}

template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
    WPFatal(_connectionInfo.get(), "Connection info was not set!");

    TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
        "Asynchronous connections: {}, synchronous connections: {}.",
        GetDatabaseName(), _async_threads, _synch_threads);

	// 在异步连接池中添加_async_threads数量的异步连接
    uint32 error = OpenConnections(IDX_ASYNC, _async_threads);

    if (error)
        return error;

	// 在同步连接池中添加_sync_threads数量的同步连接
    error = OpenConnections(IDX_SYNCH, _synch_threads);

    if (!error)
    {
        TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
                    "{} total connections running.", GetDatabaseName(),
                    (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
    }

    return error;
}

template <class T>
void DatabaseWorkerPool<T>::Close()
{
    TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());

    //! Closes the actualy MySQL connection.
    // 关闭所有的异步连接
    _connections[IDX_ASYNC].clear();

    TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
                "Proceeding with synchronous connections.",
        GetDatabaseName());

    //! Shut down the synchronous connections
    //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
    //! should only be called after any other thread tasks in the core have exited,
    //! meaning there can be no concurrent access at this point.
    // 关闭所有的同步连接
    _connections[IDX_SYNCH].clear();

    TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}

template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{
	// connections == std::vector<std::unique_ptr<T>>
	// 先处理异步连接池,在处理同步连接池
    for (auto& connections : _connections)
    {
    	// connection == std::unique_ptr<T> == 一条mysql连接抽象类
        for (auto& connection : connections)
        {
        	// 对mysql连接抽象类上锁,是否允许其他线程访问此连接
            connection->LockIfReady();

			// 执行mysql连接具体派生类(具体数据库)的预加载sql语句(业务相关)
			// 具体可查看数据库派生类的DoPrepareStatements函数
            if (!connection->PrepareStatements())
            {
                connection->Unlock();
                Close();
                return false;
            }
            else
                connection->Unlock();

			// 获取预加载sql的数量
            size_t const preparedSize = connection->m_stmts.size();
            if (_preparedStatementSize.size() < preparedSize)
                _preparedStatementSize.resize(preparedSize);

			// 初始化预加载sql
            for (size_t i = 0; i < preparedSize; ++i)
            {
                // already set by another connection
                // (each connection only has prepared statements of it's own type sync/async)
                if (_preparedStatementSize[i] > 0)
                    continue;

                if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
                {
                    uint32 const paramCount = stmt->GetParameterCount();

                    // TC only supports uint8 indices.
                    ASSERT(paramCount < std::numeric_limits<uint8>::max());

                    _preparedStatementSize[i] = static_cast<uint8>(paramCount);
                }
            }
        }
    }

    return true;
}

template <class T>
QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
{
    if (!connection)
        connection = GetFreeConnection();

    ResultSet* result = connection->Query(sql);
    connection->Unlock();
    if (!result || !result->GetRowCount() || !result->NextRow())
    {
        delete result;
        return QueryResult(nullptr);
    }

    return QueryResult(result);
}

template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
	// 获取一条空闲连接
    auto connection = GetFreeConnection();
	// 执行stmt指定的预加载sql
    PreparedResultSet* ret = connection->Query(stmt);
    connection->Unlock();

    //! Delete proxy-class. Not needed anymore
    delete stmt;

    if (!ret || !ret->GetRowCount())
    {
        delete ret;
        return PreparedQueryResult(nullptr);
    }

	// 将结果返回
    return PreparedQueryResult(ret);
}

template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
	// BasicStatementTask:内部包装了std::future以及存储sql语句
    BasicStatementTask* task = new BasicStatementTask(sql, true);
    // 在进入队列之前存储未来的结果-任务可能在从此方法返回之前已经被处理和删除
    QueryResultFuture result = task->GetFuture();
	// 将任务加入生产者消费者模型队列中
	Enqueue(task);
	// 异步返回结果
    return QueryCallback(std::move(result));
}

template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
    PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
    // Store future result before enqueueing - task might get already processed and deleted before returning from this method
    PreparedQueryResultFuture result = task->GetFuture();
    Enqueue(task);
    return QueryCallback(std::move(result));
}

template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
    SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
    // Store future result before enqueueing - task might get already processed and deleted before returning from this method
    QueryResultHolderFuture result = task->GetFuture();
    Enqueue(task);
    return { std::move(holder), std::move(result) };
}

template <class T>
SQLTransaction<T> DatabaseWorkerPool<T>::BeginTransaction()
{
    return std::make_shared<Transaction<T>>();
}

template <class T>
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
    // !只在调试模式下分析事务弱点。
	// !理想情况下,我们在调试模式下捕获错误,然后纠正它们,
	// !所以没有必要在Release模式下浪费这些CPU周期。
    switch (transaction->GetSize())
    {
    case 0:
        TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
        return;
    case 1:
        TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
        break;
    default:
        break;
    }
#endif // TRINITY_DEBUG

    Enqueue(new TransactionTask(transaction));
}

// 异步提交事务
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
    //! Only analyze transaction weaknesses in Debug mode.
    //! Ideally we catch the faults in Debug mode and then correct them,
    //! so there's no need to waste these CPU cycles in Release mode.
    switch (transaction->GetSize())
    {
        case 0:
            TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
            break;
        case 1:
            TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
            break;
        default:
            break;
    }
#endif // TRINITY_DEBUG

    TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
    TransactionFuture result = task->GetFuture();
    Enqueue(task);
    return TransactionCallback(std::move(result));
}

// 同步提交事务
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{
    T* connection = GetFreeConnection();
    int errorCode = connection->ExecuteTransaction(transaction);
    if (!errorCode)
    {
        connection->Unlock();      // OK, operation succesful
        return;
    }

    //! Handle MySQL Errno 1213 without extending deadlock to the core itself
    /// @todo More elegant way
    if (errorCode == ER_LOCK_DEADLOCK)
    {
        //todo: handle multiple sync threads deadlocking in a similar way as async threads
        uint8 loopBreaker = 5;
        for (uint8 i = 0; i < loopBreaker; ++i)
        {
            if (!connection->ExecuteTransaction(transaction))
                break;
        }
    }

    //! Clean up now.
    transaction->Cleanup();

    connection->Unlock();
}

template <class T>
PreparedStatement<T>* DatabaseWorkerPool<T>::GetPreparedStatement(PreparedStatementIndex index)
{
    return new PreparedStatement<T>(index, _preparedStatementSize[index]);
}

// 字符串转义
template <class T>
void DatabaseWorkerPool<T>::EscapeString(std::string& str)
{
    if (str.empty())
        return;

    char* buf = new char[str.size() * 2 + 1];
    EscapeString(buf, str.c_str(), uint32(str.size()));
    str = buf;
    delete[] buf;
}

template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{
    //! Ping同步连接
    for (auto& connection : _connections[IDX_SYNCH])
    {
        if (connection->LockIfReady())
        {
            connection->Ping();
            connection->Unlock();
        }
    }

    // !假设所有工作线程都是空闲的,每个工作线程将接收1个ping操作请求
	// !如果一个或多个工作线程很忙,ping操作将不会平均分配,但这无关紧要
	// !因为唯一的目的是防止连接空转。
    auto const count = _connections[IDX_ASYNC].size();
    for (uint8 i = 0; i < count; ++i)
        Enqueue(new PingOperation);
}


// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
    for (uint8 i = 0; i < numConnections; ++i)
    {
        // 创建一条MySQLConnection连接了类
        auto connection = [&] {
            switch (type)
            {
            case IDX_ASYNC:
                return std::make_unique<T>(_queue.get(), *_connectionInfo);
            case IDX_SYNCH:
                return std::make_unique<T>(*_connectionInfo);
            default:
                ABORT();
            }
        }();

		// 与mysql进行实际的连接
        if (uint32 error = connection->Open())
        {
            // Failed to open a connection or invalid version, abort and cleanup
            _connections[type].clear();
            return error;
        }
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADB
        else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#else
        else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif
        {
#ifndef LIBMARIADB
            TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#else
            TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endif

            return 1;
        }
        else
        {
        	// 将生成的mysql连接加入对应的mysql连接池
            _connections[type].push_back(std::move(connection));
        }
    }

    // Everything is fine
    return 0;
}

template <class T>
unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
{
    if (!to || !from || !length)
        return 0;

    return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}

template <class T>
void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
{
    _queue->Push(op);
}

template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{
    return _queue->Size();
}

// 获取一条空闲连接
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
    if (_warnSyncQueries)
    {
        std::ostringstream ss;
        ss << boost::stacktrace::stacktrace();
        TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
    }
#endif

    uint8 i = 0;
    auto const num_cons = _connections[IDX_SYNCH].size();
    T* connection = nullptr;
    //! Block forever until a connection is free
    // 循环遍历,获取一条空闲连接返回
    for (;;)
    {
        connection = _connections[IDX_SYNCH][i++ % num_cons].get();
        //! Must be matched with t->Unlock() or you will get deadlocks
        if (connection->LockIfReady())
            break;
    }

    return connection;
}

template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
    return _connectionInfo->database.c_str();
}

template <class T>
void DatabaseWorkerPool<T>::Execute(char const* sql)
{
    if (!sql)
        return;

    BasicStatementTask* task = new BasicStatementTask(sql);
    Enqueue(task);
}

template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
    PreparedStatementTask* task = new PreparedStatementTask(stmt);
    Enqueue(task);
}

template <class T>
void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
{
    if (!sql)
        return;

    T* connection = GetFreeConnection();
    connection->Execute(sql);
    connection->Unlock();
}

template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
    T* connection = GetFreeConnection();
    connection->Execute(stmt);
    connection->Unlock();

    //! Delete proxy-class. Not needed anymore
    delete stmt;
}

template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql)
{
    if (!trans)
        Execute(sql);
    else
        trans->Append(sql);
}

template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt)
{
    if (!trans)
        Execute(stmt);
    else
        trans->Append(stmt);
}

template class TC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<HotfixDatabaseConnection>;

标签:std,DatabaseWorkerPool,TrinityCore,connection,源码,连接池,template,sql,服务端
From: https://blog.csdn.net/weixin_50448879/article/details/141498548

相关文章

  • 基于Java技术的量化积分管理系统设计与实现(2025年毕业项目-源码+论文+部署讲解等)
    文章目录1.前言2.详细视频演示3.论文参考4.项目运行截图5.技术框架5.1后端采用SpringBoot框架5.2前端框架Vue6.可行性分析7.系统测试7.1系统测试的目的7.2系统功能测试8.数据库表设计9.代码参考10.数据库脚本11.作者推荐项目12.为什么选择我?13.获取源......
  • 高颜值好看的登录页面(附源码)
    1 <!DOCTYPEhtml><html><head><meta charset="utf-8" /><title>登录</title><link rel="stylesheet" type="text/css" href="css/index.css" /></head><body>&l......
  • Go 互斥锁 Mutex 源码分析(二)
    原创文章,欢迎转载,转载请注明出处,谢谢。0.前言在Go互斥锁Mutex源码分析(一)一文中分析了互斥锁的结构和基本的抢占互斥锁的场景。在学习锁的过程中,看的不少文章是基于锁的状态解释的,个人经验来看,从锁的状态出发容易陷入细节,了解锁的状态转换过一段时间就忘,难以做到真正的......
  • 零基础国产GD32单片机编程入门(一)GD32单片机GPIO输出Keil5工程创建含源码
    文章目录一.概要二.GD32单片机GPIO内部结构图三.GD32单片机GPIO输入输出信号流向四.GD32单片机GPIO引脚的复用以及重映射五.从零开始创建一个GD32F103C8T6单片机GPIO输出驱动LED灯例程六.工程源代码下载七.小结一.概要GPIO(generalporposeintputoutput):单片机通......
  • 【排序算法】八大排序(下)(c语言实现)(附源码)
    ......
  • Java计算机毕业设计学生综合成绩测评系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:在当今教育信息化快速发展的时代背景下,学生综合成绩测评系统已成为高等教育管理不可或缺的一部分。随着学生数量的增长、课程体系的复杂化以及教育评......
  • Java计算机毕业设计校园资源共享平台(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着高等教育规模的持续扩大,校园内资源分配不均与闲置浪费现象日益凸显。图书资料、学习用品、实验器材乃至生活物品等,在学生们手中往往难以实现高效......
  • Java计算机毕业设计校园疫情防控系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在全球新冠疫情持续影响的背景下,校园作为知识传播与人才培养的重要场所,其疫情防控工作显得尤为重要。随着复学复课的逐步推进,如何科学、高效地管理校......
  • Java计算机毕业设计学术助手开发(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:随着信息技术的飞速发展,学术研究领域正经历着前所未有的变革。面对海量且日益增长的学术资源,科研工作者在寻找、筛选及整合相关信息时面临着巨大挑战......
  • 实现多客户端与服务端的连接,并实现群聊效果
    1.创建客户端口packagecom.test4;importjava.io.IOException;importjava.net.Socket;importjava.util.ArrayList;publicclassKHD{publicstaticvoidmain(String[]args)throwsIOException{Socketlink=newSocket("127.0.0.1",6666)......