代码应用场景:需要对数据库中的数据进行批量查询,然后将查询到的结果每条分别写入不同的文本文件中。由于数据数量较多,考虑使用线程池减少时间开销。
# Core logic
import datetime
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import closing

import pymysql
from dbutils.pooled_db import PooledDB


class processing:
    """Export database rows to individual text files using a thread pool.

    ``run`` reads a list of keys from the database, then ``threading_doing``
    is executed once per key on a worker thread: it looks up one row and
    writes that row's ``content`` value to its own text file.
    """

    # Directory the per-row text files are written into.
    OUTPUT_DIR = "tmp_2/htmls"

    def __init__(self, maxconnections=5, thread_num=20, *,
                 host, user, password, db_name):
        """Create the database connection pool.

        Args:
            maxconnections: Upper bound on simultaneously open pooled
                connections.
            thread_num: Number of worker threads ``run`` uses.
            host: Database server host name (keyword-only).
            user: Database user name (keyword-only).
            password: Database password (keyword-only).
            db_name: Name of the database to select (keyword-only).
        """
        self.thread_num = thread_num
        # blocking=True makes a worker wait for a free connection instead of
        # raising when more threads than `maxconnections` ask for one at once.
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=maxconnections,
            maxshared=maxconnections,
            host=host,
            user=user,
            passwd=password,
            db=db_name,
            port=3306,
            charset='utf8mb4',
            blocking=True,
        )

    def _fetch_all(self, sql, args=None):
        """Run *sql* on a pooled connection and return all rows as dicts.

        The cursor and the connection are closed even if the query raises,
        so the connection always goes back to the pool.
        """
        with closing(self.pool.connection()) as conn:
            with closing(conn.cursor(cursor=pymysql.cursors.DictCursor)) as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall()

    def threading_doing(self, html):
        """Write the ``content`` of the row matching *html* to a text file.

        Args:
            html: Value compared against the ``html`` column; also used to
                derive the output file name.

        Returns:
            The file name (without directory or ``.txt`` suffix) written.

        Raises:
            LookupError: If no row matches *html*.
        """
        rows = self._fetch_all(
            "select content from table_name where html = %s", (html,)
        )
        if not rows:
            raise LookupError(f"no row in table_name with html = {html!r}")
        html_content = rows[0]["content"]

        # The connection is already back in the pool here, so slow disk
        # writes do not hold a database connection.
        # NOTE(review): a "/" left in `html` ends up in the path - confirm
        # that the stored values never contain one.
        file_name = html.replace("http://", "").replace(":", "_")
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        with open(f"{self.OUTPUT_DIR}/{file_name}.txt", "w", encoding="utf-8") as f:
            f.write(html_content)
        return file_name

    def run(self):
        """Export every target listed in the rows with ``simple_number > 9``.

        Each selected row's ``target`` value is decoded as JSON and iterated;
        every element is handed to ``threading_doing`` on the thread pool and
        the returned file name is printed as each task completes. Start time,
        end time and elapsed time are printed. An exception raised by a
        worker propagates out of this method.
        """
        start_time = datetime.datetime.now()
        print(f"开始时间:{start_time.strftime('%Y%m%d%H%M%S')}")

        # Fetch every row that lists targets to export.
        target_list_all = self._fetch_all(
            "select target from table_name where simple_number > 9"
        )

        # One executor for the whole run; rows are still processed one after
        # another, each row's targets in parallel.
        with ThreadPoolExecutor(max_workers=self.thread_num) as executor:
            for row in target_list_all:
                target_list = json.loads(row["target"])
                futures = [
                    executor.submit(self.threading_doing, html)
                    for html in target_list
                ]
                for future in as_completed(futures):
                    print(future.result())

        end_time = datetime.datetime.now()
        print(f"{end_time.strftime('%Y%m%d%H%M%S')} 完成!耗时:{end_time - start_time} ")


if __name__ == "__main__":
    # Placeholder connection settings - fill in real values before running.
    p = processing(host="", user="", password="", db_name="")
    p.run()
标签:__,name,python,self,list,cursor,html,线程,使用 From: https://www.cnblogs.com/Iitt1evegbird/p/17411419.html