一 背景说明
1 开发要求mongodb里的数据需要导入到clickhouse,方便他们分析,因此才有了如下的操作, 刚开始找了很多第三方的数据迁移软件,比如tapdata有这个功能,不过用过几次,经常报错,并且也是收费的。因此才决定自己写python脚本解决这个问题。
2 数据能否顺利导入,发现跟创建ck库里面表的字段类型有着密切的关系
二 在ck里需要提前创建好指定的表
下面是我在ck里面创建的 core_customer_test 这个表。像 create_time、update_time 这类字段本应是 DateTime 类型,但我这里统一选择了 String:因为导入脚本会把 MongoDB 里的值全部转成字符串,如果建成 DateTime,插入时就会因类型不匹配而报错。
-- Staging table for documents synced from MongoDB sjzx.core_customer.
-- All business columns are Nullable(String): the import script stringifies
-- every MongoDB value (including datetimes), so String avoids
-- type-conversion failures on insert.
CREATE TABLE mongodb.core_customer_test
(
    `create_time` Nullable(String),
    `dep_id` Nullable(String),
    `is_delete` Nullable(String),
    `invitor_userid` Nullable(String),
    `name` Nullable(String),
    `qx_name` Nullable(String),
    -- NOTE: the sync scripts never insert `unionid`; it stays NULL.
    `unionid` Nullable(String),
    `video_add_time` Nullable(String),
    `lunar_birthday` Nullable(String),
    `external_userid` Nullable(String),
    `_id` Varchar,  -- alias for String; must be non-Nullable to be the ORDER BY key
    `birthday` Nullable(String),
    `update_time` Nullable(String),
    `sex` Nullable(String)  -- trailing comma removed: older ClickHouse versions reject it
)
ENGINE = MergeTree
ORDER BY (_id);
三 数据同步脚本
3.1 全量同步脚本
"""Full sync: copy every document from MongoDB sjzx.core_customer into the
ClickHouse table core_customer_test, in batches of BATCH_SIZE rows."""

# Number of documents fetched from MongoDB / inserted into ClickHouse per round.
BATCH_SIZE = 200

# Column order MUST match convert_row() below.
INSERT_QUERY = (
    'INSERT INTO core_customer_test '
    '(_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, '
    'external_userid, follow_user) VALUES'
)


def convert_row(row):
    """Map one MongoDB document to the ClickHouse column order.

    Every value is stringified because the target table declares all
    columns as (Nullable) String; missing keys become empty strings.
    ``_id`` is required and raises KeyError if absent.
    """
    return [
        str(row["_id"]),
        str(row.get("birthday", "")),
        str(row.get("create_time", "")),
        str(row.get("dep_id", "")),
        str(row.get("is_delete", "")),
        str(row.get("name", "")),
        str(row.get("qx_name", "")),
        str(row.get("sex", "")),
        str(row.get("update_time", "")),
        str(row.get("video_add_time", "")),
        str(row.get("lunar_birthday", "")),
        str(row.get("invitor_userid", "")),
        str(row.get("external_userid", "")),
        str(row.get("follow_user", "")),
    ]


def main():
    # Drivers are imported here so convert_row() can be imported (and
    # unit-tested) without pymongo / clickhouse_driver installed.
    import pymongo
    from clickhouse_driver import Client

    ck = Client(
        host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
        user="root_db",
        password="xxxxxx",
        database="mongodb",
    )  # ClickHouse connection
    mongo = pymongo.MongoClient('mongodb://root:[email protected]:27018/')
    collection = mongo['sjzx']['core_customer']

    total = collection.count_documents({})
    print(total)

    skip_num = 0
    # Bug fixes vs. the original:
    #  * the row buffer was reset INSIDE the per-row loop, so each batch
    #    inserted only its last row — now the whole batch is accumulated;
    #  * the exit check used read_num * 100 while skip/limit used 200 —
    #    now a single BATCH_SIZE constant drives both.
    while skip_num < total:
        # NOTE(review): skip/limit pagination without a sort order is not
        # stable if the collection is written to concurrently; acceptable
        # for a one-off full load — confirm for production use.
        batch = [
            convert_row(row)
            for row in collection.find().limit(BATCH_SIZE).skip(skip_num)
        ]
        if not batch:
            break
        ck.execute(INSERT_QUERY, batch, types_check=True)
        skip_num += BATCH_SIZE

    ck.disconnect()


if __name__ == "__main__":
    main()
3.2 增量同步脚本
增量同步是暂定第二天的1点钟,同步前一天的所有数据,在定时任务计划里固定时间运行 比如:2023-4-13 的1点钟,会同步 2023-4-12的一天的数据,以此类推。
"""Incremental sync: run shortly after midnight (e.g. 01:00) to copy the
previous day's documents (filtered by create_time) from MongoDB
sjzx.core_customer into the ClickHouse table core_customer_test.

E.g. running on 2023-04-13 at 01:00 syncs all of 2023-04-12's documents.
"""
from datetime import datetime, timedelta

# Number of documents fetched from MongoDB / inserted into ClickHouse per round.
BATCH_SIZE = 200

# Column order MUST match convert_row() below.
INSERT_QUERY = (
    'INSERT INTO core_customer_test '
    '(_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, '
    'external_userid, follow_user) VALUES'
)


def convert_row(row):
    """Map one MongoDB document to the ClickHouse column order.

    Every value is stringified because the target table declares all
    columns as (Nullable) String; missing keys become empty strings.
    ``_id`` is required and raises KeyError if absent.
    """
    return [
        str(row["_id"]),
        str(row.get("birthday", "")),
        str(row.get("create_time", "")),
        str(row.get("dep_id", "")),
        str(row.get("is_delete", "")),
        str(row.get("name", "")),
        str(row.get("qx_name", "")),
        str(row.get("sex", "")),
        str(row.get("update_time", "")),
        str(row.get("video_add_time", "")),
        str(row.get("lunar_birthday", "")),
        str(row.get("invitor_userid", "")),
        str(row.get("external_userid", "")),
        str(row.get("follow_user", "")),
    ]


def yesterday_range(now=None):
    """Return (start, end) datetimes spanning the whole previous day.

    start = yesterday 00:00:00, end = yesterday 23:59:59 — the same
    inclusive $gte/$lte window the original script built via strptime.
    ``now`` is injectable for testing; defaults to datetime.now().
    """
    base = (now if now is not None else datetime.now()) - timedelta(days=1)
    start = base.replace(hour=0, minute=0, second=0, microsecond=0)
    end = base.replace(hour=23, minute=59, second=59, microsecond=0)
    return start, end


def main():
    # Drivers are imported here so the helpers above can be imported (and
    # unit-tested) without pymongo / clickhouse_driver installed.
    import pymongo
    from clickhouse_driver import Client

    ck = Client(
        host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
        user="root_db",
        password="xxxxxxx",
        database="mongodb",
    )  # ClickHouse connection
    mongo = pymongo.MongoClient('mongodb://root:[email protected]:27018/')
    collection = mongo['sjzx']['core_customer']

    start, end = yesterday_range()
    query = {'create_time': {'$gte': start, '$lte': end}}

    # Bug fix: count only yesterday's documents — the original counted the
    # WHOLE collection, so the loop bound did not match the filtered find().
    total = collection.count_documents(query)
    print(total)

    skip_num = 0
    # Bug fixes vs. the original:
    #  * the row buffer was reset INSIDE the per-row loop, so each batch
    #    inserted only its last row — now the whole batch is accumulated;
    #  * the exit check used read_num * 100 while skip/limit used 200 —
    #    now a single BATCH_SIZE constant drives both.
    while skip_num < total:
        batch = [
            convert_row(row)
            for row in collection.find(query).limit(BATCH_SIZE).skip(skip_num)
        ]
        if not batch:
            break
        ck.execute(INSERT_QUERY, batch, types_check=True)
        skip_num += BATCH_SIZE

    ck.disconnect()


if __name__ == "__main__":
    main()
四 到 clickhouse 查看数据
最后,要感谢郑照辉的帮助。
标签:导入到,get,mongodb,浮点数,str,time,clickhouse,row From: https://blog.51cto.com/825536458/6842228