首页 > 数据库 >Python 使用MongoDB & MongoDB 工具的封装

Python 使用MongoDB & MongoDB 工具的封装

时间:2022-12-04 22:11:53浏览次数:38  
标签:封装 name Python MongoDB self collection connect filters param

Python 使用MongoDB

补充:操作之前首先在虚拟机或者服务器端启动 MongoDB ;

# 重新加载配置,并启动mongodb
sudo systemctl daemon-reload
sudo systemctl start mongod

# 查看运行状态
sudo systemctl status mongod
# 如果mongodb状态为stop,则运行 sudo systemctl enable mongod

# 停止mongodb
sudo systemctl stop mongod

# 重启mongodb
sudo systemctl restart mongod

1.介绍与安装

在 Python 当中,一般常用于开发中操作 MongoDB 的模块无非三个:pymongo,mongoengine,moter

moter 是 python 中基于 pymongo 实现的异步操作,类似于aiomysql基于pymysql实现异步操作.

  • 安装

    pip install pymongo
    

2.数据库连接

  • 无密码连接

    import pymongo
    
    mongo = pymongo.MongoClient("mongodb://192.168.127.129:27017")
    
  • 有密码连接

    方式一

    import pymongo
    from urllib.parse import quote_plus
    
    username = quote_plus("mofang")
    password = quote_plus("1234567")
    # 获取连接对象:字符串 mongodb 引擎 + // 用户名:密码@地址:端口/库名
    mongo = pymongo.MongoClient(f"mongodb://{username}:{password}@192.168.127.129:27017/mofang")
    # print(mongo)
    # 获取数据库操作对象
    db = mongo['mofang']
    print(db)
    # 获取集合操作对象
    collection = db['user_list']
    print(collection)
    

    方式二

    # 方式二
    import pymongo
    mongo = pymongo.MongoClient("mongodb://127.0.0.1:27017")
    db = mongo['mofang']
    username = "mofang"
    password = "1234567"
    v = db.authenticate(username,password)
    print(db)
    print(v)
    # collection = db['user_list']
    # print(collection)
    

    补充:方式二可能对于较新的版本会失效,可以回退版本解决出现的问题.

    import pymongo
    conn = pymongo.MongoClient(host="192.168.127.129",port=27017)
    mongo = conn['mofang']
    mongo.authenticate("mofang","1234567")
    # 上述的字符串也可以使用参数进行替代,但是这种认证方式不支持pymongo4.0 以后的版本
    

3.pymongo的封装

pymongo 的操作很多会受到版本的困扰,使用的时候请确定版本信息进行一定的修改;

# -*- coding: utf-8 -*-
"""封装 MongoDB 的数据库的操作信息;
注: 本工具依赖包是 pymongo == 3.12.3; 更高版本的包可能会导致方法的出错
"""
import pymongo

from config.settings import MONGODB


class MongoHelper(object):
    """封装 MongoDB 的基本操作
    """

    def __init__(self):
        """实例化 MongoDB 的连接池对象,使用对象进行 MongoDB 的操作;
        """
        if MONGODB.get("password") is None:
            url = "mongodb://%(host)s:%(port)s" % {
                "host": MONGODB['host'],
                "port": MONGODB['port'],
            }
        else:
            # 扩展有密码时连接的配置信息;
            url = "mongodb://%(user)s:%(password)s@%(host)s:%(port)s" % {
                "user": MONGODB["user"],
                "password": MONGODB["password"],
                "host": MONGODB["host"],
                "port": MONGODB["port"]
            }
        # 将线程安全的连接池封装到对象中;
        self.connect_client = pymongo.MongoClient(url)

    def close_connect(self):
        """
        关闭 mongodb 的连接;
        :return: None
        """
        self.connect_client.close()

    def get_connections(self) -> list:
        """
        获取数据库中的集合信息;
        :return: list; 集合名称的列表信息;

        example:
            v = mongo_helper.get_connections()
            print(v)
        """
        result = self.connect_client[MONGODB['database']].list_collection_names()
        self.close_connect()
        return result

    def insert_one(self, collection_name: str, value: dict):
        """
        向集合中插入一条(文档)数据;
        :param collection_name: str; 集合的名称;
        :param value: 被插入的数据信息;
        :return: 返回插入返回的 id 信息

        example:
            v = mongo_helper.insert_one('test', {"name": "hello", "price": 33})
            print(v)
        """
        conn = self.connect_client[MONGODB['database']]
        col_insert = conn[collection_name].insert_one(value)
        col_id = col_insert.inserted_id
        self.close_connect()
        return col_id

    def insert_many(self, collection_name: str, value: list):
        """
        插入多条数据信息;
        :param collection_name: 集合的名称;
        :param value: 列表嵌套字典的信息;
        :return: 返回插入的 ids 对象集合的列表信息;

        example:
            data = [
                {"name": "前端", 'price': 66},
                {"name": "前端", 'price': 66},
                {"name": "前端", 'price': 66},
                {"name": "前端", 'price': 66},
                {"name": "前端", 'price': 66},
                {"name": "前端", 'price': 66}
            ]
            v = mongo_helper.insert_many('test', data)
            print(v)
        """
        conn = self.connect_client[MONGODB['database']]
        col_insert = conn[collection_name].insert_many(value)
        col_ids = col_insert.inserted_ids
        self.close_connect()
        return col_ids

    def fetch_one(self, collection_name: str, filters: dict = None) -> dict:
        """
        查询一条符合条件的数据信息
        :param collection_name: 集合的名称;
        :param filters: dict; 过滤条件;
        :return: dict; 筛选结果,字典信息;

        example:
            filters = {"name": "python入门"}
            v = mongo_helper.fetch_one("test", filters)
            print(v)

        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.find_one(filters)
        self.close_connect()
        return result

    def fetch_all(self, collection_name: str, filters: dict = None) -> list:
        """
        查询符合条件的所有数据信息,将游标的信息进行循环获取到列表信息;
        :param collection_name: 集合的名称;
        :param filters: dict; 过滤的条件信息;
        :return: list; 符合条件的数据列表

        example:
            filters = {"name": "java入门"}
            v = mongo_helper.fetch_all("test", filters)
            print(v, type(v))

        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.find(filters)  # 此时返回的是游标对象;
        result_list = [i for i in result]
        self.close_connect()
        return result_list

    def fetch_page_info(self, collection_name: str, filters: dict = None, page_size: int = 10,
                        page_no: int = 1) -> dict:
        """
        分页查询的使用;
        :param collection_name: 集合的名称信息;
        :param filters: 查询条件信息;
        :param page_size: 每页上的数量信息;
        :param page_no: 页码信息;
        :return: dict; 返回分页查询的信息数据;

        example:
            filters = {"name": "java入门"}
            v = mongo_helper.fetch_page_info("test", filters, 5, 5)
            print(v)
        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        skip = page_size * (page_no - 1)
        result = conn.find(filters).limit(page_size).skip(skip)
        result_dict = {"page_size": page_size, "page_no": page_no, "data": [i for i in result]}
        self.close_connect()
        return result_dict

    def fetch_count_info(self, collection_name: str, filters: dict = None) -> int:
        """
        查询统计集合中的文档的数量信息;
        :param collection_name: str; 集合的名称;
        :param filters: dict; 按条件统计,为空的时候查询全部的信息;
        :return: int; 集合中的文档的数量信息;

        example:
            v = mongo_helper.fetch_count_info("test")
            print(v, type(v))
        """
        if filters is None:
            filters = {}
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.count_documents(filters)
        self.close_connect()
        return result

    def update_one(self, collection_name: str, filters: dict, data: dict) -> int:
        """
        更新一条文档的信息;
        :param collection_name: 集合的名称;
        :param filters: dict; 筛选条件;
        :param data: 修改的信息;
        :return: int; 返回被修改的文档数;

        example:
            filters = {"name": "java入门"}
            v = mongo_helper.update_many("test", filters, {"$set": {"name": "我爱学习"}})
            print(v, type(v))
        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.update_one(filter=filters, update=data)
        self.close_connect()
        return result.modified_count

    def update_many(self, collection_name: str, filters: dict, data: dict) -> int:
        """
        批量修改数据;
        :param collection_name: 集合的名称;
        :param filters: 筛选条件;
        :param data: 修改信息;
        :return: int; 修改的数量;

        example:
            filters = {"name": "我爱学习"}
            v = mongo_helper.update_many("test", filters, {"$set": {"name": "批量修改回来"}})
            print(v, type(v))
        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.update_many(filter=filters, update=data)
        self.close_connect()
        return result.modified_count

    def delete_one(self, collection_name: str, filters: dict) -> int:
        """
        删除单条的数据信息;
        :param collection_name:
        :param filters:
        :return: int; 删除数据的条数;

        example:
            filters = {"name": "批量修改回来"}
            v = mongo_helper.delete_one("test", filters)
            print(v, type(v))
        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.delete_one(filter=filters)
        self.close_connect()
        return result.deleted_count

    def delete_many(self, collection_name: str, filters: dict) -> int:
        """
        删除多条的数据信息;
        :param collection_name: 集合的名称;
        :param filters: dict; 过滤条件;
        :return: int; 返回删除的条数;

        example:
            filters = {"name": "批量修改回来"}
            v = mongo_helper.delete_many("test", filters)
            print(v, type(v))

        """
        conn = self.connect_client[MONGODB['database']][collection_name]
        result = conn.delete_many(filter=filters)
        self.close_connect()
        return result.deleted_count

    def drop_collection(self, collection_name: str):
        """
        删除集合(删除表);
        :param collection_name: 集合的名称;
        :return: None

        example:
            mongo_helper.drop_collection("test_data")
        """
        self.connect_client[MONGODB['database']][collection_name].drop()
        self.close_connect()


# 使用的时候为保持连接池的数量,导入时直接导入该对象进行单例模式形式的使用
mongo_helper = MongoHelper()

if __name__ == '__main__':
    """简单的脚本测试
    """
    v = mongo_helper.fetch_count_info("test")
    print(v)

继续努力,终成大器;

标签:封装,name,Python,MongoDB,self,collection,connect,filters,param
From: https://www.cnblogs.com/Blogwj123/p/16950959.html

相关文章

  • python自动化办公初探之桌牌制作
    前言:开会用的桌牌,制作起来非常麻烦,要根据参会人员的不同,制作不同的桌牌。如果参会人员非常多,制作就变的更麻烦。通过python中的xlrd和docxtpl模块可以自动的快速生成桌牌,省......
  • Centos7.x安装Python3(优化方法)
    安装相应的编译工具建议在root下操作,会方便很多,一定要安装,否则编译安装会报错。yum-ygroupinstall"Developmenttools"yum-yinstallzlib-develbzip2-developens......
  • Centos7.x将Python2升级到Python3
    查看Python版本python-V更新yum源yumupdate安装依赖yuminstallyum-utilsyum-builddeppython3下载pythonwgethttps://www.python.org/ftp/python/3.8.5/Py......
  • 离线安装python库
    B站看到了水哥的自动化办公视频(5分钟,教你做个自动化软件拿来办公)但因为用的是内网,所以没法直接pipinstall所以这里我们离线安装安装的就是和视频中相关的几个库相关环......
  • python打包指南
    在项目的根目录里创建setup.py#-*-coding:utf-8-*-#author:navysummer#email:navysummer@yeah.netimportshutilimportsetuptoolsfromsetuptools.command.......
  • python字符串常用方法介绍,基于python3.10
     python字符串常用方法-目录:1、strip()、lstrip()、rstrip()2、removeprefix()、removesuffix()3、replace()4、split()、rsplit()5、join()6、upper()、lower()、capita......
  • Python——pygam库实现弹跳小球
    代码实现:importsys#导入sys模块importpygame#导入pygame模块pygame.init()#初始化pygamesize=width,height=700,500#设置窗口screen......
  • 优雅简单玩转python3异步并发
    在python3之后,随着async/await引入,异步调用以全新而便捷的方式让人眼前一亮。首先,尽量用async/await定义协程这里以使用aiohttp请求网络,async函数中,不要使用blockingio......
  • 进入python的世界_day44_前端——CSS的学习(边框、定位、浮动、溢出解决、堆叠等)
    一、CSS之调整边框1.边框#左border-left-width:30px;border-left-style:solid;border-left-color:coral;#上border-top-color:coral;border-top-width:30......
  • 零基础学python 第四章 序列的应用
    实例1 输出每日一贴importdatetimemot=["今天星期一:\n坚持下去不是因为我很坚强,而是因为我别无选择。","今天星期二:\n含泪播种的人一定能笑着收获。",......