业务场景
使用了外部工作流系统管理固定资产申请,转移(负责人变更), 盘点,报废等涉及固定资产的业务,而固定资产采购,折旧等仍在SAP中进行,所以需要定时从SAP中下载包括固定资产账面值的固定资产清单,以作为工作流审批节点流转的依据
主要功能说明
- 定时运行
- 自动登录SAP
- 下载SAP固定资产报表 S_ALR_87011990,(需在SAP中提前设置好报表输出格式layout)
- 下载成制表符(Tab)分隔的csv文本文件
- 进行数据格式处理:日期格式转换,数字格式转换
- 添加目标数据库的字段
- 写入目标数据库
- 执行目标数据库中的后处理SQL语句
附注:每次运行记录数8000多笔,总耗时约1分钟,总体性能可接受
# SAP fixed-asset list -> database interface.
# NOTE(review): the indentation of this script was reconstructed from a
# whitespace-collapsed copy; spots where the nesting was ambiguous are marked.
import csv
import datetime as datetime1
import os
import time
from datetime import datetime
from pprint import pprint

import schedule
import win32com.client

from utils import connect_db, close_db, get_configer, get_logger, timestamp
from sap_utils import SAP, _get_sap_session, close_sap, get_sap_session, send_email

cf = get_configer('asset_interface.conf')
logger = get_logger('asset_interface.log')

# True only when [transaction] testing is present and set to a true value.
testing_flag = bool(
    cf.has_option('transaction', 'testing')
    and cf.getboolean('transaction', 'testing')
)

# SAP GUI id of the report-date input on the selection screen.
_REPORT_DATE_FIELD = "wnd[0]/usr/ctxtBERDATUM"
# Maximum number of rows sent per executemany()/commit round trip.
_BATCH_SIZE = 1000


def execute_transaction(session, conn, cursor):
    """Run the asset report in SAP, export it and reload the asset table.

    Fills the selection screen from the ``[transaction]`` config section,
    executes the report, saves it to a local file through ``save_csv``,
    parses it with ``get_data_from_file`` and then replaces the content of
    ``app_fd_F01_AssetMaster`` with the parsed rows.

    Args:
        session: SAP GUI scripting session.
        conn: database connection (not used here; kept for the callers).
        cursor: database cursor offering ``execute``, ``executemany`` and
            ``commit``.

    Returns:
        Number of rows inserted.

    Raises:
        RuntimeError: the export file never appeared, or the exported layout
            has fewer columns than configured. Raised *before* the table is
            emptied so a failed download cannot wipe the existing data.
    """
    session.findById("wnd[0]/tbar[0]/okcd").Text = cf.get('transaction', 'tcode')  # e.g. "/nS_ALR_87011990"
    session.findById("wnd[0]").sendVKey(0)
    session.findById("wnd[0]/usr/radXEINZEL").Select()
    session.findById("wnd[0]/usr/ctxtBUKRS-LOW").Text = cf.get('transaction', 'company')
    session.findById("wnd[0]/usr/ctxtBEREICH1").Text = "60"
    session.findById("wnd[0]/usr/ctxtSRTVR").Text = cf.get('transaction', 'sort_variant')  # e.g. "0002"

    sap = SAP(session)
    date_format = sap.get_date_format(_REPORT_DATE_FIELD)  # e.g. "dmY"
    format_str = '%' + '%'.join(date_format)  # "dmY" -> "%d%m%Y"

    # Day 28 + 4 days always lands in the next month; stepping back by that
    # date's day-of-month gives the last day of the current month.
    next_month = datetime.today().replace(day=28) + datetime1.timedelta(days=4)
    end_of_month = (
        next_month - datetime1.timedelta(days=next_month.day)
    ).strftime(format_str)

    # Read the default first so the fallback below always has a value.
    default_date = session.findById(_REPORT_DATE_FIELD).Text
    try:
        session.findById(_REPORT_DATE_FIELD).Text = end_of_month
    except Exception:
        logger.info('could not set report date %s, keeping default %s'
                    % (end_of_month, default_date))
        session.findById(_REPORT_DATE_FIELD).Text = default_date

    session.findById("wnd[0]/tbar[1]/btn[8]").press()
    # NOTE(review): presumably the "export to local file" menu entry and the
    # format choice in the following popup - confirm against the SAP screens.
    session.findById("wnd[0]/mbar/menu[0]/menu[1]/menu[2]").select()
    session.findById(
        "wnd[1]/usr/subSUBSCREEN_STEPLOOP:SAPLSPO5:0150/sub:SAPLSPO5:0150/radSPOPLI-SELFLAG[1,0]"
    ).select()
    session.findById("wnd[1]/tbar[0]/btn[0]").press()

    full_file_name = save_csv(session, 'asset_list_report')
    print('full_file_name=', full_file_name)
    if not full_file_name:
        raise RuntimeError('SAP export file was not created in time')

    data = get_data_from_file(full_file_name, date_format)
    if data is None:
        raise RuntimeError('SAP export %s could not be parsed' % full_file_name)

    cursor.execute("delete from app_fd_F01_AssetMaster")
    db_fields = ['id', 'c_AssetNo', 'c_SubNumber', 'c_AssetClass',
                 'c_AssetDescription', 'c_WBS', 'c_Plant', ' c_CostCenter',
                 'c_AssetOwnerNo', 'c_CapitalizedDate', 'c_DeactivationDate',
                 'c_Currency', 'c_CurrBkVal', 'c_CurrentAPC', 'c_AccumulDep',
                 'dateCreated', 'createdBy']
    s = "INSERT into app_fd_F01_AssetMaster (%s) VALUES (%s) " % (
        ','.join(db_fields), ','.join(['?'] * len(db_fields)))

    batch = []
    total_records_updated = 0
    for row in data:
        # Only rows with a non-empty first value (id) and 12th value are loaded.
        if not (row[0] and row[11]):
            continue
        batch.append(row)
        total_records_updated += 1
        if len(batch) >= _BATCH_SIZE:
            cursor.executemany(s, batch)
            cursor.commit()
            batch = []
    # Flush whatever is left, independent of where the last valid row was.
    if batch:
        cursor.executemany(s, batch)
        cursor.commit()
    return total_records_updated


def save_csv(session, tcode, file_folder=None):
    """Fill the SAP "save as" dialog and wait for the file to appear.

    Args:
        session: SAP GUI scripting session with the save dialog open in wnd[1].
        tcode: prefix for the generated file name.
        file_folder: target folder; when empty, the folder proposed by the
            dialog is used.

    Returns:
        Full path of the saved file, or ``None`` if it did not show up
        within roughly 12 minutes (720 polls, one second apart).
    """
    if file_folder:
        session.findById("wnd[1]/usr/ctxtDY_PATH").text = file_folder
    else:
        file_folder = session.findById("wnd[1]/usr/ctxtDY_PATH").text

    file_name = f"{tcode}_{datetime.now():%y%m%d_%H%M%S}.csv"
    session.findById("wnd[1]/usr/ctxtDY_FILENAME").text = file_name
    session.findById("wnd[1]/tbar[0]/btn[0]").press()

    full_file_name = os.path.join(file_folder, file_name)
    time.sleep(1)
    for _ in range(720):
        if os.path.exists(full_file_name):
            return full_file_name
        time.sleep(1)
    return None


def get_data_from_file(file_name, date_format):
    """Parse the tab-separated SAP export into rows ready for insertion.

    The 7th line of the file is taken as the header; data starts on the 9th
    line. Columns whose header is empty are dropped. In each kept row,
    positions 8-9 are converted as dates and positions 11-13 as numbers;
    a timestamp and the current user are appended and the first value is
    duplicated at the front to serve as the record id.

    Args:
        file_name: path of the exported file.
        date_format: order of the SAP date parts, e.g. "dmY".

    Returns:
        List of row lists, or ``None`` when the file has no header line or
        fewer non-empty header columns than ``[transaction] fields`` lists.
    """
    # NOTE(review): 'unicode_escape' is kept from the original; it also
    # interprets backslashes in the data - confirm this is intended.
    with open(file_name, "r", newline='', encoding='unicode_escape') as csvfile:
        rows = list(csv.reader(csvfile, delimiter='\t'))

    if len(rows) <= 6:
        logger.info('header line not found in %s' % file_name)
        return None

    columns = rows[6]
    # Indexes of columns with a non-empty header; consecutive tabs in the
    # export produce empty header cells that must be skipped.
    kept_idx = [idx for idx, name in enumerate(columns) if name]

    data = []
    for row in rows[8:]:
        # Skip blank lines and the short end-of-report marker lines.
        if not row or len(row) <= 4:
            continue
        # Trailing empty columns are written without tab separators, so a
        # row may be shorter than the header: pad those positions with None.
        data.append([row[idx] if idx < len(row) else None for idx in kept_idx])

    fields = cf.get('transaction', 'fields').split(',')  # configured field list
    if len(kept_idx) < len(fields):
        logger.info('missing Fields in sap layout')
        return None
    # NOTE: header names are not compared with the configured fields because
    # SAP may export short, medium or long labels for the same field.

    result = []
    ymd_pos = get_ymd_pos(date_format)
    for row in data:
        for column_idx, value in enumerate(row):
            if not value:
                continue
            if 8 <= column_idx <= 9:
                # Date columns: SAP user format -> yyyy/mm/dd.
                row[column_idx] = convert_date(value, ymd_pos)
            elif 11 <= column_idx <= 13:
                # Numeric columns: drop thousands separators and surrounding
                # blanks, move a trailing minus sign to the front.
                value = value.strip().replace(',', '')
                if value.endswith('-'):
                    value = f"-{value[:-1]}"
                # A whitespace-only cell becomes None instead of ''.
                row[column_idx] = value or None
                if column_idx == 11 and value == '0.00':
                    row[column_idx] = 0
        # Append load timestamp and current user.
        row.extend([datetime.now().strftime('%Y/%m/%d %H:%M:%S'),
                    os.environ['username']])
        row.insert(0, row[0])  # first value doubles as the record id
        result.append(row)

    pprint('get_data_from_file 2 records %s' % result[:2])
    return result


def get_ymd_pos(date_format):
    """Return the slice bounds of year, month and day in a date string.

    Each of 'm' and 'd' is assumed to take two characters plus a separator
    and any other key four characters plus a separator.

    Args:
        date_format: order of the parts, e.g. "dmY", "mdY" or "Ymd".

    Returns:
        Dict mapping each key to ``(begin, end)``; for "dmY" this is
        ``{'d': (0, 2), 'm': (3, 5), 'Y': (6, 10)}``.
    """
    start = 0
    ymd_pos = {}
    for k in date_format:
        begin = start
        start += 3 if k in ('m', 'd') else 5
        ymd_pos[k] = (begin, start - 1)
    return ymd_pos


def convert_date(date_str, ymd_pos):
    """Cut year, month and day out of ``date_str`` and return "Y/m/d".

    Args:
        date_str: date text as exported by SAP.
        ymd_pos: slice bounds as produced by ``get_ymd_pos``.
    """
    y, m, d = (date_str[ymd_pos[k][0]:ymd_pos[k][1]] for k in ('Y', 'm', 'd'))
    return f"{y}/{m}/{d}"


def job():
    """Run one complete interface cycle: log on, reload the table, notify.

    Sends a mail with the number of loaded rows on success, a mail with the
    logon message when no SAP session could be obtained, and a mail with the
    error text (before re-raising) when anything fails.
    """
    # Placeholder entry: replace with the real post-processing SQL statements.
    post_sql_commands = [
        """ 执行后处理的SQL语句 """
    ]
    session = conn = 0
    try:
        print('%s started running the job...' % datetime.now())
        short_cut_file = cf.get('saplogon', 'short_cut_file')
        popup_win_title = cf.get('saplogon', 'popup_win_title')
        pin = cf.get('saplogon', 'pin')
        wait_sec = cf.get('saplogon', 'wait_sec')
        if not testing_flag:
            session, msg = get_sap_session(short_cut_file, popup_win_title,
                                           pin, wait_sec)
        else:
            session, msg = _get_sap_session()

        if session:
            conn, cursor = connect_db(cf.get('db', 'ip'), cf.get('db', 'db'))
            total_records_updated = execute_transaction(session, conn, cursor)
            for sql in post_sql_commands:
                cursor.execute(sql)
                print('%s records updated' % cursor.rowcount)
            send_email('%s Asset updated' % total_records_updated,
                       "[email protected]")
        else:
            send_email("Failed logon SAP %s" % msg, '[email protected]')
        print('%s finished running the job...' % datetime.now())
    except Exception as e:
        send_email("Asset interface run with error %s" % str(e))
        raise
    finally:
        # NOTE(review): both clean-up calls are assumed to be skipped in
        # testing mode - confirm against the original indentation.
        if not testing_flag:
            if conn:
                # conn is still the 0 sentinel when no connection was opened.
                close_db(conn)
            close_sap(session)


def main():
    """Schedule ``job`` daily at every time in ``[schedule] RunAt`` and loop.

    ``RunAt`` is a ';'-separated list of times; surrounding blanks and empty
    entries are ignored.
    """
    print('started..')
    run_at = cf.get('schedule', 'RunAt')
    runat = [t.strip() for t in run_at.split(';') if t.strip()]
    for r in runat:
        schedule.every().day.at(r).do(job)
    print('%s waiting for pending job at %s' % (datetime.now(), runat))
    # NOTE(review): job() re-raises, so one failed run ends this loop.
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    job()
    main()
标签:session,findById,wnd,get,Python,87011990,file,SAP,row From: https://www.cnblogs.com/pythonClub/p/17655302.html