定义Excel列对象
class ExcelColumn:
"""
定义Excel中的列
参数:
name (str): 列的名称。
width (int | None, 可选): 列的宽度。默认为 None。
required (bool, 可选): 指示列是否必需。默认为 False。
mapping_factory (Callable | dict | None, 可选): 实例的映射工厂。
可以是可调用对象、字典或 None。默认为 None。
update (bool, 可选): 指示列是否应该更新,用于ExcelWriter写入的时候是否覆盖写入。默认为 True。
"""
def __init__(self, name: str, *, width: int | None = None, required=False,
mapping_factory: Callable | dict | None = None, update=True):
self.name = name if not required else f'*{name}'
self.width = width
self.required = required
self.mapping_keys = []
self.mapping_reverse = None
if not mapping_factory:
self.mapping = lambda x: x
elif isinstance(mapping_factory, dict):
mapping_factory_reverse = {v: k for k, v in mapping_factory.items()}
self.mapping = lambda x: mapping_factory.get(x)
self.mapping_reverse = lambda x: mapping_factory_reverse.get(x)
self.mapping_keys = list(mapping_factory.keys())
else:
self.mapping = mapping_factory
self.update = update
定义Excel模板元类
class ExcelTemplateMetaclass(type):
    """Metaclass that collects ``ExcelColumn`` attributes into ``__columns__``.

    For every class except the one named ``'ExcelTemplate'``, each class
    attribute that is an ``ExcelColumn`` is removed from the class namespace
    and stored in a ``__columns__`` dict keyed by the attribute name.
    """

    def __new__(cls, name, bases, attrs):
        # The class named 'ExcelTemplate' is created with its namespace
        # untouched; every other class gets its columns collected.
        if name != 'ExcelTemplate':
            # Column definitions, in the order they appear in ``attrs``.
            columns = {
                attr: value
                for attr, value in attrs.items()
                if isinstance(value, ExcelColumn)
            }
            # Take the columns out of the namespace so they do not become
            # ordinary class attributes.
            for attr in columns:
                del attrs[attr]
            attrs['__columns__'] = columns
        return type.__new__(cls, name, bases, attrs)
定义Excel模板基类
class ExcelTemplate(metaclass=ExcelTemplateMetaclass):
    """Base class for declarative Excel templates.

    Subclasses declare ``ExcelColumn`` class attributes. The metaclass moves
    them into ``__columns__`` (attribute name -> ``ExcelColumn``), which the
    methods below use to build template workbooks, read uploaded workbooks
    and write data out.
    """

    # Name of the worksheet that is read from / written to.
    __sheet_name__ = 'Sheet1'
    # Populated by the metaclass on subclasses.
    __columns__: dict[str, ExcelColumn]
    # Headers passed to the streaming responses built by this class.
    headers = {
        'Content-Type': 'application/vnd.ms-excel',
        'Content-Disposition': 'attachment;filename=download.xlsx'
    }

    @staticmethod
    async def get_download_data(query: QuerySet, limit: int | None = None):
        """Fetch the rows of ``query`` to export, capped at ``limit`` rows.

        Args:
            query: Query set to export from.
            limit: Maximum number of rows. A falsy value (``None`` or ``0``)
                falls back to ``settings.EXCEL_DOWNLOAD_LIMIT``.

        Returns:
            The result of awaiting ``.all()`` on the limited query.
        """
        if not limit:
            limit = settings.EXCEL_DOWNLOAD_LIMIT
        # Original author's note: at most 1,000,000 rows may be exported.
        pagination = query.limit(limit).offset(0)
        return await pagination.all()

    @classmethod
    def _get_upload_title_mapping(cls):
        """Return a dict mapping each column title to its attribute name."""
        return {column.name: key for key, column in cls.__columns__.items()}

    @classmethod
    def _get_upload_value_mappings(cls):
        """Return a dict mapping each column title to its value mapping."""
        return {column.name: column.mapping for column in cls.__columns__.values()}

    @staticmethod
    def _get_data_validation(columnIndex, excelColumn: ExcelColumn):
        """Build a list-type data validation for one mapped column.

        Args:
            columnIndex: 1-based index of the column in the sheet.
            excelColumn: Column whose ``mapping_keys`` are the allowed values.

        Returns:
            The configured ``DataValidation`` covering rows 2-11 of the
            column.
        """
        # Keys are stringified so that non-string mapping keys (e.g. ints)
        # do not make ``str.join`` raise TypeError.
        options = ",".join(str(key) for key in excelColumn.mapping_keys)
        dv = DataValidation(
            type='list',
            formula1=f'"{options}"',
            allow_blank=not excelColumn.required
        )
        # The original message contained a literal "str(...)" because the
        # call was written outside the f-string braces.
        dv.error = f"{excelColumn.name}只能从{excelColumn.mapping_keys}中选择"
        dv.errorTitle = "无效的输入"
        dv.prompt = '请从下拉框中选择数据'
        dv.promptTitle = f"{excelColumn.name}选择"
        column_letter = get_column_letter(columnIndex)
        # Validation area: rows 2-11 of this column.
        dv.add(f'{column_letter}2:{column_letter}11')
        return dv

    @classmethod
    def _set_data_validation(cls, sheet):
        """Attach a data validation to ``sheet`` for every mapped column."""
        for index, column in enumerate(cls.__columns__.values(), start=1):
            # Only columns with a dict mapping have keys to choose from.
            if column.mapping_keys:
                sheet.data_validations.append(cls._get_data_validation(index, column))

    @classmethod
    def get_title(cls):
        """Return the list of column titles, in column order."""
        return [title for title, _ in cls._get_title()]

    @classmethod
    def get_update_title(cls):
        """Return the attribute names of the columns whose ``update`` is truthy."""
        return [key for key, column in cls.__columns__.items() if column.update]

    @classmethod
    def _get_title(cls):
        """Return ``(title, width)`` pairs for every column, in column order."""
        return [(column.name, column.width) for column in cls.__columns__.values()]

    @classmethod
    def _get_title_mapping(cls):
        """Return a dict mapping each column title to its attribute name."""
        # Same mapping as the upload variant; delegate instead of duplicating.
        return cls._get_upload_title_mapping()

    @classmethod
    def _build_workbook(cls, rows) -> BytesIO:
        """Write ``rows`` plus the column titles into an in-memory workbook.

        Args:
            rows: Iterable of row dicts handed to ``ExcelWriter.write``.

        Returns:
            A ``BytesIO`` holding the saved workbook, rewound to position 0.
        """
        file = BytesIO()
        excel = ExcelWriter(file)
        sheet = excel.write(cls.__sheet_name__, rows, title=cls.__columns__.values())
        cls._set_data_validation(sheet)
        excel.save()
        file.seek(0)
        return file

    @classmethod
    def get_template(cls) -> BytesIO:
        """Return an empty template workbook (titles and validations only)."""
        return cls._build_workbook([])

    @classmethod
    def get_template_as_stream_response(cls) -> StreamingResponse:
        """Return the empty template wrapped in a ``StreamingResponse``."""
        # NOTE(review): 'xls/xlsx' is not a registered MIME type; presumably
        # the Content-Type in ``headers`` is what clients receive — confirm.
        return StreamingResponse(cls.get_template(), media_type='xls/xlsx', headers=cls.headers)

    @classmethod
    def save_template_as_file(cls, filename: str):
        """Write the empty template workbook to ``filename``."""
        with open(filename, 'wb') as f:
            f.write(cls.get_template().getvalue())

    @classmethod
    def read(cls, file: bytes | str):
        """Read rows from an Excel file and translate titles and values.

        Args:
            file: Raw workbook bytes, or a value passed straight to
                ``ExcelReader`` (the annotation suggests a path string).

        Returns:
            A generator of dicts. Keys are the attribute names of known
            columns (unknown titles are kept as-is); values are passed
            through the column's mapping when one exists. Rows whose values
            are all falsy are skipped.
        """
        title_mapping = cls._get_upload_title_mapping()
        value_mappings = cls._get_upload_value_mappings()

        def get_value(key, value):
            mapping = value_mappings.get(key)
            if not mapping:
                return value
            return mapping(value)

        def get_title(key):
            return title_mapping.get(key, key)

        # Raw bytes are wrapped in a file-like object for the reader.
        source = BytesIO(file) if isinstance(file, bytes) else file
        excel = ExcelReader(source)
        return (
            {get_title(k): get_value(k, v) for k, v in row.items()}
            for row in excel.read(sheet=cls.__sheet_name__) if any(row.values())
        )

    @classmethod
    def write(cls, data):
        """Write ``data`` into an in-memory workbook.

        Args:
            data: Iterable of rows; each row must be convertible with
                ``dict(row)`` and is keyed by column attribute name.

        Returns:
            A ``BytesIO`` holding the saved workbook, rewound to position 0.
        """
        def translate_row_values(row_values):
            """Convert a row into values Excel can display.

            Mainly turns backend enum values into user-readable text.
            """
            result = {}
            for key, value in dict(row_values).items():
                column = cls.__columns__.get(key)
                # Keys without a declared column are not written to Excel.
                if column is None:
                    continue
                if isinstance(value, datetime):
                    # Excel cannot store timezone info; drop it.
                    value = value.replace(tzinfo=None)
                elif column.mapping_reverse:
                    # Translate the stored enum value back to its label.
                    value = column.mapping_reverse(value)
                result[column.name] = value
            return result

        # Rows are translated lazily while the writer consumes them.
        return cls._build_workbook(translate_row_values(row) for row in data)

    @classmethod
    def write_as_stream_response(cls, data):
        """Write ``data`` and wrap the workbook in a ``StreamingResponse``.

        Intended for returning the file to the front end.
        """
        return StreamingResponse(cls.write(data), media_type='xls/xlsx', headers=cls.headers)

    @classmethod
    def write_as_file(cls, filename: str, data):
        """Write ``data`` into a workbook saved at ``filename``."""
        with open(filename, 'wb') as f:
            f.write(cls.write(data).getvalue())
使用ExcelTemplate管理模板
class OrderExcelTemplate(ExcelTemplate):
    """Example template: an order sheet with id, order number and price."""

    # Required columns (ExcelColumn prefixes their titles with '*').
    id = ExcelColumn('id', required=True)
    orderNo = ExcelColumn('订单号', required=True)
    # Optional column; ``required`` is left at its default of False.
    price = ExcelColumn('价格')


# Save an empty template workbook for the order sheet.
OrderExcelTemplate.save_template_as_file('order_template.xlsx')
标签:return,name,get,Python,Excel,mapping,def,模板,cls
From: https://www.cnblogs.com/liulangjuanzhou/p/17916669.html