python代码
# -*- coding: utf-8 -*- # encoding:utf-8 from flask import Flask, render_template, request, send_file,jsonify import os, requests import pandas as pd from datetime import datetime, timedelta import time, json from log import logging import threading,queue app = Flask(__name__) result_queue = queue.Queue() # 定义上传文件的存储文件夹 UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER ShengCheng = 'shengcheng' app.config['ShengCheng'] = ShengCheng Templates = 'templates' app.config['Templates'] = Templates Log = 'log' app.config['Log'] = Log # 确保上传文件夹存在 if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) if not os.path.exists(ShengCheng): os.makedirs(ShengCheng) if not os.path.exists(Templates): os.makedirs(Templates) @app.route('/') def index(): return render_template('index.html') def allowed_file(filename): ALLOWED_EXTENSIONS = {'csv', 'xls', 'xlsx'} return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route('/upload', methods=['POST']) def upload_files(): t1 = time.time() try: if request.method == 'POST': if 'file1' not in request.files or 'file2' not in request.files: return jsonify("请选择两个文件上传") file1 = request.files['file1'] file2 = request.files['file2'] if file1.filename == '' or file2.filename == '': return jsonify("文件名不能为空") max_size = 10 * 1024 * 1024 * 1024 * 1024 # 1G # 保存上传的文件到指定文件夹 if file1 and allowed_file(file1.filename) and file1.content_length <= max_size and \ file2 and allowed_file(file2.filename) and file2.content_length <= max_size: file1_path = os.path.join(app.config['UPLOAD_FOLDER'], file1.filename) file2_path = os.path.join(app.config['UPLOAD_FOLDER'], file2.filename) file1.save(file1_path) file2.save(file2_path) # hebing(file1_path, file2_path) # shaixuan() # path, name = fenge() thread1 = threading.Thread(target=hebing, args=(file1_path,file2_path)) thread1.start() thread1.join() # if os.path.exists('匹配结果-1.xlsx'): # thread3 = threading.Thread(target=fenge, args=(result_queue)) # thread3.start() path, name = fenge() T = time.time() - t1 logging.info("数据处理完成,文件处理共花费{0}s,文件名称为:{1},合并后的文件路径为{2}".format(T, path, name)) print("数据处理完成,文件处理共花费{0}s,文件名称为:{1},合并后的文件路径为{2}".format(T, path, name)) try: qywx_text_robot("数据处理完成,文件处理共花费{0}s,文件名称为:{1},合并后的文件路径为{2}".format(T, path, name)) except Exception as e : logging.error("qywx_text_robot error ") data ={ "url":os.path.basename(path), "date":int(T) } # return "文件上传成功,数据处理完成,花费时间{0}s,<a href='/download/{1}'>点击此处下载合并后的文件</a>,<button onclick='goBack()'>返回上一级</button>".format(int(T), os.path.basename(path)) return render_template('download.html',**data) else: logging.error("文件类型或大小不符合要求") print('文件类型或大小不符合要求') return jsonify('文件类型或大小不符合要求') else: logging.error("仅接受POST请求") print('仅接受POST请求') return jsonify('仅接受POST请求') except Exception as e : logging.error("upload error {0}".format(e)) print(e) return jsonify(e) @app.route('/get_file_links', methods=['GET']) def get_file_links(): file_links = [] for root, dirs, files in os.walk(app.config['ShengCheng']): for file in files: file_path = os.path.join(root, file) # 生成相对路径,以便前端构建完整下载链接 relative_path = os.path.relpath(file_path, app.config['ShengCheng']) file_links.append(relative_path) logging.info(file_links) return jsonify(file_links) @app.route('/download/<filename>', methods=['GET']) def download_file(filename): current_directory = os.getcwd() file_path = os.path.join(current_directory, app.config['ShengCheng'], filename) logging.info("download link :{0}".format(file_path)) return send_file(file_path, as_attachment=True) def hebing(file1, file2): date = datetime.now().strftime("%Y-%m-%d") print("本次执行日期:{0}".format(date)) if os.path.exists('匹配结果.xlsx'): os.remove('匹配结果.xlsx') print('匹配结果.xlsx 已删除!') # 创建示例数据框 # 读取字段更改列字段把居民电话改成手机号 try: if "xlsx" in file1 or "xls" in file1 and "csv" in file2: T1 = time.time() dfds = pd.read_excel(file1) dfds_columns_list = [] for col in dfds.columns: dfds_columns_list.append(col) if '居民电话' in dfds_columns_list: # 这里可以进行修改列名的操作,例如添加前缀 dfds.rename(columns={'居民电话': '手机号'}, inplace=True) dfds.to_excel(file1) logging.info('字段答题情况-已改造完成!') print('字段答题情况-已改造完成!') time.sleep(1) # 清掉无效列 gaizao = pd.read_excel(file1, usecols='B,C,D,E,F,G,H') # gaizao = pd.read_excel(file1) gaizao.to_excel(file1) logging.info('清理无效列/杂项-完成!') print('清理无效列/杂项-完成!') # 开始匹配处理 print("开始匹配处理") logging.info("开始匹配处理") df1 = pd.read_csv(file2, chunksize=1000) df2 = pd.read_excel(file1) # 指定用于合并的列 merge_column = '手机号' # 初始化合并后的DataFrame merged_df = pd.DataFrame() # 遍历第一个文件的块 for chunk in df1: # 根据共享列进行合并 result = pd.merge(chunk, df2, on=merge_column, how='left') # 如果是第一次迭代,直接赋值,否则追加 if merged_df is None: merged_df = result else: merged_df = merged_df._append(result, ignore_index=True) # # 执行类似VLOOKUP的操作,以左侧的df1为准,只匹配df2中的'Value2' # result = pd.merge(df1, df2, on='手机号', ) merged_df.to_excel('匹配结果.xlsx') time.sleep(1) T2 = time.time() T = T2 - T1 logging.info("匹配结果已生成,共花费{0}s".format(T)) print("匹配结果已生成,共花费{0}s".format(T)) if os.path.exists('匹配结果.xlsx'): thread2 = threading.Thread(target=shaixuan, args=()) thread2.start() thread2.join() else: # 开始匹配处理 print("开始匹配处理") logging.info("开始匹配处理") df1 = pd.read_csv(file2,chunksize=1000) df2 = pd.read_excel(file1) # 指定用于合并的列 merge_column = '手机号' # 初始化合并后的DataFrame merged_df = pd.DataFrame() # 遍历第一个文件的块 for chunk in df1: # 根据共享列进行合并 result = pd.merge(chunk, df2, on=merge_column, how='left') # 如果是第一次迭代,直接赋值,否则追加 if merged_df is None: merged_df = result else: merged_df = merged_df._append(result, ignore_index=True) # # 执行类似VLOOKUP的操作,以左侧的df1为准,只匹配df2中的'Value2' # result = pd.merge(df1, df2, on='手机号', how='left') merged_df.to_excel('匹配结果.xlsx') time.sleep(1) T2 = time.time() T = T2 - T1 print("匹配结果已生成,共花费{0}s".format(T)) logging.info("匹配结果已生成,共花费{0}s".format(T)) if os.path.exists('匹配结果.xlsx'): thread2 = threading.Thread(target=shaixuan, args=()) thread2.start() thread2.join() elif "xlsx" in file2 or "xls" in file2 and "csv" in file1: T1 = time.time() dfds = pd.read_excel(file2) dfds_columns_list = [] for col in dfds.columns: dfds_columns_list.append(col) if '居民电话' in dfds_columns_list: # 这里可以进行修改列名的操作,例如添加前缀 dfds.rename(columns={'居民电话': '手机号'}, inplace=True) dfds.to_excel(file2) print('字段答题情况-已改造完成!') logging.info('字段答题情况-已改造完成!') time.sleep(1) # 清掉无效列 gaizao = pd.read_excel(file2, usecols='B,C,D,E,F,G,H') # gaizao = pd.read_excel(file2) gaizao.to_excel(file2) print('清理无效列/杂项-完成!') logging.info('清理无效列/杂项-完成!') # 开始匹配处理 print("开始匹配处理") logging.info("开始匹配处理") df1 = pd.read_csv(file1,chunksize=1000) df2 = pd.read_excel(file2) # 分块读取,防止内存溢出 # 指定用于合并的列 merge_column = '手机号' # 初始化合并后的DataFrame merged_df = pd.DataFrame() # 遍历第一个文件的块 for chunk in df1: # 根据共享列进行合并 result = pd.merge(chunk, df2, on=merge_column, how='left') # 如果是第一次迭代,直接赋值,否则追加 if merged_df is None: merged_df = result else: merged_df = merged_df._append(result, ignore_index=True) # # 执行类似VLOOKUP的操作,以左侧的df1为准,只匹配df2中的'Value2' # result = pd.merge(df1, df2, on='手机号', how='left') merged_df.to_excel('匹配结果.xlsx') time.sleep(1) T2 = time.time() T = T2 - T1 if os.path.exists('匹配结果.xlsx'): thread2 = threading.Thread(target=shaixuan, args=()) thread2.start() thread2.join() else: # 开始匹配处理 print("开始匹配处理") logging.info("开始匹配处理") df1 = pd.read_csv(file1,chunksize=1000) df2 = pd.read_excel(file2) # 分块读取,防止内存溢出 # 指定用于合并的列 merge_column = '手机号' # 初始化合并后的DataFrame merged_df = pd.DataFrame() # 遍历第一个文件的块 for chunk in df1: # 根据共享列进行合并 result = pd.merge(chunk, df2, on=merge_column, how='left') # 如果是第一次迭代,直接赋值,否则追加 if merged_df is None: merged_df = result else: merged_df = merged_df._append(result, ignore_index=True) # # 执行类似VLOOKUP的操作,以左侧的df1为准,只匹配df2中的'Value2' # result = pd.merge(df1, df2, on='手机号', how='left') merged_df.to_excel('匹配结果.xlsx') time.sleep(1) T2 = time.time() T = T2 - T1 print("匹配结果已生成,共花费{0}s".format(T)) logging.info("匹配结果已生成,共花费{0}s".format(T)) if os.path.exists('匹配结果.xlsx'): thread2 = threading.Thread(target=shaixuan, args=()) thread2.start() thread2.join() else: print("上传不符合的文件") # 处理文件(这里简单地将两个文件内容合并并保存为新文件) with open(file1, 'r') as f1, open(file2, 'r') as f2: content1 = f1.read() content2 = f2.read() merged_content = content1 + content2 current_directory = os.getcwd() path = now_date() + "_" + "匹配结果.xlsx" merged_file_path = os.path.join(current_directory, app.config['ShengCheng'], path) with open(merged_file_path, 'w') as merged_file: merged_file.write(merged_content) except Exception as e: print("hebing error:", e) logging.error("hebing error:", e) return jsonify("hebing方法报错误:{0}".format(e)) def now_date(): # 获取当前日期 current_date = datetime.now() # st1 = str(current_date)[:10] # st2 = str(current_date)[11:19] # 获取前7天的日期 # current_date_1 = + st1 + "-" + st2 # previous_day = current_date - timedelta(days=7) now_time = (str("sc" + current_date.strftime('%Y%m%d%H%M%S'))) return now_time def shaixuan(): if os.path.exists('匹配结果-1.xlsx'): os.remove('匹配结果-1.xlsx') print('匹配结果-1.xlsx 已删除!') logging.info('匹配结果-1.xlsx 已删除!') T1 = time.time() try: df31 = pd.read_excel('匹配结果.xlsx', index_col=0, usecols='C,D,E,F,G,H,K,L,O,Q,R,S,T,U,V') # df31 = pd.read_excel('匹配结果.xlsx', index_col=0) df31_columns_list = [] for col in df31.columns: df31_columns_list.append(col) if "公众号" in df31_columns_list: dfa31 = df31.loc[(df31['公众号'] == 'XXX')] # dfa31 = df31.loc[(df31['字段状态'] == '绿码') & (df31['字段状态'] == '黄码') & (df31['字段状态'] == '红码')] dfa31.to_excel('匹配结果-1.xlsx') print("匹配结果-1") logging.info("匹配结果-1") T = time.time() - T1 print("公众号为XXX的筛选完成,共花费{0}s".format(T)) logging.info("公众号为XXX的筛选完成,共花费{0}s".format(T)) else: print("匹配结果-1") logging.info("匹配结果-1") df31.to_excel('匹配结果-1.xlsx') except Exception as e: print("shaixuan error:", e) logging.error("shaixuan error:", e) return jsonify("shaixuan方法报错误:{0}".format(e)) def fenge(): if os.path.exists(r'匹配结果-2.xlsx'): os.remove(r'匹配结果-2.xlsx') print('匹配结果-2.xlsx 已删除!') logging.info('匹配结果-2.xlsx 已删除!') try: # 读取Excel文件 T1 = time.time() print("开始分割时间匹配结果") logging.info("开始分割时间匹配结果") df = pd.read_excel('匹配结果-1.xlsx') current_directory = os.getcwd() # 将数据创建为DataFrame df_time = pd.DataFrame(df) # 使用str.split()方法来分列,expand=True表示将结果扩展为多个列 # 这里假设数据以空格分隔 df_split = df_time['最后一次答题时间'].str.split(' ', expand=True) # 给分列后的列命名 df_split.columns = ['最后一次答题日期', '最后一次答题时间'] # 将分列后的数据与原始DataFrame合并 df = pd.concat([df, df_split], axis=1) df.to_excel('匹配结果-2.xlsx') print("生成匹配结果-2") logging.info("生成匹配结果-2") dfs = pd.read_excel('匹配结果-2.xlsx', usecols='B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R') dfs.to_excel("匹配结果-new.xlsx") print("清洗匹配结果-2,生成匹配结果-new") logging.info("清洗匹配结果-2,生成匹配结果-new") path = now_date() + "_" + "匹配结果.xlsx" name = os.path.join(current_directory, app.config['ShengCheng'], path) # True_name = os.path.normpath(path) # 去重用户类型为家人 df_t = pd.read_excel("匹配结果-new.xlsx") t = [] for col in df_t.columns: t.append(col) if "用户类型" in t: filtered_data = df_t[df_t['用户类型'] != '家人'] filtered_data.to_excel(name) print("用户类型为XX的筛选完成,生成最终结果{0}".format(name)) logging.info("用户类型为XX的筛选完成,生成最终结果{0}".format(name)) T2 = time.time() T = T2 - T1 print("分割和筛选XX共花费{0}s".format(T)) logging.info("分割和筛选xx共花费{0}s".format(T)) else: T2 = time.time() T = T2 - T1 df_t.to_excel(name) print("分割和筛选XX共花费{0}s".format(T)) logging.info("分割和筛选XX共花费{0}s".format(T)) return path, name except Exception as e: print("fenge error:", e) logging.info("fenge error:", e) return jsonify("fenge方法报错误:{0}".format(e)) def qywx_text_robot(content): data = { "msgtype": 'text', "text": { "content": content, "mentioned_list": ['@all'], # userid,"@all"艾特所有人 "mentioned_mobile_list": [] # 手机号 } } req = { "url": 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxx', "method": "POST", "headers": { "Content-Type": "application/json; charset=utf-8" }, "data": json.dumps(data).encode('utf8'), } requests.post(url=req['url'], data=req['data']) if __name__ == '__main__': app.run("0.0.0.0", "5000") logging.info("The process is starting")
前端代码index.html <!DOCTYPE html> <head> <meta charset="UTF-8"> <title>上传两个文件并合并下载</title> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> </head> <body> <h1>上传两个文件并合并下载</h1> <p>匹配相同项手机号合并</p> <form action="/upload" method="post" enctype="multipart/form-data"> <label for="file1">文件1:</label><input type="file" name="file1" id="file1" required><br><br> <label for="file2">文件2:</label><input type="file" name="file2" id="file2" required><br><br> <input type="submit" value="上传并合并文件"> </form> <h1>已生成文件</h1> <div id="file-class"> <div id="loading-spinner" class="loading-spinner"></div> <div id="fileList"></div> </div> <!-- 以下代码请放入前端html文件的body内 --> <script src="https://agi-dev-platform-web.bj.bcebos.com/ai_apaas/embed/output/embedLiteSDK.js?responseExpires=0"></script> <script> function goBack() { window.history.back(); }; new EmbedLiteSDK({appId: '37f3fe7a-e68b-4f11-b430-42774b6a36e9', code: 'embedjW6Y1HLV93TeIh3U2pEc'}); $(document).ready(function () { // 后端接口地址,根据实际情况修改 const backendUrl_getFiles = '/get_file_links'; const backendUrl_download = '/download/'; // 获取元素 const loadingSpinner = document.getElementById('loading-spinner'); const dataDisplay = document.getElementById('fileList'); // 显示转圈效果 loadingSpinner.style.display = 'block'; dataDisplay.style.display = 'none'; // const data = { message: '这是从后端获取的数据' }; // const dataElement = document.createElement('p'); // dataElement.textContent = data.message; // dataDisplay.appendChild(dataElement); // 获取文件名称列表并展示 $.ajax({ type: 'GET', url: backendUrl_getFiles, dataType: 'json', success: function (data) { const fileListElement = $('#fileList'); data.forEach(function (fileName) { const listItem = $('<li>'); const linkElement = $('<a>').attr('href', backendUrl_download + fileName).text(fileName); listItem.append(linkElement); fileListElement.append(listItem); }); }, error: function (error) { console.error('获取文件名称列表出错:', error); } }); // 隐藏转圈效果,显示数据 loadingSpinner.style.display = 'none'; dataDisplay.style.display = 'block'; }); </script> <style> /* styles.css */ body { font-family: Arial, sans-serif; padding: 20px; background-color: #f4f4f4; } h1 { color: #333; text-align: center; } p { width: 400px; margin: 0 auto 20px; } form { background-color: #fff; padding: 20px; border-radius: 5px; box-shadow: 0 0 5px rgba(0, 0, 0, 0.1); width: 400px; margin: 0 auto 20px; } label { display: block; margin-bottom: 5px; } input[type="file"], input[type="submit"] { width: 100%; padding: 5px; margin-bottom: 10px; border: 1px solid #ccc; border-radius: 3px; } input[type="submit"] { background-color: #007BFF; color: white; cursor: pointer; transition: background-color 0.3s ease; } input[type="submit"]:hover { background-color: #0056b3; } #file-class { background-color: #fff; padding: 20px; border-radius: 5px; box-shadow: 0 0 5px rgba(0, 0, 0, 0.1); width: 400px; margin: 0 auto; } #fileList li { list-style-type: none; margin-bottom: 10px; } #fileList a { color: #007BFF; text-decoration: none; transition: color 0.3s ease; } #fileList a:hover { color: #0056b3; text-decoration: underline; } .loading-spinner { width: 40px; height: 40px; border: 4px solid rgba(0, 0, 0, 0.1); border-top: 4px solid #007BFF; border-radius: 50%; animation: spin 1s linear infinite; margin: 50px auto; display: block; } @keyframes spin { to { transform: rotate(360deg); } } </style> </body> </html>
前端代码download.html <!DOCTYPE html> <head> <meta charset="UTF-8"> <title>上传两个文件并合并下载</title> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> </head> <body> <h1>上传两个文件并合并下载</h1> <p>文件上传成功,数据处理完成,花费时间{{date}}s,<a href="/download/{{url}}">点击此处下载合并后的文件</a>,<button onclick='goBack()'>返回上一级</button></p> <!-- 以下代码请放入前端html文件的body内 --> <script src="https://agi-dev-platform-web.bj.bcebos.com/ai_apaas/embed/output/embedLiteSDK.js?responseExpires=0"></script> <script> function goBack() { window.history.back(); }; </script> <style> /* styles.css */ body { font-family: Arial, sans-serif; padding: 20px; background-color: #f4f4f4; } h1 { color: #333; text-align: center; } p { width: 400px; margin: 0 auto 20px; } </style> </body> </html>
目录架构为
标签:xlsx,匹配,手机号,表格,df,pd,time,path From: https://www.cnblogs.com/liyixiang545/p/18683403