首页 > 编程语言 >Python SAP 脚本定时自动下载资产清单 S_ALR_87011990

Python SAP 脚本定时自动下载资产清单 S_ALR_87011990

时间:2023-08-24 22:25:18浏览次数:48  
标签:session findById wnd get Python 87011990 file SAP row

业务场景

使用了外部工作流系统管理固定资产申请,转移(负责人变更), 盘点,报废等涉及固定资产的业务,而固定资产采购,折旧等仍在SAP中进行,所以需要定时从SAP中下载包括固定资产账面值的固定资产清单,以作为工作流审批节点流转的依据

主要功能说明

  1. 定时运行
  2. 自动登录SAP
  3. 下载SAP固定资产报表 S_ALR_87011990,(需在SAP中提前设置好报表输出格式layout)
  4. 下载成跳格分享的csv文本文件
  5. 进行数据格式处理:日期格式转换,数字格式转换
  6. 添加目前数据库的字段
  7. 写入目标数据库
  8. 执行目录数据库中的后处理SQL语句

附注:每次运行记录数8000多笔,总耗时约1分钟,总体性能可接受

 

import os,win32com.client
import time,csv
from datetime import datetime
import datetime as datetime1
import schedule
from utils import connect_db, close_db,get_configer,get_logger, timestamp
from sap_utils import SAP,_get_sap_session,close_sap,get_sap_session, send_email
from pprint import pprint

cf =get_configer('asset_interface.conf')
logger = get_logger('asset_interface.log')
if cf.has_option('transaction', 'testing') and cf.getboolean('transaction', 'testing'):
    testing_flag = True
else:
    testing_flag = False

def execute_transaction(session, conn, cursor):
    session.findById("wnd[0]/tbar[0]/okcd").Text = cf.get('transaction','tcode')  #"/nS_ALR_87011990"
    session.findById("wnd[0]").sendVKey(0)
    session.findById("wnd[0]/usr/radXEINZEL").Select()
    session.findById("wnd[0]/usr/ctxtBUKRS-LOW").Text = cf.get('transaction','company') 
    session.findById("wnd[0]/usr/ctxtBEREICH1").Text = "60"
    session.findById("wnd[0]/usr/ctxtSRTVR").Text = cf.get('transaction','sort_variant') #"0002"
 
    sap = SAP(session)
    date_format = sap.get_date_format("wnd[0]/usr/ctxtBERDATUM")  #dmY
    format_str = '%' + '%'.join(date_format) #'%m%d%Y'
    next_month = datetime.today().replace(day=28) + datetime1.timedelta(days=4)
    end_of_month = (next_month - datetime1.timedelta(days=next_month.day)).strftime(format_str)
 try:
        default_date = session.findById("wnd[0]/usr/ctxtBERDATUM").Text
        session.findById("wnd[0]/usr/ctxtBERDATUM").Text = end_of_month
    except:
        session.findById("wnd[0]/usr/ctxtBERDATUM").Text = default_date
 
    session.findById("wnd[0]/tbar[1]/btn[8]").press()
    #session.findById("wnd[0]/mbar/menu[0]/menu[1]/menu[1]").Select()
    session.findById("wnd[0]/mbar/menu[0]/menu[1]/menu[2]").select()
    session.findById("wnd[1]/usr/subSUBSCREEN_STEPLOOP:SAPLSPO5:0150/sub:SAPLSPO5:0150/radSPOPLI-SELFLAG[1,0]").select()  
    session.findById("wnd[1]/tbar[0]/btn[0]").press()
    full_file_name = save_csv(session, 'asset_list_report')
 print('full_file_name=', full_file_name)
 if full_file_name:        
        data = get_data_from_file(full_file_name, date_format)
 
    cursor.execute ("delete from app_fd_F01_AssetMaster")
    db_fields =['id',
 'c_AssetNo',
 'c_SubNumber',
 'c_AssetClass',
 'c_AssetDescription',
 'c_WBS',
 'c_Plant',
 ' c_CostCenter',
 'c_AssetOwnerNo',
 'c_CapitalizedDate',
 'c_DeactivationDate',
 'c_Currency',
 'c_CurrBkVal',
 'c_CurrentAPC',
 'c_AccumulDep',
 'dateCreated',
 'createdBy']
    s = "INSERT into app_fd_F01_AssetMaster (%s) VALUES (%s) " %(','.join(db_fields), ','.join(['?']*len(db_fields)))    
    record_value = []
    total_row = len(data)
    total_records_updated = 0
 for (j, row) in enumerate(data):
        imod = j % 1000
 if row[0] and row[11]:            
            record_value.append(row)
            total_records_updated += 1
 if record_value and (j == total_row -1 or (j and imod == 0)):
            cursor.executemany(s, record_value)
            cursor.commit()
            record_value =[]            
 return total_records_updated

def save_csv(session,tcode, file_folder=None):
 if file_folder:
        session.findById("wnd[1]/usr/ctxtDY_PATH").text = file_folder 
 else:
        file_folder = session.findById("wnd[1]/usr/ctxtDY_PATH").text
 
    file_name = f"{tcode}_{datetime.now():%y%m%d_%H%M%S}.csv"
    session.findById("wnd[1]/usr/ctxtDY_FILENAME").text = file_name
 
    session.findById("wnd[1]/tbar[0]/btn[0]").press()
    result = None 
    full_file_name = os.path.join(file_folder, file_name)
    time.sleep(1)    
 for i in range(720):
 if os.path.exists(full_file_name):
            result = full_file_name
 break
        time.sleep(1) 
 
 return result

def get_data_from_file(file_name, date_format):
    data = []
    with open(file_name, "r", newline='', encoding='unicode_escape') as csvfile:
        rows = csv.reader(csvfile, delimiter = '\t')
        rows = [r for r in rows]

    columns = rows[6]                                               #获取第7行标题列
    valid_columns = [c for c in columns if c]                       #剔除空字段,获取非空标题字段,解决字段间多个tab的情况
    target_column_count = len(valid_columns)
 for i, row in enumerate(rows):                
 if i > 7 and row and len(row) > 4:                          #剔除空行,结尾标记行,
            row = [c for idx, c in enumerate(row) if columns[idx]]  #剔除空列标题对应的字段值
            col_count = len(row)            
 if col_count < target_column_count:                     #补齐最后几个空列,最后几列无内容时,也没有tab分隔符,
                row += [None for j in range(target_column_count - col_count)]                        
            data.append(row)
 
    fields = cf.get('transaction','fields')             #从配置文件中获取字段清单
    fields = fields.split(',')
 if len(valid_columns) < len(fields):
        logger.info('missing Fields in sap layout')        
 return
    #字段名顺序匹配,导出时因字段名输出长度不一致,会有短,中,长三种标签输出
    # col_check = [c for (i, c) in enumerate(fields) if c != valid_columns[i]]
    # if col_check:
    #     logger.info('Fields sequence should be same as in asset_interface.conf file, field index')        
    #     return
    result = []
    ymd_pos = get_ymd_pos(date_format)
 for row in data:        
 for column_idx, value in enumerate(row):
 if not value: continue
 if 8<= column_idx <=9:         #日期字段处理:根据SAP用户格式转换成 yyyy/mm/dd格式
                row[column_idx]  = convert_date(value, ymd_pos)           
            elif 11<= column_idx <=13:       #数字字段处理,去掉千分位分隔符,去掉首尾空格,将末尾负号移至最前面
                value = value.strip().replace(',','')
                value = f"-{value[:-1]}" if value[-1] == '-' else value
                row[column_idx]  = value
 if column_idx == 11 and value == '0.00':
                row[column_idx]  = 0 
        row.extend([datetime.now().strftime('%Y/%m/%d %H:%M:%S'),os.environ['username']])    #添加时间戳和当前用户
        row.insert(0,row[0])                #将资产号作为ID
        result.append(row)
    pprint('get_data_from_file 2 records %s' % result[:2])            
 return result

def get_ymd_pos(date_format):
 """根据格式化字符串,解析年、月、日位置,
        date_format: 如dmY, mdY,Ymd
        返回{'Y':(6,10),
             'm':(3,5),
            'd':(0,2)
    """
    start = 0
    ymd_pos = {}
 for k in date_format:
        begin = start
        length = 3 if k in ['m','d'] else 5
        start += length
        ymd_pos[k] = (begin, start - 1)
 return ymd_pos

def convert_date(date_str, ymd_pos):
 """从日期字符串中按位置取出年、月、日,再按 固定年/月/日格式字符串返回"""
    pos = ymd_pos.get('Y')
    y = date_str[pos[0]: pos[1]]
    pos = ymd_pos.get('m')
    m = date_str[pos[0]: pos[1]]
    pos = ymd_pos.get('d')
    d = date_str[pos[0]: pos[1]]
 return f"{y}/{m}/{d}"

def job():
    post_sql_commands=[
 """
    执行后处理的SQL语句
    """
    ]
    session = conn = 0 
 try:
 print('%s started running the job...' % datetime.now())    
        short_cut_file =cf.get('saplogon','short_cut_file')
        popup_win_title=cf.get('saplogon','popup_win_title')
        pin =cf.get('saplogon','pin')
        wait_sec =cf.get('saplogon','wait_sec')
 if not testing_flag:
            session, msg = get_sap_session(short_cut_file, popup_win_title, pin, wait_sec)
 else:
            session, msg = _get_sap_session()        
 if session:            
            conn, cursor = connect_db(cf.get('db','ip'), cf.get('db','db'))            
            total_records_updated = execute_transaction(session, conn, cursor) 
 for sql in post_sql_commands:
                cursor.execute(sql)
 print('%s records updated' % cursor.rowcount)                            
            send_email('%s Asset updated' % total_records_updated, "[email protected]")
 else:
            send_email("Failed logon SAP %s" % msg, '[email protected]')
 print('%s finished running the job...' % datetime.now())                
    except Exception as e:
        send_email("Asset interface run with error %s" % str(e))
        raise
 finally:
 if not testing_flag:        
            close_db(conn)    
            close_sap(session)        
 
def main():
 print('started..')
    run_at=cf.get('schedule','RunAt')
    #mailto=cf.get('mail','MailTo')    
    runat = run_at.split(';')
 for r in runat:
        schedule.every().day.at(r).do(job)
 print('%s waiting for pending job at %s' %(datetime.now(),runat))    
 while True:
        schedule.run_pending()
        time.sleep(1)
 
if __name__ == "__main__":
    job()
    main()        

  

标签:session,findById,wnd,get,Python,87011990,file,SAP,row
From: https://www.cnblogs.com/pythonClub/p/17655302.html

相关文章

  • 从SAP GridView中获取数据
    classSapGuiGridView:"""SAP中GridView组件数据的表示对象类。用于从SAP的GridView中读取指定的数据。"""@staticmethoddefget_data(session,_id,columns,handler=None):"""读取查询到的表格数据。......
  • SAP总结
     1.获取session(窗口)SetSapGuiAuto=GetObject("SAPGUI")'GettheSAPGUIScriptingobjectSetSAPApp=SapGuiAuto.GetScriptingEngine'GetthecurrentlyrunningSAPGUISetSAPCon=SAPApp.Children(0)'Getthefirstsystemthatis......
  • 从SAP TableControl中读取数据
    classSapGuiTableControl:"""读取GuiTableControl对象的数据。"""@staticmethoddefget_data(session,_id,columns=None):"""获取指定列的数据,索引从0开始。:paramsession:SAP的GuiSession......
  • Python虚拟环境
    以前在打比赛和做项目的时候都一直都没有注重管理python包,以至于把所有的包都堆到Anaconda下,以前出现包问题的时候能百度解决的解决,结局不了就卸了重装,感觉没什么。最近开始做一些项目,在自己的电脑上做好,去别的地方打包运行,结果要配两次环境非常的麻烦,所以开始认真对待起不同项目......
  • Python 项目以及常见的目录结构
    当今世界,Python可以说是最受欢迎的编程语言之一。作为一种高级动态语言,Python具有简单易学、代码可读性强和生态系统丰富等特点,广泛应用于Web开发、数据科学、机器学习、网络爬虫等领域。在Python项目中,良好的目录结构设计是一个成功项目的关键因素之一。Python项......
  • python 中 if __name__ == '__main__'
    当我们编写Python模块时,有时候需要让某些代码只在该模块作为主程序运行时才执行,而不是被其他模块import引入时就执行。这时候可以使用if__name__=='__main__'这个条件语句。什么是 name 变量在Python中,每个模块(Python文件)都有一个内置变量__name__,用于指示当前模......
  • 使用 conda 管理电脑多个 python 版本
    背景之前一直使用python自带的虚拟环境管理工具(virtualen包),虽然很舒服,可以有不同的软件包环境,但是所有环境都只能基于一个python版本。由于历史原因,系统(Ubuntu)升级时给我新增了一个python3.11,我索性就只保留了这一个版本。这两天要使用open3d,结果发现不支持最新版本的......
  • python独立脚本应用Django项目的环境
    一、需求说明一直用Django在开发一个网站项目,其中的注册用户和登录,都是使用Django自带的认证系统。主要是对密码的加密,在注册或者登录的时候,前端传递多来的密码,我会使用Django的set_password()方法在加密一次经过加密后的数据库中的数据样子如下:......
  • Python 入门的第三天
    pycharm的简单使用注释语法变量与常量垃圾回收机制数据类型(8种基本数据类型)首先:我们学习如何修改主题,步骤为:打开pycharm,File,Settings,Appearance&Behavior,Appearance,Theme2.如何切换解释器File,Settings,Project:django_lock,PythonInterpreter 3.如何创......
  • Python全栈开发从入门到入土
    Python全栈开发从入门到入土持续更新中~~~希望大家多多支持!【Python基础从入坑到放弃】【一】Python基础入门【一】计算机基础编程语言的介绍什么是语言什么是编程语言为什么会出现编程语言什么是编程为什么要有编程计算机组成原理计算机的五大组成部分......