首页 > 数据库 >用Python脚本迁移MongoDB数据到金仓-kingbase数据库

用Python脚本迁移MongoDB数据到金仓-kingbase数据库

时间:2024-05-31 20:46:03浏览次数:42  
标签:金仓 mongo Python MongoDB image offset import data id

1、首先需要明确MongoDB与kingbase的对应关系,collection相当于table,filed相当于字段,根据这个对应关系创建表;

此次迁移的MongoDB里的数据字段是:_id(自动生成的objectid),image(转成二进制存储的文档)

所以在金仓里创建表 create table admin(id varchar,image bytea);

2、安装Python环境,由于是内网环境,没有yum源,需要从能连接互联网的环境下载好相应的安装包

Python:3.9.0版本

用到以下这些包

import pymongo
import ksycopg2
import concurrent.futures
from ksycopg2 import pool
import logging
from urllib.parse import quote_plus

------------------------------------------------------------------------------------

pip download pymongo -d pymongo_packages --下载pymongo库

pip3 install --no-index --find-links=. pymongo --安装pymongo库

金仓的Python驱动可以到金仓官网下载,需要找和Python对应的版本

以下是Python脚本内容:

import pymongo
import psycopg2
import concurrent.futures
from psycopg2 import pool
import logging
from urllib.parse import quote_plus
import os

# 初始化日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# MongoDB设置
username='admin'
password='SCJGscjg@123'
host='10.253.228.41'
port='27017'
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
uri = f"mongodb://{encoded_username}:{encoded_password}@{host}:{port}/"
mongo_client = pymongo.MongoClient(uri)
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['admin']

# 连接池设置
kb_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host="10.253.228.110",
    database="mongo",
    user="system",
    password="1",
    port="54322"
)

# 偏移量存储文件
OFFSET_FILE = 'offset.txt'

def read_offset():
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            return int(f.read().strip())
    return 0

def write_offset(offset):
    with open(OFFSET_FILE, 'w') as f:
        f.write(str(offset))

def batch_insert(mongo_data):
    kb_conn = None
    try:
        kb_conn = kb_pool.getconn()
        with kb_conn.cursor() as kb_cursor:
            for data in mongo_data:
                id_value = data['_id']
                image_data = data['image']
                insert_query = "INSERT INTO dzzzwj(id, image) VALUES (%s, %s)"
                kb_cursor.execute(insert_query, (id_value, image_data))
            kb_conn.commit()
        return True
    except Exception as e:
        logging.error(f"批量插入错误: {e}")
        return False
    finally:
        if kb_conn:
            kb_pool.putconn(kb_conn)

def main():
    batch_size = 80
    offset = read_offset()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    
    try:
        while True:
            mongo_data = list(mongo_collection.find().skip(offset).limit(batch_size))
            if not mongo_data:
                break

            future = executor.submit(batch_insert, mongo_data)
            future.add_done_callback(lambda f, offset=offset: (
                logging.info(f"Batch completed with offset {offset}") if f.result() else logging.error(f"Batch failed with offset {offset}"),
                write_offset(offset + batch_size) if f.result() else None
            ))
            offset += batch_size if future.result() else 0
    except Exception as e:
        logging.error(f"主循环错误: {e}")
    finally:
        executor.shutdown(wait=True)
        mongo_client.close()
        kb_pool.closeall()
        logging.info("资源已清理完毕。")

if __name__ == "__main__":
    main()

这段代码思路:

(1)连接MongoDB和kingbase数据;

(2)因为MongoDB数据量比较大,并且需要断点续传,索引用了分页和排序;

(3)数据成功插入金仓数据库后,增加偏移量,并且将当前偏移量记录在offset.txt里面,以便脚本停了,可以再重启接着迁数据;

因为二进制数据从MongoDB和金仓数据查询出来的内容看着不一样,所以下面的代码是计算两边数据md5值对比的简单代码

import pymongo
import ksycopg2
import base64
import hashlib

def compute_hash(data):
    return hashlib.md5(data).hexdigest()

mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['mongodb']

database = "test"
user = "system"
password = "1"
host = "127.0.0.1"
port = "54322"

conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)

cursor = conn.cursor()


mongo_data = mongo_collection.find()
print(mongo_data)    

    # 插入到 kingbase
for data in mongo_data:
   id_value = data['_id']
   image_data = data['image']

   #image_data = base64.b64encode(base64_data).decode('utf-8')

   image_data_byte = image_data 
   if isinstance(image_data, bytes):
       mongo_hash = compute_hash(image_data_byte)
       print(mongo_hash)

   #image_data = base64.b64encode(base64_data).decode('utf-8')
   if id_value and image_data:
      insert_query = "INSERT INTO zzwj(_id, image) VALUES (%s, %s)"
      cursor.execute(insert_query, (id_value, image_data))

    # 提交事务
conn.commit()

cursor.execute("select _id, image from zzwj")
rows = cursor.fetchall()

for row in rows:
    _id = row[0]
    image_byte = row[1]
    
    pg_hash = compute_hash(image_byte)
    print(pg_hash)


# 关闭连接
cursor.close()
conn.close()
mongo_client.close()

 

标签:金仓,mongo,Python,MongoDB,image,offset,import,data,id
From: https://www.cnblogs.com/weisu-koko/p/18225227

相关文章

  • python数据处理
    Python在数据处理方面非常强大,主要得益于其丰富的库,如Pandas、NumPy和Matplotlib等。以下是一些基本的Python代码示例,用于数据加载、处理和可视化。1.导入库importpandasaspdimportnumpyasnpimportmatplotlib.pyplotasplt2.加载数据#从CSV文件加载......
  • Python-pptx正确设置中文字体
    使用pptx_ea_font库设置中文字体:1.安装pptx_ea_font库:pipinstallpptx-ea-font2.p=text_frame.paragraphs[0]#取文本段落 run=p.runs[0]#取文本运行对象,该对象为段落的子元素pptx_ea_font.set_font(run,'微软雅黑')#以下方法只能修改数字和英文#run.font.name=......
  • Python读取SU数据
    SU数据格式以下简称SeismicUnix为SU。SU格式是SEGY的简化,没有前面3600字节文件头,故需要从第一道的道头获取需要的信息。这里默认所有道的采样点数即nt是一样的。SEGY和SU格式的道头都有240字节,但SEGY只有前180字节有信息,SU的181-240字节定义了画图相关......
  • Python实现SMA黏菌优化算法优化XGBoost回归模型(XGBRegressor算法)项目实战
    说明:这是一个机器学习实战项目(附带数据+代码+文档+视频讲解),如需数据+代码+文档+视频讲解可以直接到文章最后获取。1.项目背景黏菌优化算法(Slimemouldalgorithm,SMA)由Li等于2020年提出,其灵感来自于黏菌的扩散和觅食行为,属于元启发算法。具有收敛速度快,寻优能力强的特点。主......
  • Python实现SMA黏菌优化算法优化LightGBM回归模型(LGBMRegressor算法)项目实战
    说明:这是一个机器学习实战项目(附带数据+代码+文档+视频讲解),如需数据+代码+文档+视频讲解可以直接到文章最后获取。1.项目背景黏菌优化算法(Slimemouldalgorithm,SMA)由Li等于2020年提出,其灵感来自于黏菌的扩散和觅食行为,属于元启发算法。具有收敛速度快,寻优能力强的特点。主......
  • 黑客团伙利用Python、Golang和Rust恶意软件袭击印国防部门;OpenAI揭秘,AI模型如何被用于
    巴黑客团伙利用Python、Golang和Rust恶意软件袭击印度国防部门!与巴基斯坦有联系的TransparentTribe组织已被确认与一系列新的攻击有关,这些攻击使用Python、Golang和Rust编写的跨平台恶意软件,针对印度政府、国防和航空航天部门。“这一系列活动从2023年底持续到2024年4月......
  • python selenium mysql -- 数据爬取2
    fromseleniumimportwebdriverfromselenium.webdriver.common.byimportBydriver=webdriver.Chrome()driver.get('https://www.00ksw.com/html/120/120704/')zj_list=[]#使用更健壮的XPath或CSS选择器links=driver.find_elements(By.XPATH,'/html/bo......
  • 机器学习python实践中对于决策函数(decision_function)的一些个人思考
    最近在利用python进行实践训练,但是跟着参考书学习到SVM的时候,示例代码里突然出现了一个函数——decision_function(),让我很懵逼,帮助文档里的英文翻译过来说啥决策函数、ovr、ovo之类的,让我整个人更晕了,因为我在理论部分参考的是周志华老师的《西瓜书》,而《西瓜书》中并没有对这......
  • 【二】从小白开始使用Python一步一步搭建一个深度学习UI界面【界面设计】
    本来是想使用QtDesigner进行界面控件拖拽的方式进行界面设计的,但是后来觉得这样后面维护更新起来太麻烦了,就还是使用纯代码来写界面吧,这需要一定的想象能力。设计界面pyqt外部工具添加在设置界面搜索“外部工具”,这里我已经添加了两个QTDesigner的外部工具,一个是用于创......
  • Python 入门之阅目的 Pythonic(二)
    #免责声明:本文内容主要是肥清大神的视频以及自己收集学习内容的整理笔记,目是主要是为了让象博主这样的老白能更好的学习编号,如有侵权,请联系博主进行删除。5.控制结构5.1.复杂的列表推导#两个循环的条件以内可使用列表推导式5.2.lambda使用#一次性的结果可用#......