首页 > 数据库 >mongodb数据如何导入到clickhouse

mongodb数据如何导入到clickhouse

时间:2023-08-01 18:33:02浏览次数:40  
标签:导入到 get mongodb 浮点数 str time clickhouse row

一 背景说明

1 开发要求mongodb里的数据需要导入到clickhouse,方便他们分析,因此才有了如下的操作, 刚开始找了很多第三方的数据迁移软件,比如tapdata有这个功能,不过用过几次,经常报错,并且也是收费的。因此才决定自己写python脚本解决这个问题。

2 数据能否顺利导入,发现跟创建ck库里面表的字段类型有着密切的关系

二 在ck里需要提前创建好指定的表

下面是我在ck里面创建了一个core_customer_test 这个表,像create_time,update_time,应该都是Datetime的类型,我这里选择了string,不然我的导入脚本就会报错

CREATE TABLE mongodb.core_customer_test
(
    `create_time` Nullable(String),
    `dep_id`  Nullable(String),
    `is_delete` Nullable(String),
    `invitor_userid` Nullable(String),
    `name` Nullable(String),
    `qx_name` Nullable(String),
    `unionid` Nullable(String),
    `video_add_time` Nullable(String),
    `lunar_birthday` Nullable(String),
    `external_userid` Nullable(String),
    `follow_user` Nullable(String),
    `_id` Varchar,
    `birthday` Nullable(String),
    `update_time` Nullable(String),
    `sex` Nullable(String),
)
ENGINE = MergeTree

ORDER BY (_id);

三 数据同步脚本

3.1 全量同步脚本

import pymongo
import pymysql
import sys
from clickhouse_driver import Client
#import datetime
from datetime import datetime
from datetime import datetime, timedelta
#dd = collection.find().limit(2)
mydb = Client(
  host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
  user="root_db",
  password="xxxxxx",
  database="mongodb"
)  #clickhouse
#mycursor = mydb.cursor()
client = pymongo.MongoClient('mongodb://root:[email protected]:27018/')
db = client['sjzx']
collection = db['core_customer']
skip_num=0
count=collection.count_documents({})
print(count)
read_num=0
clickhouse_data = []

while True:
    clickhouse_data = []
    #print(skip_num)
    if read_num * 100 >= count:
       break
    dd = collection.find().limit(200).skip(skip_num)
    for row in dd:

        print(clickhouse_data)
        clickhouse_data = []
        converted_row = [
            str(row["_id"]),  # 转换为字符串类型
            str(row.get("birthday",'')),  # 转换为整数类型
            #row['create_time'].replace(microsecond=0),
            str(row.get("create_time",'')),
            str(row.get("dep_id",'')),  # 转换为浮点数类型
            #row.get['dep_id',''],
            str(row.get("is_delete",'')),  # 转换为浮点数类型
            str(row.get("name",'')),  # 转换为浮点数类型
            str(row.get("qx_name",'')),  # 转换为浮点数类型
            str(row.get("sex",'')),  # 转换为浮点数类型
            #row['update_time'].replace(microsecond=0),
            str(row.get("update_time",'')),  # 转换为浮点数类型
            str(row.get("video_add_time",'')),  # 转换为浮点数类型
            str(row.get("lunar_birthday",'')),  # 转换为浮点数类型
            str(row.get("invitor_userid",'')),  # 转换为浮点数类型
            str(row.get("external_userid",'')),  # 转换为浮点数类型
            str(row.get("follow_user",'')),  # 转换为浮点数类型
        ]

        clickhouse_data.append(converted_row)

        insert_query = ('INSERT INTO core_customer_test (_id, birthday, create_time,dep_id,is_delete,name,qx_name,sex,update_time,video_add_time,lunar_birthday, invitor_userid, external_userid, follow_user) VALUES')
        mydb.execute(insert_query, clickhouse_data,types_check=True)
    skip_num = skip_num + 200
    read_num = read_num + 1
mydb.disconnect()


3.2 增量同步脚本

增量同步是暂定第二天的1点钟,同步前一天的所有数据,在定时任务计划里固定时间运行 比如:2023-4-13 的1点钟,会同步 2023-4-12的一天的数据,以此类推。

import pymongo
import pymysql
import sys
from clickhouse_driver import Client
#import datetime
from datetime import datetime
from datetime import datetime, timedelta
#dd = collection.find().limit(2)
mydb = Client(
  host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
  user="root_db",
  password="xxxxxxx",
  database="mongodb"
)  #clickhouse
#mycursor = mydb.cursor()
client = pymongo.MongoClient('mongodb://root:[email protected]:27018/')
db = client['sjzx']
collection = db['core_customer']
skip_num=0
yes_date = datetime.now() - timedelta(days=1)

yes_date_start = yes_date.strptime(yes_date.strftime('%Y-%m-%d') + ' 00:00:00', '%Y-%m-%d %H:%M:%S')
yes_date_end = yes_date.strptime(yes_date.strftime('%Y-%m-%d') + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
myquery = {'create_time': {'$gte': yes_date_start, '$lte': yes_date_end}}
count=collection.count_documents({})
print(count)
read_num=0
clickhouse_data = []

while True:
    clickhouse_data = []
    #print(skip_num)
    if read_num * 100 >= count:
       break
    dd = collection.find(myquery).limit(200).skip(skip_num)
    for row in dd:

        print(clickhouse_data)
        clickhouse_data = []
        converted_row = [
            str(row["_id"]),  # 转换为字符串类型
            str(row.get("birthday",'')),  # 转换为整数类型
            str(row.get("create_time",'')),  # 转换为整数类型
            str(row.get("dep_id",'')),  # 转换为浮点数类型
            str(row.get("is_delete",'')),  # 转换为浮点数类型
            str(row.get("name",'')),  # 转换为浮点数类型
            str(row.get("qx_name",'')),  # 转换为浮点数类型
            str(row.get("sex",'')),  # 转换为浮点数类型
            str(row.get("update_time",'')),  # 转换为浮点数类型
            str(row.get("video_add_time",'')),  # 转换为浮点数类型
            #row['video_add_time'].replace(microsecond=0),
            str(row.get("lunar_birthday",'')),  # 转换为浮点数类型
            str(row.get("invitor_userid",'')),  # 转换为浮点数类型
            str(row.get("external_userid",'')),  # 转换为浮点数类型
            str(row.get("follow_user",'')),  # 转换为浮点数类型
        ]

        clickhouse_data.append(converted_row)

        insert_query = ('INSERT INTO core_customer_test (_id, birthday, create_time,dep_id,is_delete,name,qx_name,sex,update_time,video_add_time,lunar_birthday, invitor_userid, external_userid, follow_user) VALUES')
        mydb.execute(insert_query, clickhouse_data,types_check=True)
    skip_num = skip_num + 200
    read_num = read_num + 1
mydb.disconnect()
#mydb.close()

四 到clickkhosue查看数据

image.png

最后,要感谢郑照辉的帮助。

标签:导入到,get,mongodb,浮点数,str,time,clickhouse,row
From: https://blog.51cto.com/825536458/6842228

相关文章

  • 统一观测|借助 Prometheus 监控 ClickHouse 数据库
    引言ClickHouse作为用于联机分析(OLAP)的列式数据库管理系统(DBMS),最核心的特点是极致压缩率和极速查询性能。同时,ClickHouse支持SQL查询,在基于大宽表的聚合分析查询场景下展现出优异的性能。因此,获得了广泛的应用。本文旨在分享阿里云可观测监控Prometheus版对开源Clic......
  • MongoDB数据库的部署和应用
    推荐步骤:在Centos01上部署MongoDB服务器客户端登录验证在centos01的MongoDB配置文件通过配置文件控制MongoDB服务,配置MongoDB身份验证在centos01的MongoDB服务器配置身份验证管理和修改配置文件支持验证在centos01管理MongoDB管理数据,集合批量数据管理实验步骤创建管理MongoDB组和......
  • 关于spark写clickhouse出现 too many parts(300)错误的最佳解决方式
    出现这个问题的根本原因是clickhouse插入速度超过clickhouse的文件合并速度(默认300)解决方式如下 觉得好用记得点个关注或者赞哈......
  • mongodb索引大小查看
    查看数据库中索引总大小>db.stats(){"db":"abce","collections":258,"views":0,"objects":3869336,"avgObjSize":23033.87034752216,"dataSi......
  • mongodb去重统计
    MongoDB去重统计在MongoDB中,去重统计是一种常见的需求。如果我们有一个存储大量数据的集合,我们可能需要统计其中不重复的元素的数量。幸运的是,MongoDB提供了一些强大的聚合操作符和方法来实现这个目标。使用distinct()方法进行去重统计MongoDB的distinct()方法可以用于从集合中......
  • mongodb模糊查询性能
    MongoDB模糊查询性能优化指南作为一名经验丰富的开发者,我将为你详细介绍如何优化MongoDB的模糊查询性能。以下是整个过程的步骤概览:步骤内容1创建合适的索引2使用合适的正则表达式3避免使用无效的模糊查询4使用投影来减少返回的数据量5使用分页来限制......
  • mongodb和mysql的优缺点
    MongoDB和MySQL的优缺点对比引言在现代软件开发中,数据库是必不可少的一部分。MongoDB和MySQL是两种常见的数据库管理系统(DBMS)。本文将对它们的优缺点进行对比,帮助读者选择适合自己需要的数据库系统。MongoDBMongoDB是一个基于文档的NoSQL数据库,使用JSON格式存储数据。它的特点......
  • mongodb服务启动命令
    MongoDB服务启动命令实现教程1.整体流程下面是实现MongoDB服务启动命令的整体流程,通过以下步骤可以顺利启动服务:步骤描述1下载和安装MongoDB2配置MongoDB环境变量3创建数据存储目录4启动MongoDB服务2.每一步的具体操作2.1下载和安装MongoDB首先,......
  • mongodb分片原理
    MongoDB分片原理及实现1.流程图以下是实现MongoDB分片的流程图:步骤描述1.部署分片集群在多台服务器上安装和配置MongoDB分片。2.创建配置服务器创建用于存储分片元数据的配置服务器。3.启动分片集群启动配置服务器和分片服务器。4.启动mongos路由在......
  • mongodb电商数据库设计
    MongoDB电商数据库设计概述在电商平台的数据库设计中,使用MongoDB作为数据库管理系统是一种常见的选择。MongoDB是一种具有高度可伸缩性和灵活性的NoSQL数据库,适用于存储大量的非结构化或半结构化数据。本文将介绍如何使用MongoDB来设计电商数据库,并指导刚入行的开发者完成这个任......