说明
有时候有一些东西需要迭代处理(比如临时需要采集一些数据),有时候中间会失败,就需要创建一个可持久化的索引。
因此简单的写入到 txt 文件进行存储。好几次之后,经常需要很重复写一些代码,变得很不好看。
所以花了一点点时间抽出来这些重复代码。
Code
# coding=utf-8
# @Author: 二次蓝
# @Created at 2023/4/6 9:55
import os.path
class Pointer:
def __init__(self, file_path: str, pad: int = 20, only_item: bool = False):
"""
创建一个可持久化的记录可迭代对象的索引的文件,
从第0行开始,每次返回一个索引对应的item,达到步长,就会写入磁盘持久化索引。
【存储的是上一次返回的索引】
:param file_path: 需要操作的索引文件路径
:param pad: 步长,每过几个索引则写入磁盘
:param only_item: 迭代时,只返回item,不返回index
"""
self.file_path = file_path
self.pad = pad
self.only_item = only_item
def read(self) -> int:
if os.path.exists(self.file_path):
with open(self.file_path, encoding="utf-8", mode="rt") as f:
return int(f.read())
else:
return -1
def save(self, pointer_num: int):
with open(self.file_path, encoding="utf-8", mode="wt") as f:
f.write(str(pointer_num))
return pointer_num
def iter(self, iterable):
"""
迭代一个可迭代对象,会从已存储索引+1开始迭代。
:param iterable: 一个可迭代对象
"""
wrote_index = self.read()
continue_index = wrote_index + 1
should_query_flag = False
index = 0
for index, item in enumerate(iterable):
if not should_query_flag:
if index < continue_index:
continue
else:
should_query_flag = True
if (index + 1) % self.pad == 0:
wrote_index = self.save(index)
if self.only_item:
yield item
else:
yield index, item
if wrote_index != index:
self.save(index)
if __name__ == '__main__':
pointer = Pointer("./test_pointer.txt")
for i in pointer.iter(range(105)):
print(i)
使用示例:
df = pd.read_csv(input_txt)
pinter = Pointer(pointer_txt, only_item=True)
logger.info(f"程序启动")
for index, row in pinter.iter(df.iterrows()): # iterrows()本身就返回了index
logger.info(f"读取第几行:{index + 1}")
这样就可以实现,程序中断后,再次运行可以继续迭代执行了,不再需要管内部逻辑。
通过设置 pad
步长参数可以减少写入磁盘的次数,但是如果在步长期间发生异常,那么就会丢失期间的已使用索引。
可以自行捕获异常,使用 pointer.save(num)
保存当时已使用的索引。或者直接把步长设为 1,每迭代一个都写入磁盘。