首页 > 数据库 >PyMySQL及数据库连接池

PyMySQL及数据库连接池

时间:2023-06-21 15:23:10浏览次数:44  
标签:cur 数据库 args PyMySQL 连接池 sql self conn result

1 PyMySQL及数据库连接池

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,由于频繁连接数据库很耗时,因此将 PyMySQL 对数据库的一系列操作封装到一个类中,实现连接一次数据库就可以完成多次操作,以提高性能。

2 普通的数据库连接

import pymysql


class SQLHelper(object):
    """
    PyMySQL操作数据库
    """
    def __init__(self):
        """
        在实例化对象时自动连接数据库
        """
        # 也可以根据需要将连接数据库的参数放到配置文件中,然后在初始化时读取
        self.connect()

    def open(self):
        """
        创建数据库连接对象和游标对象
        """
        self.conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password="xxx", database='xxx', charset='utf8')
        self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)  # 游标设置为字典类型

    def get_list(self, sql, args=None):
        """
        获取列表数据
        """
        self.cur.execute(sql, args)
        data_list = self.cur.fetchall()
        return data_list

    def get_one(self, sql, args=None):
        """
        获取单条数据
        """
        self.cur.execute(sql, args)
        data = self.cur.fetchone()
        return data

    def modify(self, sql, args=None):
        """
        更新、删除单条数据
        """
        self.cur.execute(sql, args)
        self.conn.commit()

    def bulk_modify(self, sql, args=None):
        """
        批量增加、更新、删除数据(好处:只连接一次,批量操作,只提交一次)
        """
        self.cur.executemany(sql, args)
        self.conn.commit()

    def create(self, sql, args=None):
        """
        增加单条数据,并返回最新自增ID
        """
        self.cur.execute(sql, args)
        self.conn.commit()
        return self.cur.lastrowid

    def close(self):
        """
        关闭连接
        """
        self.cur.close()
        self.conn.close()import pymysql


class SQLHelper(object):
    """
    PyMySQL操作数据库
    """
    def __init__(self):
        """
        在实例化对象时自动连接数据库
        """
        # 也可以根据需要将连接数据库的参数放到配置文件中,然后在初始化时读取
        self.connect()

    def open(self):
        """
        创建数据库连接对象和游标对象
        """
        self.conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password="xxx", database='xxx', charset='utf8')
        self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)  # 游标设置为字典类型

    def get_list(self, sql, args=None):
        """
        获取列表数据
        """
        self.cur.execute(sql, args)
        data_list = self.cur.fetchall()
        return data_list

    def get_one(self, sql, args=None):
        """
        获取单条数据
        """
        self.cur.execute(sql, args)
        data = self.cur.fetchone()
        return data

    def modify(self, sql, args=None):
        """
        更新、删除单条数据
        """
        self.cur.execute(sql, args)
        self.conn.commit()

    def bulk_modify(self, sql, args=None):
        """
        批量增加、更新、删除数据(好处:只连接一次,批量操作,只提交一次)
        """
        self.cur.executemany(sql, args)
        self.conn.commit()

    def create(self, sql, args=None):
        """
        增加单条数据,并返回最新自增ID
        """
        self.cur.execute(sql, args)
        self.conn.commit()
        return self.cur.lastrowid

    def close(self):
        """
        关闭连接
        """
        self.cur.close()
        self.conn.close()

3 数据库连接池

数据库连接池帮我们维护了若干个与数据库的连接,当我们需要连接数据库时,只需向连接池申请一个连接,当操作完数据库后,再将连接放回到连接池。在实际项目中,如果不用 ORM 要写原生 SQL 操作数据库,建议用数据库连接池,可以处理并发请求的场景。在 Python 中,可以通过 DBUtils 模块实现一个数据库连接池。此连接池有两种连接模式:

  • 模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。
  • 模式二:创建一批连接到连接池,供所有线程共享使用。

4 DBUtils 的安装

在虚拟环境下安装:

pip install pymysql DBUtils

5 基于函数封装

from dbutils.pooled_db import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='xxx',
    database='xxx',
    charset='utf8'
)


def get_list(sql, args=None):
    """
    获取所有数据
    :param sql: SQL语句
    :param args: SQL语句的占位参数
    :return: 查询结果
    """
    conn = POOL.connection()  # 去连接池中获取一个连接
    cur = conn.cursor()
    cur.execute(sql, args)
    result = cur.fetchall()
    cur.close()
    conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭
    return result


def get_one(sql, args=None):
    """
    获取单条数据
    :return: 查询结果
    """
    conn = POOL.connection()
    cur = conn.cursor()
    cur.execute(sql, args)
    result = cur.fetchone()
    cur.close()
    conn.close()
    return result


def modify(sql, args=None):
    """
    修改、增加、删除操作
    :return: 受影响的行数
    """
    conn = POOL.connection()
    cur = conn.cursor()
    result = cur.execute(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return result


def bulk_modify(sql, args=None):
    """
    批量修改、增加、删除操作
    :return: 受影响的行数
    """
    conn = POOL.connection()
    cur = conn.cursor()
    result = cur.executemany(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return result


def create(sql, args=None):
    """
    增加数据
    :return: 新增数据行的ID
    """
    conn = POOL.connection()
    cur = conn.cursor()
    cur.execute(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return cur.lastrowid


if __name__ == '__main__':
    result = get_list("select * from student")
    # result = get_one("select * from class where id=%s and title=%s", ['10', '五班'])
    # result = modify("update class set title=%s where id=%s", ['五班', '10'])
    # result = bulk_modify("update class set title=%s where id=%s", [('五班1', '10'), ('五班2', '11')])
    # result = create("insert into student (name, class_id) values (%s, %s)", ['张三', '10'])
    # result = bulk_modify("insert into student (name, class_id) values (%s, %s)", [('李四', '10'), ('王五', '10')])
    # result = modify("delete from student where id=%s", ['20', ])
    print(result)

6 基于类封装

 

from dbutils.pooled_db import PooledDB
import pymysql


class SQLHelper(object):

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='xxx',
            database='xxx',
            charset='utf8'
        )

    def open(self):
        conn = self.pool.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid


if __name__ == '__main__':
    db = SQLHelper()
    result = db.get_list("select * from student")
    # result = db.get_one("select * from class where id=%s and title=%s", ['10', '五班'])
    # result = db.modify("update class set title=%s where id=%s", ['五班', '10'])
    # result = db.bulk_modify("update class set title=%s where id=%s", [('五班1', '10'), ('五班2', '11')])
    # result = db.create("insert into student (name, class_id) values (%s, %s)", ['张三', '10'])
    # result = db.bulk_modify("insert into student (name, class_id) values (%s, %s)", [('李四', '10'), ('王五', '10')])
    # result = db.modify("delete from student where id=%s", ['20', ])
    print(result)

7 支持上下文管理

使用 threading.local 实现

import threading

from dbutils.pooled_db import PooledDB
import pymysql


class SQLHelper(object):
    """
    使用 threading.local 实现上下文管理,且是单例模式
    """

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='xxx',
            database='xxx',
            charset='utf8'
        )
        self.local = threading.local()  # 维护一个栈
        """
        storage = {
            线程ID: {'stack': [(conn, cusor), ]},
        }
        """

    def open(self):
        conn = self.pool.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid

    def __enter__(self):
        conn, cur = self.open()
        rv = getattr(self.local, 'stack', None)
        if not rv:
            self.local.stack = [(conn, cur), ]
        else:
            self.local.stack.append((conn, cur))
        return conn, cur

    def __exit__(self, exc_type, exc_val, exc_tb):
        rv = getattr(self.local, 'stack', None)
        if not rv:
            return
        conn, cur = self.local.stack.pop()
        cur.close()
        conn.close()


if __name__ == '__main__':
    db = SQLHelper()

    with db as (conn, cur):
        cur.execute("insert into student (name, class_id) values (%s, %s)", ['赵六', '10'])
        conn.commit()

    with db as (conn, cur):
        cur.execute("select * from student")
        # result = cur.fetchmany(3)  # 取前3条
        result = cur.fetchall()
        print(result)

8 普通实现方式

from dbutils.pooled_db import PooledDB
import pymysql

# 全局变量定义连接池,只加载一次
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='xxx',
    database='xxx',
    charset='utf8'
)


class SQLHelper(object):
    """
    支持上下文管理,非单例模式
    """

    def __init__(self):
        self.conn = None
        self.cur = None

    def open(self):
        conn = POOL.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid

    def __enter__(self):
        self.conn, self.cur = self.open()
        return self.conn, self.cur

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cur.close()
        self.conn.close()


if __name__ == '__main__':
    with SQLHelper() as (conn, cur):
        cur.execute("insert into student (name, class_id) values (%s, %s)", ['孙七', '10'])
        conn.commit()

    with SQLHelper() as (conn, cur):
        cur.execute("select * from student")
        # result = cur.fetchmany(3)  # 取前3条
        result = cur.fetchall()
        print(result)

  

 

标签:cur,数据库,args,PyMySQL,连接池,sql,self,conn,result
From: https://www.cnblogs.com/yoyo1216/p/17496292.html

相关文章

  • Lowes EDI 项目数据库方案开源介绍
    近期为了帮助广大用户更好地使用EDI系统,我们根据以往的项目实施经验,将成熟的EDI项目进行开源。用户安装好知行之桥EDI系统之后,只需要下载我们整理好的示例代码,并放置在知行之桥指定的工作区中,即可开始使用。今天的文章主要为大家介绍LOWE'SEDI项目,了解如何获取开源的项目......
  • PG-DBA培训04:PostgreSQL数据类型与数据库设计规范
    一、风哥PG-DBA培训04:PostgreSQL数据类型与数据库设计规范本课程由风哥发布的基于PostgreSQL数据库的系列课程,本课程属于PostgreSQL数据库SQL开发与应用实战阶段之PostgreSQL数据类型与数据库设计规范,学完本课程可以掌握PostgreSQLSQL语句基础讲解,PostgreSQLSQL语言基础知识,安......
  • PG-DBA培训01:PostgreSQL数据库基础入门培训
    一、风哥PG-DBA培训01:PostgreSQL数据库基础入门培训课程本课程由风哥发布的基于PostgreSQL数据库的系列课程,本课程属于PostgreSQL数据库实战入门与安装配置阶段之PostgreSQL数据库基础入门培训课程,学完本课程可以掌握PostgreSQL数据库简介,PostgreSQL行业生态应用,PostgreSQL工作与......
  • PG-DBA培训01:PostgreSQL数据库基础入门培训
    一、风哥PG-DBA培训01:PostgreSQL数据库基础入门培训课程本课程由风哥发布的基于PostgreSQL数据库的系列课程,本课程属于PostgreSQL数据库实战入门与安装配置阶段之PostgreSQL数据库基础入门培训课程,学完本课程可以掌握PostgreSQL数据库简介,PostgreSQL行业生态应用,PostgreSQL工作与就......
  • PG-DBA培训04:PostgreSQL数据类型与数据库设计规范
    一、风哥PG-DBA培训04:PostgreSQL数据类型与数据库设计规范本课程由风哥发布的基于PostgreSQL数据库的系列课程,本课程属于PostgreSQL数据库SQL开发与应用实战阶段之PostgreSQL数据类型与数据库设计规范,学完本课程可以掌握PostgreSQLSQL语句基础讲解,PostgreSQLSQL语言基础知识,安装......
  • laravel数据库模型蛇形命名自动转换驼峰命名
    2023年6月20日15:10:59我看了各种方案,但是多多少少都有各种问题建议使用https://github.com/kirkbushell/eloquence安装composerrequirekirkbushell/eloquence添加到provider添加eloquenceserviceprovider在你的config/app.php文件中'providers'=>[/......
  • 稳,从数据库连接池 testOnBorrow 看架构设计 | 京东云技术团队
    本文从CommonsDBCPtestOnBorrow的作用机制着手,管中窥豹,从一点去分析数据库连接池获取的过程以及架构分层设计。以下内容会按照每层的作用,贯穿分析整个调用流程。1️⃣框架层commons-poolTheindicationofwhetherobjectswillbe validatedbeforebeingborrowed fromthe......
  • Postgresql 数据库导入导出 物理VS逻辑 集合
    PostgreSQL数据的导入导出本身并没有特别高的技术要求,属于日常操作,但熟悉导入导出以及选择数据导入导出的方式还是有点思考空间的。怎么导出数据的方式更稳妥,更适应业务的需求。下面就先总结数据导入导出中的数据导出的一部分方式和命令的实例,其中一些也是我在总结中发现的,例如COP......
  • 数据库的reset master和reset slave
    resetmaster注意:在master上执行mysql>RESETMASTER作用包括:删除binlog索引文件中列出的所有binlog文件清空binlog索引文件创建一个新的binlog文件清空系统变量gtid_purged和gtid_executed在MySQL5.7.5及后续版本中,RESETMASTER还会会清空 mysql.gtid_executed......
  • Mysql数据库5.6版本安装
    5.6的软件包创建管理组mysql,创建用户解压·mysql的5.6版本移动到指定位置修改目录所有者优化mysql命令检查mysql版本修改主配置文件初始化mysql生成mysql服务控制文件添加到系统文件设置开机自启动启动mysqld并查看状态设置登录数据库密码登录数据库创建数据库查看数据库创建数据库......