简介
- 魔兽世界服务器中数据库使用mysql来存储,并且数据库模块是直接嵌入在serve中,并没有单独的DB server
- 在魔兽连接池中有两种连接池,一种是同步连接池,还有异步连接池
- 连接池相关源码目录 TrinityCore-master\src\server\database\Database
- 连接池具体文件:DatabaseWorkerPool.h DatabaseWorkerPool.cpp
在连接池源码中涉及到了一些其他类,读者如有不懂,我已经列出清单,可以自行查看相关源码实现
核心源码讲解
DatabaseWorkerPool
是一个模板类,提供了四个类,分别代表魔兽世界四个数据库的连接池
/// Accessor to the world database
TC_DATABASE_API extern DatabaseWorkerPool<WorldDatabaseConnection> WorldDatabase;
/// Accessor to the character database
TC_DATABASE_API extern DatabaseWorkerPool<CharacterDatabaseConnection> CharacterDatabase;
/// Accessor to the realm/login database
TC_DATABASE_API extern DatabaseWorkerPool<LoginDatabaseConnection> LoginDatabase;
/// Accessor to the hotfix database
TC_DATABASE_API extern DatabaseWorkerPool<HotfixDatabaseConnection> HotfixDatabase;
数据库类型
IDX_ASYNC
: 异步连接池类型IDX_SYNCH
: 同步连接池类型
IDX_SIZE主要用于元素数量使用
enum InternalIndex
{
// 异步 0
IDX_ASYNC,
// 同步 1
IDX_SYNCH,
// 2 利用enum元素下标从零开始的性质,可以当作枚举长度使用
IDX_SIZE
};
核心成员
-
_queue
: 线程池任务队列 -
_connections
:连接池(同步连接池和异步连接池用这一个),异步连接池下标为0,同步连接池下标为1 -
_connectionInfo
: 存储数据库信息-
struct TC_DATABASE_API MySQLConnectionInfo { explicit MySQLConnectionInfo(std::string const& infoString); // 用户名 std::string user; // 密码 std::string password; // 数据库名 std::string database; // 主机地址 std::string host; // 端口 std::string port_or_socket; std::string ssl; };
-
-
_preparedStatementSize
:预加载sql语句数量,与业务有关,实现存储的sql语句 -
_async_threads
:异步线程池的线程数量 -
_synch_threads
:同步线程池的线程数量
// SQL任务队列,生产者消费者模型
std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
// 连接池(同步连接池和异步连接池用这一个)
std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
// 存储数据库信息
std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
// 预加载sql语句数量
std::vector<uint8> _preparedStatementSize;
// 异步线程数量,同步线程数量
uint8 _async_threads, _synch_threads;
核心源码讲解
初始化构造函数
创建连接池对象以及清零线程池数量
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
: _queue(new ProducerConsumerQueue<SQLOperation*>()),
_async_threads(0), _synch_threads(0)
{
}
析构函数释放资源
将连接池中所有mysql连接对象释放
template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
_queue->Cancel();
}
SetConnectionInfo 设置数据库信息以及连接池数量
template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
uint8 const asyncThreads, uint8 const synchThreads)
{
_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
_async_threads = asyncThreads;
_synch_threads = synchThreads;
}
OpenConnections创建一条指定的连接
// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
for (uint8 i = 0; i < numConnections; ++i)
{
// 创建一条MySQLConnection连接了类
auto connection = [&] {
switch (type)
{
case IDX_ASYNC:
return std::make_unique<T>(_queue.get(), *_connectionInfo);
case IDX_SYNCH:
return std::make_unique<T>(*_connectionInfo);
default:
ABORT();
}
}();
// 与mysql进行实际的连接
if (uint32 error = connection->Open())
{
// Failed to open a connection or invalid version, abort and cleanup
_connections[type].clear();
return error;
}
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADB
else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#else
else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif
{
#ifndef LIBMARIADB
TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#else
TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endif
return 1;
}
else
{
// 将生成的mysql连接加入对应的mysql连接池
_connections[type].push_back(std::move(connection));
}
}
// Everything is fine
return 0;
}
Open 初始化连接池
初始化连接池,生成指定数量的同步和异步mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
WPFatal(_connectionInfo.get(), "Connection info was not set!");
TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
"Asynchronous connections: {}, synchronous connections: {}.",
GetDatabaseName(), _async_threads, _synch_threads);
// 在异步连接池中添加_async_threads数量的异步连接
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
return error;
// 在同步连接池中添加_sync_threads数量的同步连接
error = OpenConnections(IDX_SYNCH, _synch_threads);
if (!error)
{
TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
"{} total connections running.", GetDatabaseName(),
(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
}
return error;
}
close 关闭连接
关闭所有的同步连接以及异步连接
template <class T>
void DatabaseWorkerPool<T>::Close()
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
//! Closes the actualy MySQL connection.
// 关闭所有的异步连接
_connections[IDX_ASYNC].clear();
TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
// 关闭所有的同步连接
_connections[IDX_SYNCH].clear();
TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}
GetFreeConnection获取一条空闲连接
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
if (_warnSyncQueries)
{
std::ostringstream ss;
ss << boost::stacktrace::stacktrace();
TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
}
#endif
uint8 i = 0;
auto const num_cons = _connections[IDX_SYNCH].size();
T* connection = nullptr;
//! Block forever until a connection is free
// 循环遍历,获取一条空闲连接返回
for (;;)
{
connection = _connections[IDX_SYNCH][i++ % num_cons].get();
//! Must be matched with t->Unlock() or you will get deadlocks
if (connection->LockIfReady())
break;
}
return connection;
}
完整源码
DatabaseWorkerPool.h
#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "StringFormat.h"
#include <array>
#include <string>
#include <vector>
template <typename T>
class ProducerConsumerQueue;
class SQLOperation;
struct MySQLConnectionInfo;
template <class T>
class DatabaseWorkerPool
{
private:
enum InternalIndex
{
// 异步 0
IDX_ASYNC,
// 同步 1
IDX_SYNCH,
// 利用enum元素下标从零开始的性质,可以当作枚举长度使用
IDX_SIZE
};
public:
/* Activity state */
DatabaseWorkerPool();
~DatabaseWorkerPool();
// 设置数据库信息以及异步/同步连接池线程数量大小
void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);
// 初始化连接池,生成指定数量的同步和异步mysql连接加入连接池
uint32 Open();
// 将同步连接池与异步连接池的mysql连接进行销毁
void Close();
//! Prepares all prepared statements
bool PrepareStatements();
inline MySQLConnectionInfo const* GetConnectionInfo() const
{
return _connectionInfo.get();
}
/**
延迟单向语句方法。
*/
// 以字符串格式对将异步执行的单向SQL操作进行排队。
// 这个方法应该只用于只执行一次的查询,例如在启动期间。
// 将sql语句加入任务队列
void Execute(char const* sql);
// !以字符串格式(带有可变参数)对将异步执行的单向SQL操作进行排队。
// !这个方法应该只用于只执行一次的查询,例如在启动期间。
template<typename... Args>
void PExecute(Trinity::FormatString<Args...> sql, Args&&... args)
{
if (Trinity::IsFormatEmptyOrNull(sql))
return;
Execute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
}
// !以准备好的语句格式对将异步执行的单向SQL操作进行排队。
// !语句必须使用CONNECTION_ASYNC标志来准备。
void Execute(PreparedStatement<T>* stmt);
/**
直接同步单向语句方法。
*/
// !直接以字符串格式执行单向SQL操作,这将阻塞调用线程,直到完成。
// !这个方法应该只用于只执行一次的查询,例如在启动期间。
void DirectExecute(char const* sql);
// !直接以字符串格式(带有可变参数)执行单向SQL操作,这将阻塞调用线程,直到完成。
// !这个方法应该只用于只执行一次的查询,例如在启动期间。
template<typename... Args>
void DirectPExecute(Trinity::FormatString<Args...> sql, Args&&... args)
{
if (Trinity::IsFormatEmptyOrNull(sql))
return;
DirectExecute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
}
// !直接以准备好的语句格式执行单向SQL操作,这将阻塞调用线程,直到完成。
// !语句必须使用connection_sync标志来准备。
void DirectExecute(PreparedStatement<T>* stmt);
/**
同步查询(带结果集)方法。
*/
// !直接以字符串格式执行SQL查询,该查询将阻塞调用线程,直到完成。
// 返回引用计数的自动指针,不需要在上层代码中手动内存管理。
QueryResult Query(char const* sql, T* connection = nullptr);
// !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。
// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
template<typename... Args>
QueryResult PQuery(Trinity::FormatString<Args...> sql, T* conn, Args&&... args)
{
if (Trinity::IsFormatEmptyOrNull(sql))
return QueryResult(nullptr);
return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str(), conn);
}
// !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。
// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
template<typename... Args>
QueryResult PQuery(Trinity::FormatString<Args...> sql, Args&&... args)
{
if (Trinity::IsFormatEmptyOrNull(sql))
return QueryResult(nullptr);
return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());
}
// !直接以准备好的格式执行SQL查询,该查询将阻塞调用线程,直到完成。
// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。
// !语句必须带有CONNECTION_SYNCH标志。
PreparedQueryResult Query(PreparedStatement<T>* stmt);
/**
异步查询(带结果集)方法。
*/
// !以字符串格式对查询进行排队,该查询将在查询执行后立即设置QueryResultFuture返回对象的值。
// !然后在ProcessQueryCallback方法中处理返回值。
QueryCallback AsyncQuery(char const* sql);
// !以准备好的格式对查询进行队列,该格式将在查询执行后立即设置PreparedQueryResultFuture返回对象的值。
// !然后在ProcessQueryCallback方法中处理返回值。
// !语句必须使用CONNECTION_ASYNC标志来准备。
QueryCallback AsyncQuery(PreparedStatement<T>* stmt);
// !将设置QueryResultHolderFuture值的SQL操作向量(可以是临时的,也可以是准备好的)排队
// !查询执行后立即返回对象。
// !然后在ProcessQueryCallback方法中处理返回值。
// !添加到这个holder中的任何预处理语句都需要使用CONNECTION_ASYNC标志进行预处理。
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder);
/**
事务上下文方法。
*/
// !开始一个自动托管事务指针,如果未提交,该指针将自动回滚。(自动提交= 0)
SQLTransaction<T> BeginTransaction();
// !对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序
// !在执行过程中,附加到事务中的内容将受到尊重。
void CommitTransaction(SQLTransaction<T> transaction);
//! 对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序
// !在执行过程中,附加到事务中的内容将受到尊重。
TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);
// !直接执行一组单向SQL操作(可以是临时的,也可以是准备好的)。这些操作的顺序
// !在执行过程中,附加到事务中的内容将受到尊重。
void DirectCommitTransaction(SQLTransaction<T>& transaction);
// !方法,用于在不同上下文中执行特别语句。
// !如果存在有效对象,将封装在事务中,否则单独执行。
void ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql);
// !方法,用于在不同上下文中执行准备好的语句。
// !如果存在有效对象,将封装在事务中,否则单独执行。
void ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt);
/**
Other
*/
typedef typename T::Statements PreparedStatementIndex;
// !自动管理(内部)指针,指向一个准备好的语句对象,供上层代码使用。
// !在this->DirectExecute(PreparedStatement*)、this->Query(PreparedStatement*)或PreparedStatementTask::~PreparedStatementTask中删除指针。
// !在执行之前,这个对象不会绑定到MySQL上下文中准备好的语句。
PreparedStatement<T>* GetPreparedStatement(PreparedStatementIndex index);
// !为当前排序应用转义字符串。(use utf8)
// 字符串转义
void EscapeString(std::string& str);
//!保持我们所有的MySQL连接存活,防止服务器断开我们的连接。
void KeepAlive();
void WarnAboutSyncQueries([[maybe_unused]] bool warn)
{
#ifdef TRINITY_DEBUG
_warnSyncQueries = warn;
#endif
}
size_t QueueSize() const;
private:
// 通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
uint32 OpenConnections(InternalIndex type, uint8 numConnections);
unsigned long EscapeString(char* to, char const* from, unsigned long length);
// 将任务加入任务队列
void Enqueue(SQLOperation* op);
// 获取同步连接池中的空闲连接。
// 调用者必须在触及MySQL上下文后调用t->Unlock()以防止死锁。
T* GetFreeConnection();
// 获取数据库名
char const* GetDatabaseName() const;
// SQL任务队列,生产者消费者模型
std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
// 连接池(同步连接池和异步连接池用这一个)
std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
// 存储数据库信息
std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
// 预加载sql语句数量
std::vector<uint8> _preparedStatementSize;
// 异步线程数量,同步线程数量
uint8 _async_threads, _synch_threads;
#ifdef TRINITY_DEBUG
static inline thread_local bool _warnSyncQueries = false;
#endif
};
#endif
DatabaseWorkerPool.cpp
#include "DatabaseWorkerPool.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/HotfixDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "SQLOperation.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
#include <mysqld_error.h>
#ifdef TRINITY_DEBUG
#include <sstream>
#include <boost/stacktrace.hpp>
#endif
#define MIN_MYSQL_SERVER_VERSION 50700u
#define MIN_MYSQL_SERVER_VERSION_STRING "5.7"
#define MIN_MYSQL_CLIENT_VERSION 50700u
#define MIN_MYSQL_CLIENT_VERSION_STRING "5.7"
#define MIN_MARIADB_SERVER_VERSION 100209u
#define MIN_MARIADB_SERVER_VERSION_STRING "10.2.9"
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"
class PingOperation : public SQLOperation
{
//! Operation for idle delaythreads
// 测试mysql连接是否可以ping通
bool Execute() override
{
m_conn->Ping();
return true;
}
};
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
: _queue(new ProducerConsumerQueue<SQLOperation*>()),
_async_threads(0), _synch_threads(0)
{
WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
WPFatal(mysql_get_client_version() >= MIN_MARIADB_CLIENT_VERSION, "TrinityCore does not support MariaDB versions below " MIN_MARIADB_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MariaDB client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MARIADB_CLIENT_VERSION);
WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID, "Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
#else
WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below " MIN_MYSQL_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MySQL client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MYSQL_CLIENT_VERSION);
WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
#endif
}
template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
_queue->Cancel();
}
template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
uint8 const asyncThreads, uint8 const synchThreads)
{
_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
_async_threads = asyncThreads;
_synch_threads = synchThreads;
}
template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
WPFatal(_connectionInfo.get(), "Connection info was not set!");
TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
"Asynchronous connections: {}, synchronous connections: {}.",
GetDatabaseName(), _async_threads, _synch_threads);
// 在异步连接池中添加_async_threads数量的异步连接
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
return error;
// 在同步连接池中添加_sync_threads数量的同步连接
error = OpenConnections(IDX_SYNCH, _synch_threads);
if (!error)
{
TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
"{} total connections running.", GetDatabaseName(),
(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
}
return error;
}
template <class T>
void DatabaseWorkerPool<T>::Close()
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
//! Closes the actualy MySQL connection.
// 关闭所有的异步连接
_connections[IDX_ASYNC].clear();
TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
// 关闭所有的同步连接
_connections[IDX_SYNCH].clear();
TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}
template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{
// connections == std::vector<std::unique_ptr<T>>
// 先处理异步连接池,在处理同步连接池
for (auto& connections : _connections)
{
// connection == std::unique_ptr<T> == 一条mysql连接抽象类
for (auto& connection : connections)
{
// 对mysql连接抽象类上锁,是否允许其他线程访问此连接
connection->LockIfReady();
// 执行mysql连接具体派生类(具体数据库)的预加载sql语句(业务相关)
// 具体可查看数据库派生类的DoPrepareStatements函数
if (!connection->PrepareStatements())
{
connection->Unlock();
Close();
return false;
}
else
connection->Unlock();
// 获取预加载sql的数量
size_t const preparedSize = connection->m_stmts.size();
if (_preparedStatementSize.size() < preparedSize)
_preparedStatementSize.resize(preparedSize);
// 初始化预加载sql
for (size_t i = 0; i < preparedSize; ++i)
{
// already set by another connection
// (each connection only has prepared statements of it's own type sync/async)
if (_preparedStatementSize[i] > 0)
continue;
if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
{
uint32 const paramCount = stmt->GetParameterCount();
// TC only supports uint8 indices.
ASSERT(paramCount < std::numeric_limits<uint8>::max());
_preparedStatementSize[i] = static_cast<uint8>(paramCount);
}
}
}
}
return true;
}
template <class T>
QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
{
if (!connection)
connection = GetFreeConnection();
ResultSet* result = connection->Query(sql);
connection->Unlock();
if (!result || !result->GetRowCount() || !result->NextRow())
{
delete result;
return QueryResult(nullptr);
}
return QueryResult(result);
}
template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
// 获取一条空闲连接
auto connection = GetFreeConnection();
// 执行stmt指定的预加载sql
PreparedResultSet* ret = connection->Query(stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
if (!ret || !ret->GetRowCount())
{
delete ret;
return PreparedQueryResult(nullptr);
}
// 将结果返回
return PreparedQueryResult(ret);
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
// BasicStatementTask:内部包装了std::future以及存储sql语句
BasicStatementTask* task = new BasicStatementTask(sql, true);
// 在进入队列之前存储未来的结果-任务可能在从此方法返回之前已经被处理和删除
QueryResultFuture result = task->GetFuture();
// 将任务加入生产者消费者模型队列中
Enqueue(task);
// 异步返回结果
return QueryCallback(std::move(result));
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
PreparedQueryResultFuture result = task->GetFuture();
Enqueue(task);
return QueryCallback(std::move(result));
}
template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
QueryResultHolderFuture result = task->GetFuture();
Enqueue(task);
return { std::move(holder), std::move(result) };
}
template <class T>
SQLTransaction<T> DatabaseWorkerPool<T>::BeginTransaction()
{
return std::make_shared<Transaction<T>>();
}
template <class T>
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
// !只在调试模式下分析事务弱点。
// !理想情况下,我们在调试模式下捕获错误,然后纠正它们,
// !所以没有必要在Release模式下浪费这些CPU周期。
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
Enqueue(new TransactionTask(transaction));
}
// 异步提交事务
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
break;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
TransactionFuture result = task->GetFuture();
Enqueue(task);
return TransactionCallback(std::move(result));
}
// 同步提交事务
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{
T* connection = GetFreeConnection();
int errorCode = connection->ExecuteTransaction(transaction);
if (!errorCode)
{
connection->Unlock(); // OK, operation succesful
return;
}
//! Handle MySQL Errno 1213 without extending deadlock to the core itself
/// @todo More elegant way
if (errorCode == ER_LOCK_DEADLOCK)
{
//todo: handle multiple sync threads deadlocking in a similar way as async threads
uint8 loopBreaker = 5;
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (!connection->ExecuteTransaction(transaction))
break;
}
}
//! Clean up now.
transaction->Cleanup();
connection->Unlock();
}
template <class T>
PreparedStatement<T>* DatabaseWorkerPool<T>::GetPreparedStatement(PreparedStatementIndex index)
{
return new PreparedStatement<T>(index, _preparedStatementSize[index]);
}
// 字符串转义
template <class T>
void DatabaseWorkerPool<T>::EscapeString(std::string& str)
{
if (str.empty())
return;
char* buf = new char[str.size() * 2 + 1];
EscapeString(buf, str.c_str(), uint32(str.size()));
str = buf;
delete[] buf;
}
template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{
//! Ping同步连接
for (auto& connection : _connections[IDX_SYNCH])
{
if (connection->LockIfReady())
{
connection->Ping();
connection->Unlock();
}
}
// !假设所有工作线程都是空闲的,每个工作线程将接收1个ping操作请求
// !如果一个或多个工作线程很忙,ping操作将不会平均分配,但这无关紧要
// !因为唯一的目的是防止连接空转。
auto const count = _connections[IDX_ASYNC].size();
for (uint8 i = 0; i < count; ++i)
Enqueue(new PingOperation);
}
// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
for (uint8 i = 0; i < numConnections; ++i)
{
// 创建一条MySQLConnection连接了类
auto connection = [&] {
switch (type)
{
case IDX_ASYNC:
return std::make_unique<T>(_queue.get(), *_connectionInfo);
case IDX_SYNCH:
return std::make_unique<T>(*_connectionInfo);
default:
ABORT();
}
}();
// 与mysql进行实际的连接
if (uint32 error = connection->Open())
{
// Failed to open a connection or invalid version, abort and cleanup
_connections[type].clear();
return error;
}
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADB
else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#else
else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif
{
#ifndef LIBMARIADB
TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#else
TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endif
return 1;
}
else
{
// 将生成的mysql连接加入对应的mysql连接池
_connections[type].push_back(std::move(connection));
}
}
// Everything is fine
return 0;
}
template <class T>
unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
{
if (!to || !from || !length)
return 0;
return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}
template <class T>
void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
{
_queue->Push(op);
}
template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{
return _queue->Size();
}
// 获取一条空闲连接
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
if (_warnSyncQueries)
{
std::ostringstream ss;
ss << boost::stacktrace::stacktrace();
TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
}
#endif
uint8 i = 0;
auto const num_cons = _connections[IDX_SYNCH].size();
T* connection = nullptr;
//! Block forever until a connection is free
// 循环遍历,获取一条空闲连接返回
for (;;)
{
connection = _connections[IDX_SYNCH][i++ % num_cons].get();
//! Must be matched with t->Unlock() or you will get deadlocks
if (connection->LockIfReady())
break;
}
return connection;
}
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
return _connectionInfo->database.c_str();
}
template <class T>
void DatabaseWorkerPool<T>::Execute(char const* sql)
{
if (!sql)
return;
BasicStatementTask* task = new BasicStatementTask(sql);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
Enqueue(task);
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
{
if (!sql)
return;
T* connection = GetFreeConnection();
connection->Execute(sql);
connection->Unlock();
}
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
T* connection = GetFreeConnection();
connection->Execute(stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql)
{
if (!trans)
Execute(sql);
else
trans->Append(sql);
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt)
{
if (!trans)
Execute(stmt);
else
trans->Append(stmt);
}
template class TC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<HotfixDatabaseConnection>;
标签:std,DatabaseWorkerPool,TrinityCore,connection,源码,连接池,template,sql,服务端
From: https://blog.csdn.net/weixin_50448879/article/details/141498548