Common Python methods for processing data, including:
1) Splitting an oversized CSV file into chunks of a given row count (split_size)
2) Reading rows from a CSV file and sending HTTP JSON requests to correct data in production or test environments
3) Splitting a CSV file into multiple CSV files by the values of a given column
4) Connecting to a database to run queries, updates, or CSV exports
## Common Python scripts for processing CSV file data

###### 1) Split an oversized CSV file by a given row count (split_size)

```python
import pandas as pd
from pathlib import Path


def split_big_csv_file():
    res_file_path = Path("C:\\Users\\admin\\Desktop\\prod\\test-net.csv")  # file to split
    split_size = 2000000  # maximum number of rows per child file
    tar_dir = res_file_path.parent / ("split_" + res_file_path.name.split(".")[0])
    if not tar_dir.exists():
        tar_dir.mkdir()
        print("Created directory\t" + str(tar_dir))
    print("Target path:\t" + str(tar_dir))
    print("Splitting file:\t" + str(res_file_path))
    print("Chunk size:\t" + "{:,}".format(split_size))
    # read a few rows only to capture the column names
    tmp = pd.read_csv(res_file_path, nrows=10)
    columns = tmp.columns.to_list()
    idx = 0
    while len(tmp) > 0:
        # skip the header row plus the rows already written out
        start = 1 + (idx * split_size)
        tmp = pd.read_csv(res_file_path, header=None, names=columns,
                          skiprows=start, nrows=split_size)
        if len(tmp) <= 0:
            break
        file_name = res_file_path.name.split(".")[0] + "_{}_{}".format(start, start + len(tmp)) + ".csv"
        file_path = tar_dir / file_name
        tmp.to_csv(file_path, index=False)
        idx += 1
        print(file_name + "\tsaved successfully")
```
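Note that the loop above re-reads the source file from the top on every iteration, so the file is scanned once per chunk. A minimal alternative sketch, assuming the same file path and chunk size, that streams the file in a single pass using pandas' `chunksize` iterator (the function name `split_big_csv_file_chunked` is mine, not from the original post):

```python
import pandas as pd
from pathlib import Path


def split_big_csv_file_chunked():
    # Single-pass variant: read_csv(..., chunksize=...) yields DataFrames
    # lazily, so the source file is only scanned once end to end.
    res_file_path = Path("C:\\Users\\admin\\Desktop\\prod\\test-net.csv")
    split_size = 2000000
    tar_dir = res_file_path.parent / ("split_" + res_file_path.stem)
    tar_dir.mkdir(exist_ok=True)
    for idx, chunk in enumerate(pd.read_csv(res_file_path, chunksize=split_size)):
        file_path = tar_dir / "{}_{}.csv".format(res_file_path.stem, idx)
        chunk.to_csv(file_path, index=False)  # each chunk keeps the header row
        print("{}\t{} rows saved".format(file_path.name, len(chunk)))
```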
###### 2) Read rows from a CSV file and send HTTP JSON requests to correct data in production or test environments

```python
from datetime import datetime
import pandas as pd
import requests
import sys
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

url = "http://xxxx/v1/modify"
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/65.0.3325.181 Safari/537.36",
}
count = 0
request_header = headers
start_time = datetime.now()
total_line = 0
haveActivedCount = 0


def read_from_extract_file():
    global start_time
    stop_line = 0  # resume point: skip rows already processed in an earlier run
    # read the CSV by column name; force device columns to string so values
    # such as serial numbers keep their leading zeros
    df = pd.read_csv("C:\\Users\\admin\\Desktop\\prod\\test-net.csv",
                     keep_default_na=False,
                     dtype={'device_sn': str, 'device_id': str, 'device_mac': str,
                            'device_vendor': str, 'device_model': str},
                     encoding='utf-8')
    print(df.shape)
    print(df.size)
    # thread-pool variant:
    # executor = BoundedExecutor(10, 3000)
    for index, row in df.iterrows():
        if index + 1 > stop_line:
            print(row)
            # response = executor.submit(post_data_request, row)
            response = post_data_request(row)


def post_data_request(row):
    global total_line
    global count
    global haveActivedCount
    # JSON request body
    data = {
        "deviceId": row['device_id'],
        "sn": row['device_sn'],
        "deviceVendorId": row['device_vendor'],
        "deviceModelId": row['device_model'],
        "deviceType": 'xxxxx',
        "mac": row['device_mac'],
        "activeTime": row['create_date'],
    }
    print("Updating record", data)
    global request_header
    try:
        response = requests.post(url, headers=request_header, json=data)
        # check the business status code in the response body
        if response.status_code == 200:
            code = response.json().get('code')
            if code == 200:
                count = count + 1
                print("Updated successfully: ", count)
            elif code == 4100000:
                haveActivedCount = haveActivedCount + 1
                print("Already activated: ", haveActivedCount)
            else:
                print("Total updated##### : ", count)
                print("Of which already activated##### : ", haveActivedCount)
                print(response)
                sys.exit()
    except requests.exceptions.RequestException:
        print("Request failed; rows processed so far:", count + haveActivedCount)
        sys.exit()


class BoundedExecutor:
    """BoundedExecutor behaves as a ThreadPoolExecutor which will block on
    calls to submit() once the limit given as "bound" work items are queued
    for execution.

    :param bound: Integer - the maximum number of items in the work queue
    :param max_workers: Integer - the size of the thread pool
    """

    def __init__(self, max_workers, bound):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.semaphore = BoundedSemaphore(bound + max_workers)

    def submit(self, fn, *args, **kwargs):
        """See concurrent.futures.Executor#submit"""
        self.semaphore.acquire()
        try:
            future = self.executor.submit(fn, *args, **kwargs)
        except Exception:
            self.semaphore.release()
            raise
        else:
            future.add_done_callback(lambda x: self.semaphore.release())
            return future

    def shutdown(self, wait=True):
        """See concurrent.futures.Executor#shutdown"""
        self.executor.shutdown(wait)
```
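For a long correction run against a production endpoint, transient network errors are common. Below is a minimal sketch, assuming the same `url` and `headers`, of a `requests.Session` with automatic retries and backoff; the retry counts and status list are illustrative assumptions, not from the original post, and retrying POST is only safe if the modify endpoint is idempotent:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_retrying_session():
    # Retry up to 3 times with exponential backoff (0.5s, 1s, 2s between
    # attempts). allowed_methods requires urllib3 >= 1.26; older versions
    # use method_whitelist. Status codes listed here are assumptions.
    retry = Retry(total=3, backoff_factor=0.5,
                  status_forcelist=[500, 502, 503, 504],
                  allowed_methods=["POST"])
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


# usage inside post_data_request:
# session = build_retrying_session()
# response = session.post(url, headers=request_header, json=data, timeout=10)
```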
###### 3) Split a CSV file into multiple CSV files by the values of a given column

```python
import pandas as pd


def split_csv_file_with_assign_column():
    x_head_key = ['partner_name', 'prod_model', 'prod_id', 'sn', 'mac', 'active_time']
    csv_file = 'C:/Users/admin/Desktop/prod-20220830/test001.csv'
    df = pd.read_csv(csv_file, header=0, encoding="utf-8")
    df.columns = x_head_key
    # drop duplicate rows
    # df = df.drop_duplicates(keep='first')
    # group the rows by partner_name
    grouped = df.groupby(x_head_key[0])
    # directory for the split files
    file = './'
    # file = 'C:\\Users\\admin\\Desktop\\prod-20220830\\python\\'
    allCount = 0
    for value, group in grouped:
        filename = file + str(value) + '.csv'
        try:
            allCount = allCount + len(group)
            print(filename + ', count=' + str(len(group)))
            group.to_csv(filename, index=False)  # to_csv creates/overwrites the file itself
        except UnicodeEncodeError:
            print("Encoding error; this record cannot be written and is skipped")
    print('Total records: ' + str(allCount))
```

###### 4) Connect to a database to run queries, updates, or CSV exports

```python
# coding:utf-8
import pymysql
import csv


# Read query parameters from a CSV file, run the query for each row,
# and export the results that are needed
def get_csv_data_by_params():
    # print(os.getcwd())  # print the current working directory
    filenameParam = 'C:/Users/admin/Desktop/prod-20220830/test002_params.csv'  # parameter file
    for line in open(filenameParam, encoding='utf-8'):
        print(line)
        arr = line.replace("\n", "").split(',')
        select_data_and_write_csv(arr[0], arr[1], arr[2])


# save the query results as a CSV file
def select_data_and_write_csv(user_id, start_time, end_time):
    data = mysql_db_sz_stock(user_id, start_time, end_time)
    filename = 'C:/Users/admin/Desktop/prod-20220830/getdata/Export_' + user_id + '.csv'  # output path
    with open(filename, mode='a', newline="", encoding='utf-8') as f:
        write = csv.writer(f, dialect='excel')
        write.writerow(['id', 'user_id', 'create_time'])  # header row, matching the SELECT columns
        for item in data:
            write.writerow(item)


def mysql_db_sz_stock(user_id, startTime, endTime):
    # connect to the database: host, port, user, password, database name
    connect = pymysql.connect(host="127.0.0.1",
                              port=3306,
                              user="user",
                              password="password",
                              db="db_name",
                              charset="utf8")
    # configuration-driven variant:
    # connect = pymysql.connect(host=MYSQL_HOST, port=MYSQL_PORT, user=MYSQL_USER,
    #                           password=MYSQL_PWD, db=MYSQL_DB, charset='utf8')
    cursor = connect.cursor()
    sql = ("SELECT id,user_id,create_time FROM `table123` "
           "where create_time >=%(startTime)s and create_time <%(endTime)s and user_id =%(user_id)s")
    # bound SQL parameters
    values = {"user_id": user_id, "startTime": startTime, "endTime": endTime}
    cursor.execute(sql, values)
    data = cursor.fetchall()
    print("Query params: user_id=" + user_id + ", startTime=" + startTime + ", endTime=" + endTime)
    print("Result size=" + str(len(data)))
    # print(data)
    cursor.close()   # close the cursor
    connect.close()  # close the connection
    return data


if __name__ == '__main__':
    # split_big_csv_file()
    # read_from_extract_file()
    # split_csv_file_with_assign_column()
    get_csv_data_by_params()
```
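Section 4's title also covers updates, but the functions above only query and export. Below is a minimal sketch of the update path with pymysql's `executemany`; the `status` column, the parameter values, and the connection settings are illustrative assumptions, not from the original post:

```python
import pymysql


def update_user_status(user_ids, new_status):
    # Batch-update a hypothetical status column on table123.
    connect = pymysql.connect(host="127.0.0.1", port=3306, user="user",
                              password="password", db="db_name", charset="utf8")
    try:
        with connect.cursor() as cursor:
            sql = "UPDATE `table123` SET status = %s WHERE user_id = %s"
            # executemany binds one (status, user_id) pair per row and
            # returns the number of affected rows
            affected = cursor.executemany(sql, [(new_status, uid) for uid in user_ids])
        connect.commit()  # unlike the SELECT above, updates must be committed
        print("Updated rows:", affected)
    except pymysql.MySQLError as e:
        connect.rollback()  # undo the partial batch on error
        print("Update failed:", e)
    finally:
        connect.close()
```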