首页 > 编程语言 >Marker 源码解析(二)

Marker 源码解析(二)

时间:2024-03-08 17:11:31浏览次数:28  
标签:return text self 源码 Marker import model 解析 page

.\marker\marker\models.py

# 从 marker.cleaners.equations 模块中导入 load_texify_model 函数
from marker.cleaners.equations import load_texify_model
# 从 marker.ordering 模块中导入 load_ordering_model 函数
from marker.ordering import load_ordering_model
# 从 marker.postprocessors.editor 模块中导入 load_editing_model 函数
from marker.postprocessors.editor import load_editing_model
# 从 marker.segmentation 模块中导入 load_layout_model 函数
from marker.segmentation import load_layout_model

# 定义一个函数用于加载所有模型
def load_all_models():
    # 调用 load_editing_model 函数,加载编辑模型
    edit = load_editing_model()
    # 调用 load_ordering_model 函数,加载排序模型
    order = load_ordering_model()
    # 调用 load_layout_model 函数,加载布局模型
    layout = load_layout_model()
    # 调用 load_texify_model 函数,加载 TeXify 模型
    texify = load_texify_model()
    # 将加载的模型按顺序存储在列表中
    model_lst = [texify, layout, order, edit]
    # 返回模型列表
    return model_lst

.\marker\marker\ocr\page.py

import io  # 导入io模块
from typing import List, Optional  # 导入类型提示相关模块

import fitz as pymupdf  # 导入fitz模块并重命名为pymupdf
import ocrmypdf  # 导入ocrmypdf模块
from spellchecker import SpellChecker  # 从spellchecker模块导入SpellChecker类

from marker.ocr.utils import detect_bad_ocr  # 从marker.ocr.utils模块导入detect_bad_ocr函数
from marker.schema import Block  # 从marker.schema模块导入Block类
from marker.settings import settings  # 从marker.settings模块导入settings变量

ocrmypdf.configure_logging(verbosity=ocrmypdf.Verbosity.quiet)  # 配置ocrmypdf的日志记录级别为quiet

# 对整个页面进行OCR识别,返回Block对象列表
def ocr_entire_page(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    # 如果OCR_ENGINE设置为"tesseract",则调用ocr_entire_page_tess函数
    if settings.OCR_ENGINE == "tesseract":
        return ocr_entire_page_tess(page, lang, spellchecker)
    # 如果OCR_ENGINE设置为"ocrmypdf",则调用ocr_entire_page_ocrmp函数
    elif settings.OCR_ENGINE == "ocrmypdf":
        return ocr_entire_page_ocrmp(page, lang, spellchecker)
    else:
        raise ValueError(f"Unknown OCR engine {settings.OCR_ENGINE}")  # 抛出数值错误异常,显示未知的OCR引擎

# 使用tesseract对整个页面进行OCR识别,返回Block对象列表
def ocr_entire_page_tess(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    try:
        # 获取页面的完整OCR文本页
        full_tp = page.get_textpage_ocr(flags=settings.TEXT_FLAGS, dpi=settings.OCR_DPI, full=True, language=lang)
        # 获取页面的文本块列表
        blocks = page.get_text("dict", sort=True, flags=settings.TEXT_FLAGS, textpage=full_tp)["blocks"]
        # 获取页面的完整文本
        full_text = page.get_text("text", sort=True, flags=settings.TEXT_FLAGS, textpage=full_tp)

        # 如果完整文本长度为0,则返回空列表
        if len(full_text) == 0:
            return []

        # 检查OCR是否成功。如果失败,返回空列表
        # 例如,如果有一张扫描的空白页上有一些淡淡的文本印记,OCR可能会失败
        if detect_bad_ocr(full_text, spellchecker):
            return []
    except RuntimeError:
        return []
    return blocks  # 返回文本块列表

# 使用ocrmypdf对整个页面进行OCR识别,返回Block对象列表
def ocr_entire_page_ocrmp(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    # 使用ocrmypdf获取整个页面的OCR文本
    src = page.parent  # 页面所属文档
    blank_doc = pymupdf.open()  # 创建临时的1页文档
    blank_doc.insert_pdf(src, from_page=page.number, to_page=page.number, annots=False, links=False)  # 插入PDF页面
    pdfbytes = blank_doc.tobytes()  # 获取文档字节流
    inbytes = io.BytesIO(pdfbytes)  # 转换为BytesIO对象
    # 创建一个字节流对象,用于存储 ocrmypdf 处理后的结果 PDF
    outbytes = io.BytesIO()  # let ocrmypdf store its result pdf here
    # 使用 ocrmypdf 进行 OCR 处理
    ocrmypdf.ocr(
        inbytes,
        outbytes,
        language=lang,
        output_type="pdf",
        redo_ocr=None if settings.OCR_ALL_PAGES else True,
        force_ocr=True if settings.OCR_ALL_PAGES else None,
        progress_bar=False,
        optimize=False,
        fast_web_view=1e6,
        skip_big=15, # skip images larger than 15 megapixels
        tesseract_timeout=settings.TESSERACT_TIMEOUT,
        tesseract_non_ocr_timeout=settings.TESSERACT_TIMEOUT,
    )
    # 以 fitz PDF 格式打开 OCR 处理后的输出
    ocr_pdf = pymupdf.open("pdf", outbytes.getvalue())  # read output as fitz PDF
    # 获取 OCR 处理后的文本块信息
    blocks = ocr_pdf[0].get_text("dict", sort=True, flags=settings.TEXT_FLAGS)["blocks"]
    # 获取 OCR 处理后的完整文本
    full_text = ocr_pdf[0].get_text("text", sort=True, flags=settings.TEXT_FLAGS)

    # 确保原始 PDF/EPUB/MOBI 的边界框和 OCR 处理后的 PDF 的边界框相同
    assert page.bound() == ocr_pdf[0].bound()

    # 如果完整文本为空,则返回空列表
    if len(full_text) == 0:
        return []

    # 如果检测到 OCR 处理不良,则返回空列表
    if detect_bad_ocr(full_text, spellchecker):
        return []

    # 返回文本块信息
    return blocks

.\marker\marker\ocr\utils.py

# 导入必要的模块和类
from typing import Optional
from nltk import wordpunct_tokenize
from spellchecker import SpellChecker
from marker.settings import settings
import re

# 检测 OCR 文本质量是否差,返回布尔值
def detect_bad_ocr(text, spellchecker: Optional[SpellChecker], misspell_threshold=.7, space_threshold=.6, newline_threshold=.5, alphanum_threshold=.4):
    # 如果文本长度为0,则假定 OCR 失败
    if len(text) == 0:
        return True

    # 使用 wordpunct_tokenize 函数将文本分词
    words = wordpunct_tokenize(text)
    # 过滤掉空白字符
    words = [w for w in words if w.strip()]
    # 提取文本中的字母数字字符
    alpha_words = [word for word in words if word.isalnum()]

    # 如果提供了拼写检查器
    if spellchecker:
        # 检查文本中的拼写错误
        misspelled = spellchecker.unknown(alpha_words)
        # 如果拼写错误数量超过阈值,则返回 True
        if len(misspelled) > len(alpha_words) * misspell_threshold:
            return True

    # 计算文本中空格的数量
    spaces = len(re.findall(r'\s+', text))
    # 计算文本中字母字符的数量
    alpha_chars = len(re.sub(r'\s+', '', text))
    # 如果空格占比超过阈值,则返回 True
    if spaces / (alpha_chars + spaces) > space_threshold:
        return True

    # 计算文本中换行符的数量
    newlines = len(re.findall(r'\n+', text))
    # 计算文本中非换行符的数量
    non_newlines = len(re.sub(r'\n+', '', text))
    # 如果换行符占比超过阈值,则返回 True
    if newlines / (newlines + non_newlines) > newline_threshold:
        return True

    # 如果文本中字母数字字符比例低于阈值,则返回 True
    if alphanum_ratio(text) < alphanum_threshold: # Garbled text
        return True

    # 计算文本中无效字符的数量
    invalid_chars = len([c for c in text if c in settings.INVALID_CHARS])
    # 如果无效字符数量超过阈值,则返回 True
    if invalid_chars > max(3.0, len(text) * .02):
        return True

    # 默认情况下返回 False
    return False

# 将字体标志拆解为可读的形式
def font_flags_decomposer(flags):
    l = []
    # 检查字体标志中是否包含上标
    if flags & 2 ** 0:
        l.append("superscript")
    # 检查字体标志中是否包含斜体
    if flags & 2 ** 1:
        l.append("italic")
    # 检查字体标志中是否包含衬线
    if flags & 2 ** 2:
        l.append("serifed")
    else:
        l.append("sans")
    # 检查字体标志中是否包含等宽字体
    if flags & 2 ** 3:
        l.append("monospaced")
    else:
        l.append("proportional")
    # 检查字体标志中是否包含粗体
    if flags & 2 ** 4:
        l.append("bold")
    # 返回拆解后的字体标志字符串
    return "_".join(l)

# 计算文本中字母数字字符的比例
def alphanum_ratio(text):
    # 去除文本中的空格和换行符
    text = text.replace(" ", "")
    text = text.replace("\n", "")
    # 统计文本中的字母数字字符数量
    alphanumeric_count = sum([1 for c in text if c.isalnum()])

    # 如果文本长度为0,则返回1
    if len(text) == 0:
        return 1

    # 计算字母数字字符比例
    ratio = alphanumeric_count / len(text)
    # 返回变量 ratio 的值
    return ratio

.\marker\marker\ordering.py

# 导入必要的模块
from copy import deepcopy
from typing import List
import torch
import sys, os
from marker.extract_text import convert_single_page
from transformers import LayoutLMv3ForSequenceClassification, LayoutLMv3Processor
from PIL import Image
import io
from marker.schema import Page
from marker.settings import settings

# 从设置中加载 LayoutLMv3Processor 模型
processor = LayoutLMv3Processor.from_pretrained(settings.ORDERER_MODEL_NAME)

# 加载 LayoutLMv3ForSequenceClassification 模型
def load_ordering_model():
    model = LayoutLMv3ForSequenceClassification.from_pretrained(
        settings.ORDERER_MODEL_NAME,
        torch_dtype=settings.MODEL_DTYPE,
    ).to(settings.TORCH_DEVICE_MODEL)
    model.eval()
    return model

# 获取推理数据
def get_inference_data(page, page_blocks: Page):
    # 深拷贝页面块的边界框
    bboxes = deepcopy([block.bbox for block in page_blocks.blocks])
    # 初始化单词列表
    words = ["."] * len(bboxes)

    # 获取页面的像素图像
    pix = page.get_pixmap(dpi=settings.LAYOUT_DPI, annots=False, clip=page_blocks.bbox)
    # 将像素图像转换为 PNG 格式
    png = pix.pil_tobytes(format="PNG")
    # 将 PNG 数据转换为 RGB 图像
    rgb_image = Image.open(io.BytesIO(png)).convert("RGB")

    # 获取页面块的边界框和宽高
    page_box = page_blocks.bbox
    pwidth = page_blocks.width
    pheight = page_blocks.height

    # 调整边界框的值
    for box in bboxes:
        if box[0] < page_box[0]:
            box[0] = page_box[0]
        if box[1] < page_box[1]:
            box[1] = page_box[1]
        if box[2] > page_box[2]:
            box[2] = page_box[2]
        if box[3] > page_box[3]:
            box[3] = page_box[3]

        # 将边界框的值转换为相对于页面宽高的比例
        box[0] = int(box[0] / pwidth * 1000)
        box[1] = int(box[1] / pheight * 1000)
        box[2] = int(box[2] / pwidth * 1000)
        box[3] = int(box[3] / pheight * 1000)

    return rgb_image, bboxes, words

# 批量推理
def batch_inference(rgb_images, bboxes, words, model):
    # 对 RGB 图像、单词和边界框进行编码
    encoding = processor(
        rgb_images,
        text=words,
        boxes=bboxes,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=128
    )

    # 将像素值转换为模型的数据类型
    encoding["pixel_values"] = encoding["pixel_values"].to(model.dtype)
    # 进入推断模式,不进行梯度计算
    with torch.inference_mode():
        # 将指定的键对应的值移动到模型所在设备上
        for k in ["bbox", "input_ids", "pixel_values", "attention_mask"]:
            encoding[k] = encoding[k].to(model.device)
        # 使用模型进行推理,获取输出
        outputs = model(**encoding)
        # 获取模型输出的预测结果
        logits = outputs.logits

    # 获取预测结果中概率最大的类别索引,并转换为列表
    predictions = logits.argmax(-1).squeeze().tolist()
    # 如果预测结果是整数,则转换为列表
    if isinstance(predictions, int):
        predictions = [predictions]
    # 将预测结果转换为类别标签
    predictions = [model.config.id2label[p] for p in predictions]
    # 返回预测结果
    return predictions
# 为文档中的每个块添加列数计数
def add_column_counts(doc, doc_blocks, model, batch_size):
    # 按照批量大小遍历文档块
    for i in range(0, len(doc_blocks), batch_size):
        # 创建当前批量的索引范围
        batch = range(i, min(i + batch_size, len(doc_blocks)))
        # 初始化空列表用于存储 RGB 图像、边界框和单词
        rgb_images = []
        bboxes = []
        words = []
        # 遍历当前批量的页码
        for pnum in batch:
            # 获取推理数据:RGB 图像、页边界框和页单词
            page = doc[pnum]
            rgb_image, page_bboxes, page_words = get_inference_data(page, doc_blocks[pnum])
            rgb_images.append(rgb_image)
            bboxes.append(page_bboxes)
            words.append(page_words)

        # 进行批量推理,获取预测结果
        predictions = batch_inference(rgb_images, bboxes, words, model)
        # 将预测结果与页码对应,更新文档块的列数计数
        for pnum, prediction in zip(batch, predictions):
            doc_blocks[pnum].column_count = prediction

# 对文档块进行排序
def order_blocks(doc, doc_blocks: List[Page], model, batch_size=settings.ORDERER_BATCH_SIZE):
    # 添加列数计数
    add_column_counts(doc, doc_blocks, model, batch_size)

    # 遍历文档块中的每一页
    for page_blocks in doc_blocks:
        # 如果该页的列数大于1
        if page_blocks.column_count > 1:
            # 根据位置重新排序块
            split_pos = page_blocks.x_start + page_blocks.width / 2
            left_blocks = []
            right_blocks = []
            # 遍历该页的每个块
            for block in page_blocks.blocks:
                # 根据位置将块分为左右两部分
                if block.x_start <= split_pos:
                    left_blocks.append(block)
                else:
                    right_blocks.append(block)
            # 更新该页的块顺序
            page_blocks.blocks = left_blocks + right_blocks
    # 返回排序后的文档块
    return doc_blocks

.\marker\marker\postprocessors\editor.py

# 导入必要的库
from collections import defaultdict, Counter
from itertools import chain
from typing import Optional

# 导入 transformers 库中的 AutoTokenizer 类
from transformers import AutoTokenizer

# 导入 settings 模块中的 settings 变量
from marker.settings import settings

# 导入 torch 库
import torch
import torch.nn.functional as F

# 导入 marker.postprocessors.t5 模块中的 T5ForTokenClassification 类和 byt5_tokenize 函数
from marker.postprocessors.t5 import T5ForTokenClassification, byt5_tokenize

# 定义加载编辑模型的函数
def load_editing_model():
    # 如果未启用编辑模型,则返回 None
    if not settings.ENABLE_EDITOR_MODEL:
        return None

    # 从预训练模型中加载 T5ForTokenClassification 模型
    model = T5ForTokenClassification.from_pretrained(
            settings.EDITOR_MODEL_NAME,
            torch_dtype=settings.MODEL_DTYPE,
        ).to(settings.TORCH_DEVICE_MODEL)
    model.eval()

    # 配置模型的标签映射
    model.config.label2id = {
        "equal": 0,
        "delete": 1,
        "newline-1": 2,
        "space-1": 3,
    }
    model.config.id2label = {v: k for k, v in model.config.label2id.items()}
    return model

# 定义编辑全文的函数
def edit_full_text(text: str, model: Optional[T5ForTokenClassification], batch_size: int = settings.EDITOR_BATCH_SIZE):
    # 如果模型为空,则直接返回原始文本和空字典
    if not model:
        return text, {}

    # 对文本进行 tokenization
    tokenized = byt5_tokenize(text, settings.EDITOR_MAX_LENGTH)
    input_ids = tokenized["input_ids"]
    char_token_lengths = tokenized["char_token_lengths"]

    # 准备 token_masks 列表
    token_masks = []
    # 遍历输入的 input_ids,按照 batch_size 进行分批处理
    for i in range(0, len(input_ids), batch_size):
        # 从 tokenized 中获取当前 batch 的 input_ids
        batch_input_ids = tokenized["input_ids"][i: i + batch_size]
        # 将 batch_input_ids 转换为 torch 张量,并指定设备为 model 的设备
        batch_input_ids = torch.tensor(batch_input_ids, device=model.device)
        # 从 tokenized 中获取当前 batch 的 attention_mask
        batch_attention_mask = tokenized["attention_mask"][i: i + batch_size]
        # 将 batch_attention_mask 转换为 torch 张量,并指定设备为 model 的设备
        batch_attention_mask = torch.tensor(batch_attention_mask, device=model.device)
        
        # 进入推理模式
        with torch.inference_mode():
            # 使用模型进行预测
            predictions = model(batch_input_ids, attention_mask=batch_attention_mask)

        # 将预测结果 logits 移动到 CPU 上
        logits = predictions.logits.cpu()

        # 如果最大概率小于阈值,则假设为不良预测
        # 我们希望保守一点,不要对文本进行过多编辑
        probs = F.softmax(logits, dim=-1)
        max_prob = torch.max(probs, dim=-1)
        cutoff_prob = max_prob.values < settings.EDITOR_CUTOFF_THRESH
        labels = logits.argmax(-1)
        labels[cutoff_prob] = model.config.label2id["equal"]
        labels = labels.squeeze().tolist()
        if len(labels) == settings.EDITOR_MAX_LENGTH:
            labels = [labels]
        labels = list(chain.from_iterable(labels))
        token_masks.extend(labels)

    # 文本中的字符列表
    flat_input_ids = list(chain.from_iterable(input_ids)

    # 去除特殊标记 0,1。保留未知标记,尽管它不应该被使用
    assert len(token_masks) == len(flat_input_ids)
    token_masks = [mask for mask, token in zip(token_masks, flat_input_ids) if token >= 2]

    # 确保 token_masks 的长度与文本编码后的长度相等
    assert len(token_masks) == len(list(text.encode("utf-8")))

    # 统计编辑次数的字典
    edit_stats = defaultdict(int)
    # 输出文本列表
    out_text = []
    # 起始位置
    start = 0
    # 遍历文本中的每个字符及其索引
    for i, char in enumerate(text):
        # 获取当前字符对应的 token 长度
        char_token_length = char_token_lengths[i]
        # 获取当前字符对应的 token 的 mask
        masks = token_masks[start: start + char_token_length]
        # 将 mask 转换为标签
        labels = [model.config.id2label[mask] for mask in masks]
        # 如果所有标签都是 "delete",则执行删除操作
        if all(l == "delete" for l in labels):
            # 如果删除的是空格,则保留,否则忽略
            if char.strip():
                out_text.append(char)
            else:
                edit_stats["delete"] += 1
        # 如果标签为 "newline-1",则添加换行符
        elif labels[0] == "newline-1":
            out_text.append("\n")
            out_text.append(char)
            edit_stats["newline-1"] += 1
        # 如果标签为 "space-1",则添加空格
        elif labels[0] == "space-1":
            out_text.append(" ")
            out_text.append(char)
            edit_stats["space-1"] += 1
        # 如果标签为其他情况,则保留字符
        else:
            out_text.append(char)
            edit_stats["equal"] += 1

        # 更新下一个字符的起始位置
        start += char_token_length

    # 将处理后的文本列表转换为字符串
    out_text = "".join(out_text)
    # 返回处理后的文本及编辑统计信息
    return out_text, edit_stats

.\marker\marker\postprocessors\t5.py

# 从 transformers 库中导入 T5Config 和 T5PreTrainedModel 类
from transformers import T5Config, T5PreTrainedModel
# 导入 torch 库
import torch
# 从 torch 库中导入 nn 模块
from torch import nn
# 从 copy 库中导入 deepcopy 函数
from copy import deepcopy
# 从 typing 库中导入 Optional, Tuple, Union, List 类型
from typing import Optional, Tuple, Union, List
# 从 itertools 库中导入 chain 函数
from itertools import chain

# 从 transformers.modeling_outputs 模块中导入 TokenClassifierOutput 类
from transformers.modeling_outputs import TokenClassifierOutput
# 从 transformers.models.t5.modeling_t5 模块中导入 T5Stack 类
from transformers.models.t5.modeling_t5 import T5Stack
# 从 transformers.utils.model_parallel_utils 模块中导入 get_device_map, assert_device_map 函数
from transformers.utils.model_parallel_utils import get_device_map, assert_device_map

# 定义一个函数,用于将文本进行字节编码并分词
def byt5_tokenize(text: str, max_length: int, pad_token_id: int = 0):
    # 初始化一个空列表,用于存储字节编码
    byte_codes = []
    # 遍历文本中的每个字符
    for char in text:
        # 将每个字符进行 UTF-8 编码,并加上 3 以考虑特殊标记
        byte_codes.append([byte + 3 for byte in char.encode('utf-8')])

    # 将字节编码展开成一个列表
    tokens = list(chain.from_iterable(byte_codes))
    # 记录每个字符对应的 token 长度
    char_token_lengths = [len(b) for b in byte_codes]

    # 初始化批量 token 和注意力掩码列表
    batched_tokens = []
    attention_mask = []
    # 按照最大长度将 token 进行分批
    for i in range(0, len(tokens), max_length):
        batched_tokens.append(tokens[i:i + max_length])
        attention_mask.append([1] * len(batched_tokens[-1])

    # 对最后一个批次进行填充
    if len(batched_tokens[-1]) < max_length:
        batched_tokens[-1] += [pad_token_id] * (max_length - len(batched_tokens[-1]))
        attention_mask[-1] += [0] * (max_length - len(attention_mask[-1]))

    # 返回包含分词结果的字典
    return {"input_ids": batched_tokens, "attention_mask": attention_mask, "char_token_lengths": char_token_lengths}

# 定义一个 T5ForTokenClassification 类,继承自 T5PreTrainedModel 类
class T5ForTokenClassification(T5PreTrainedModel):
    # 定义一个列表,用于指定加载时忽略的键
    _keys_to_ignore_on_load_missing = [r"encoder.embed_tokens.weight"]
    # 初始化函数,接受一个T5Config对象作为参数
    def __init__(self, config: T5Config):
        # 调用父类的初始化函数
        super().__init__(config)
        # 设置模型维度为配置中的d_model值
        self.model_dim = config.d_model

        # 创建一个共享的嵌入层,词汇表大小为config.vocab_size,维度为config.d_model
        self.shared = nn.Embedding(config.vocab_size, config.d_model)

        # 复制配置对象,用于创建编码器
        encoder_config = deepcopy(config)
        encoder_config.is_decoder = False
        encoder_config.is_encoder_decoder = False
        encoder_config.use_cache = False
        # 创建T5Stack编码器
        self.encoder = T5Stack(encoder_config, self.shared)

        # 设置分类器的dropout值
        classifier_dropout = (
            config.classifier_dropout if hasattr(config, 'classifier_dropout') else config.dropout_rate
        )
        self.dropout = nn.Dropout(classifier_dropout)
        # 创建一个线性层,输入维度为config.d_model,输出维度为config.num_labels
        self.classifier = nn.Linear(config.d_model, config.num_labels)

        # 初始化权重并应用最终处理
        self.post_init()

        # 模型并行化
        self.model_parallel = False
        self.device_map = None


    # 并行化函数,接受一个设备映射device_map作为参数
    def parallelize(self, device_map=None):
        # 如果未提供device_map,则根据编码器块的数量和GPU数量生成一个默认的device_map
        self.device_map = (
            get_device_map(len(self.encoder.block), range(torch.cuda.device_count()))
            if device_map is None
            else device_map
        )
        # 检查设备映射的有效性
        assert_device_map(self.device_map, len(self.encoder.block))
        # 将编码器并行化
        self.encoder.parallelize(self.device_map)
        # 将分类器移动到编码器的第一个设备上
        self.classifier.to(self.encoder.first_device)
        self.model_parallel = True

    # 反并行化函数
    def deparallelize(self):
        # 取消编码器的并行化
        self.encoder.deparallelize()
        # 将编码器和分类器移动到CPU上
        self.encoder = self.encoder.to("cpu")
        self.classifier = self.classifier.to("cpu")
        self.model_parallel = False
        self.device_map = None
        # 释放GPU缓存
        torch.cuda.empty_cache()

    # 获取输入嵌入层函数
    def get_input_embeddings(self):
        return self.shared

    # 设置输入嵌入层函数,接受一个新的嵌入层new_embeddings作为参数
    def set_input_embeddings(self, new_embeddings):
        self.shared = new_embeddings
        # 设置编码器的输入嵌入层为新的嵌入层
        self.encoder.set_input_embeddings(new_embeddings)

    # 获取编码器函数
    def get_encoder(self):
        return self.encoder
    # 对模型中的特定头部进行修剪
    def _prune_heads(self, heads_to_prune):
        # 遍历需要修剪的层和头部
        for layer, heads in heads_to_prune.items():
            # 调用 SelfAttention 模块的 prune_heads 方法进行修剪
            self.encoder.block[layer].layer[0].SelfAttention.prune_heads(heads)

    # 前向传播函数
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.FloatTensor], TokenClassifierOutput]:
        # 如果 return_dict 为 None,则使用配置中的 use_return_dict
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # 调用编码器进行前向传播
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # 获取序列输出
        sequence_output = outputs[0]

        # 对序列输出进行 dropout
        sequence_output = self.dropout(sequence_output)
        # 将序列输出传入分类器得到 logits
        logits = self.classifier(sequence_output)

        # 初始化损失为 None
        loss = None

        # 如果不使用 return_dict,则返回输出结果
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        # 使用 TokenClassifierOutput 类返回结果
        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions
        )

.\marker\marker\schema.py

# 导入 Counter 类,用于计数
# 导入 List、Optional、Tuple 类型,用于类型提示
from collections import Counter
from typing import List, Optional, Tuple

# 导入 BaseModel、field_validator 类,用于定义数据模型和字段验证
# 导入 ftfy 模块,用于修复文本中的 Unicode 错误
from pydantic import BaseModel, field_validator
import ftfy

# 导入 boxes_intersect_pct、multiple_boxes_intersect 函数,用于计算两个框的交集比例和多个框的交集情况
# 导入 settings 模块,用于获取配置信息
from marker.bbox import boxes_intersect_pct, multiple_boxes_intersect
from marker.settings import settings

# 定义函数 find_span_type,用于查找给定 span 在页面块中的类型
def find_span_type(span, page_blocks):
    # 默认块类型为 "Text"
    block_type = "Text"
    # 遍历页面块列表
    for block in page_blocks:
        # 如果 span 的边界框与页面块的边界框有交集
        if boxes_intersect_pct(span.bbox, block.bbox):
            # 更新块类型为页面块的类型
            block_type = block.block_type
            break
    # 返回块类型
    return block_type

# 定义类 BboxElement,继承自 BaseModel 类,表示具有边界框的元素
class BboxElement(BaseModel):
    bbox: List[float]

    # 验证 bbox 字段是否包含 4 个元素
    @field_validator('bbox')
    @classmethod
    def check_4_elements(cls, v: List[float]) -> List[float]:
        if len(v) != 4:
            raise ValueError('bbox must have 4 elements')
        return v

    # 计算元素的高度、宽度、起始 x 坐标、起始 y 坐标、面积
    @property
    def height(self):
        return self.bbox[3] - self.bbox[1]

    @property
    def width(self):
        return self.bbox[2] - self.bbox[0]

    @property
    def x_start(self):
        return self.bbox[0]

    @property
    def y_start(self):
        return self.bbox[1]

    @property
    def area(self):
        return self.width * self.height

# 定义类 BlockType,继承自 BboxElement 类,表示具有块类型的元素
class BlockType(BboxElement):
    block_type: str

# 定义类 Span,继承自 BboxElement 类,表示具有文本内容的元素
class Span(BboxElement):
    text: str
    span_id: str
    font: str
    color: int
    ascender: Optional[float] = None
    descender: Optional[float] = None
    block_type: Optional[str] = None
    selected: bool = True

    # 修复文本中的 Unicode 错误
    @field_validator('text')
    @classmethod
    def fix_unicode(cls, text: str) -> str:
        return ftfy.fix_text(text)

# 定义类 Line,继承自 BboxElement 类,表示具有多个 Span 的行元素
class Line(BboxElement):
    spans: List[Span]

    # 获取行的预备文本,即所有 Span 的文本拼接而成
    @property
    def prelim_text(self):
        return "".join([s.text for s in self.spans])

    # 获取行的起始 x 坐标
    @property
    def start(self):
        return self.spans[0].bbox[0]

# 定义类 Block,继承自 BboxElement 类,表示具有多个 Line 的块元素
class Block(BboxElement):
    lines: List[Line]
    pnum: int

    # 获取块的预备文本,即所有 Line 的预备文本拼接而成
    @property
    def prelim_text(self):
        return "\n".join([l.prelim_text for l in self.lines])
    # 检查文本块是否包含公式,通过检查每个 span 的 block_type 是否为 "Formula" 来确定
    def contains_equation(self, equation_boxes=None):
        # 生成一个包含每个 span 的 block_type 是否为 "Formula" 的条件列表
        conditions = [s.block_type == "Formula" for l in self.lines for s in l.spans]
        # 如果提供了 equation_boxes 参数,则添加一个条件,检查文本块的边界框是否与给定框相交
        if equation_boxes:
            conditions += [multiple_boxes_intersect(self.bbox, equation_boxes)]
        # 返回条件列表中是否有任何一个条件为 True
        return any(conditions)

    # 过滤掉包含在 bad_span_ids 中的 span
    def filter_spans(self, bad_span_ids):
        new_lines = []
        for line in self.lines:
            new_spans = []
            for span in line.spans:
                # 如果 span 的 span_id 不在 bad_span_ids 中,则保留该 span
                if not span.span_id in bad_span_ids:
                    new_spans.append(span)
            # 更新 line 的 spans 属性为过滤后的 new_spans
            line.spans = new_spans
            # 如果 line 中仍有 spans,则将其添加到 new_lines 中
            if len(new_spans) > 0:
                new_lines.append(line)
        # 更新 self.lines 为过滤后的 new_lines
        self.lines = new_lines

    # 过滤掉包含在 settings.BAD_SPAN_TYPES 中的 span 的 block_type
    def filter_bad_span_types(self):
        new_lines = []
        for line in self.lines:
            new_spans = []
            for span in line.spans:
                # 如果 span 的 block_type 不在 BAD_SPAN_TYPES 中,则保留该 span
                if span.block_type not in settings.BAD_SPAN_TYPES:
                    new_spans.append(span)
            # 更新 line 的 spans 属性为过滤后的 new_spans
            line.spans = new_spans
            # 如果 line 中仍有 spans,则将其添加到 new_lines 中
            if len(new_spans) > 0:
                new_lines.append(line)
        # 更新 self.lines 为过滤后的 new_lines
        self.lines = new_lines

    # 返回文本块中出现频率最高的 block_type
    def most_common_block_type(self):
        # 统计每个 span 的 block_type 出现的次数
        counter = Counter([s.block_type for l in self.lines for s in l.spans])
        # 返回出现次数最多的 block_type
        return counter.most_common(1)[0][0]

    # 设置文本块中所有 span 的 block_type 为给定的 block_type
    def set_block_type(self, block_type):
        for line in self.lines:
            for span in line.spans:
                # 将 span 的 block_type 设置为给定的 block_type
                span.block_type = block_type
# 定义一个名为 Page 的类,继承自 BboxElement 类
class Page(BboxElement):
    # 类属性:blocks 为 Block 对象列表,pnum 为整数,column_count 和 rotation 可选整数,默认为 None
    blocks: List[Block]
    pnum: int
    column_count: Optional[int] = None
    rotation: Optional[int] = None # 页面的旋转角度

    # 获取页面中非空行的方法
    def get_nonblank_lines(self):
        # 获取页面中所有行
        lines = self.get_all_lines()
        # 过滤出非空行
        nonblank_lines = [l for l in lines if l.prelim_text.strip()]
        return nonblank_lines

    # 获取页面中所有行的方法
    def get_all_lines(self):
        # 获取页面中所有行的列表
        lines = [l for b in self.blocks for l in b.lines]
        return lines

    # 获取页面中非空跨度的方法,返回 Span 对象列表
    def get_nonblank_spans(self) -> List[Span]:
        # 获取页面中所有行
        lines = [l for b in self.blocks for l in b.lines]
        # 过滤出非空跨度
        spans = [s for l in lines for s in l.spans if s.text.strip()]
        return spans

    # 添加块类型到行的方法
    def add_block_types(self, page_block_types):
        # 如果检测到的块类型数量与页面行数不匹配,则打印警告信息
        if len(page_block_types) != len(self.get_all_lines()):
            print(f"Warning: Number of detected lines {len(page_block_types)} does not match number of lines {len(self.get_all_lines())}")

        i = 0
        for block in self.blocks:
            for line in block.lines:
                if i < len(page_block_types):
                    line_block_type = page_block_types[i].block_type
                else:
                    line_block_type = "Text"
                i += 1
                for span in line.spans:
                    span.block_type = line_block_type

    # 获取页面中字体统计信息的方法
    def get_font_stats(self):
        # 获取页面中非空跨度的字体信息
        fonts = [s.font for s in self.get_nonblank_spans()]
        # 统计字体出现次数
        font_counts = Counter(fonts)
        return font_counts

    # 获取页面中行高统计信息的方法
    def get_line_height_stats(self):
        # 获取页面中非空行的行高信息
        heights = [l.bbox[3] - l.bbox[1] for l in self.get_nonblank_lines()]
        # 统计行高出现次数
        height_counts = Counter(heights)
        return height_counts

    # 获取页面中行起始位置统计信息的方法
    def get_line_start_stats(self):
        # 获取页面中非空行的起始位置信息
        starts = [l.bbox[0] for l in self.get_nonblank_lines()]
        # 统计起始位置出现次数
        start_counts = Counter(starts)
        return start_counts
    # 获取文本块中非空行的起始位置列表
    def get_min_line_start(self):
        # 通过列表推导式获取非空行的起始位置,并且该行为文本类型
        starts = [l.bbox[0] for l in self.get_nonblank_lines() if l.spans[0].block_type == "Text"]
        # 如果没有找到非空行,则抛出索引错误
        if len(starts) == 0:
            raise IndexError("No lines found")
        # 返回起始位置列表中的最小值
        return min(starts)

    # 获取文本块中每个文本块的 prelim_text 属性,并用换行符连接成字符串
    @property
    def prelim_text(self):
        return "\n".join([b.prelim_text for b in self.blocks])
# 定义一个继承自BboxElement的MergedLine类,包含文本和字体列表属性
class MergedLine(BboxElement):
    text: str
    fonts: List[str]

    # 返回该行中出现频率最高的字体
    def most_common_font(self):
        # 统计字体列表中各个字体出现的次数
        counter = Counter(self.fonts)
        # 返回出现频率最高的字体
        return counter.most_common(1)[0][0]


# 定义一个继承自BboxElement的MergedBlock类,包含行列表、段落号和块类型列表属性
class MergedBlock(BboxElement):
    lines: List[MergedLine]
    pnum: int
    block_types: List[str]

    # 返回该块中出现频率最高的块类型
    def most_common_block_type(self):
        # 统计块类型列表中各个类型出现的次数
        counter = Counter(self.block_types)
        # 返回出现频率最高的块类型
        return counter.most_common(1)[0][0]


# 定义一个继承自BaseModel的FullyMergedBlock类,包含文本和块类型属性
class FullyMergedBlock(BaseModel):
    text: str
    block_type: str

.\marker\marker\segmentation.py

# 导入所需的库
from concurrent.futures import ThreadPoolExecutor
from typing import List

from transformers import LayoutLMv3ForTokenClassification

# 导入自定义的模块
from marker.bbox import unnormalize_box
from transformers.models.layoutlmv3.image_processing_layoutlmv3 import normalize_box
import io
from PIL import Image
from transformers import LayoutLMv3Processor
import numpy as np
from marker.settings import settings
from marker.schema import Page, BlockType
import torch
from math import isclose

# 设置图像最大像素值,避免部分图像被截断
Image.MAX_IMAGE_PIXELS = None

# 从预训练模型加载 LayoutLMv3Processor
processor = LayoutLMv3Processor.from_pretrained(settings.LAYOUT_MODEL_NAME, apply_ocr=False)

# 定义需要分块的键和不需要分块的键
CHUNK_KEYS = ["input_ids", "attention_mask", "bbox", "offset_mapping"]
NO_CHUNK_KEYS = ["pixel_values"]

# 加载 LayoutLMv3ForTokenClassification 模型
def load_layout_model():
    # 从预训练模型加载 LayoutLMv3ForTokenClassification 模型
    model = LayoutLMv3ForTokenClassification.from_pretrained(
        settings.LAYOUT_MODEL_NAME,
        torch_dtype=settings.MODEL_DTYPE,
    ).to(settings.TORCH_DEVICE_MODEL)

    # 设置模型的标签映射
    model.config.id2label = {
        0: "Caption",
        1: "Footnote",
        2: "Formula",
        3: "List-item",
        4: "Page-footer",
        5: "Page-header",
        6: "Picture",
        7: "Section-header",
        8: "Table",
        9: "Text",
        10: "Title"
    }

    model.config.label2id = {v: k for k, v in model.config.id2label.items()}
    return model

# 检测文档块类型
def detect_document_block_types(doc, blocks: List[Page], layoutlm_model, batch_size=settings.LAYOUT_BATCH_SIZE):
    # 获取特征编码、元数据和样本长度
    encodings, metadata, sample_lengths = get_features(doc, blocks)
    # 预测块类型
    predictions = predict_block_types(encodings, layoutlm_model, batch_size)
    # 将预测结果与框匹配
    block_types = match_predictions_to_boxes(encodings, predictions, metadata, sample_lengths, layoutlm_model)
    # 断言块类型数量与块数量相等
    assert len(block_types) == len(blocks)
    return block_types

# 获取临时框
def get_provisional_boxes(pred, box, is_subword, start_idx=0):
    # 从预测结果中获取临时框
    prov_predictions = [pred_ for idx, pred_ in enumerate(pred) if not is_subword[idx]][start_idx:]
    # 从列表中筛选出不是子词的元素,并从指定索引开始切片,得到新的列表
    prov_boxes = [box_ for idx, box_ in enumerate(box) if not is_subword[idx]][start_idx:]
    # 返回处理后的预测结果和框
    return prov_predictions, prov_boxes
# 获取页面编码信息,输入参数为页面和页面块对象
def get_page_encoding(page, page_blocks: Page):
    # 如果页面块中的所有行数为0,则返回空列表
    if len(page_blocks.get_all_lines()) == 0:
        return [], []

    # 获取页面块的边界框、宽度和高度
    page_box = page_blocks.bbox
    pwidth = page_blocks.width
    pheight = page_blocks.height

    # 获取页面块的像素图,并转换为 PNG 格式
    pix = page.get_pixmap(dpi=settings.LAYOUT_DPI, annots=False, clip=page_blocks.bbox)
    png = pix.pil_tobytes(format="PNG")
    png_image = Image.open(io.BytesIO(png))
    # 如果图像太大,则缩小以适应模型
    rgb_image = png_image.convert('RGB')
    rgb_width, rgb_height = rgb_image.size

    # 确保图像大小与 PDF 页面的比例正确
    assert isclose(rgb_width / pwidth, rgb_height / pheight, abs_tol=2e-2)

    # 获取页面块中的所有行
    lines = page_blocks.get_all_lines()

    boxes = []
    text = []
    for line in lines:
        box = line.bbox
        # 处理边界框溢出的情况
        if box[0] < page_box[0]:
            box[0] = page_box[0]
        if box[1] < page_box[1]:
            box[1] = page_box[1]
        if box[2] > page_box[2]:
            box[2] = page_box[2]
        if box[3] > page_box[3]:
            box[3] = page_box[3]

        # 处理边界框宽度或高度为0或负值的情况
        if box[2] <= box[0]:
            print("Zero width box found, cannot convert properly")
            raise ValueError
        if box[3] <= box[1]:
            print("Zero height box found, cannot convert properly")
            raise ValueError
        boxes.append(box)
        text.append(line.prelim_text)

    # 将边界框归一化为模型(缩放为1000x1000)
    boxes = [normalize_box(box, pwidth, pheight) for box in boxes]
    for box in boxes:
        # 验证所有边界框都是有效的
        assert(len(box) == 4)
        assert(max(box)) <= 1000
        assert(min(box)) >= 0
    # 使用 processor 处理 RGB 图像,传入文本、框、返回偏移映射等参数
    encoding = processor(
        rgb_image,
        text=text,
        boxes=boxes,
        return_offsets_mapping=True,
        truncation=True,
        return_tensors="pt",
        stride=settings.LAYOUT_CHUNK_OVERLAP,
        padding="max_length",
        max_length=settings.LAYOUT_MODEL_MAX,
        return_overflowing_tokens=True
    )
    # 从 encoding 中弹出 offset_mapping 和 overflow_to_sample_mapping
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')
    # 将 encoding 中的 bbox、input_ids、attention_mask、pixel_values 转换为列表
    bbox = list(encoding["bbox"])
    input_ids = list(encoding["input_ids"])
    attention_mask = list(encoding["attention_mask"])
    pixel_values = list(encoding["pixel_values"])

    # 断言各列表长度相等
    assert len(bbox) == len(input_ids) == len(attention_mask) == len(pixel_values) == len(offset_mapping)

    # 将各列表中的元素组成字典,放入 list_encoding 列表中
    list_encoding = []
    for i in range(len(bbox)):
        list_encoding.append({
            "bbox": bbox[i],
            "input_ids": input_ids[i],
            "attention_mask": attention_mask[i],
            "pixel_values": pixel_values[i],
            "offset_mapping": offset_mapping[i]
        })

    # 其他数据包括原始框、pwidth 和 pheight
    other_data = {
        "original_bbox": boxes,
        "pwidth": pwidth,
        "pheight": pheight,
    }
    # 返回 list_encoding 和 other_data
    return list_encoding, other_data
# 获取文档的特征信息
def get_features(doc, blocks):
    # 初始化编码、元数据和样本长度列表
    encodings = []
    metadata = []
    sample_lengths = []
    # 遍历每个块
    for i in range(len(blocks)):
        # 调用函数获取页面编码和其他数据
        encoding, other_data = get_page_encoding(doc[i], blocks[i])
        # 将页面编码添加到编码列表中
        encodings.extend(encoding)
        # 将其他数据添加到元数据列表中
        metadata.append(other_data)
        # 记录当前页面编码的长度
        sample_lengths.append(len(encoding))
    # 返回编码、元数据和样本长度
    return encodings, metadata, sample_lengths


# 预测块类型
def predict_block_types(encodings, layoutlm_model, batch_size):
    # 初始化所有预测结果列表
    all_predictions = []
    # 按批次处理编码
    for i in range(0, len(encodings), batch_size):
        # 计算当前批次的起始和结束索引
        batch_start = i
        batch_end = min(i + batch_size, len(encodings))
        # 获取当前批次的编码
        batch = encodings[batch_start:batch_end]

        # 构建模型输入
        model_in = {}
        for k in ["bbox", "input_ids", "attention_mask", "pixel_values"]:
            model_in[k] = torch.stack([b[k] for b in batch]).to(layoutlm_model.device)

        model_in["pixel_values"] = model_in["pixel_values"].to(layoutlm_model.dtype)

        # 进入推理模式
        with torch.inference_mode():
            # 使用模型进行推理
            outputs = layoutlm_model(**model_in)
            logits = outputs.logits

        # 获取预测结果
        predictions = logits.argmax(-1).squeeze().tolist()
        if len(predictions) == settings.LAYOUT_MODEL_MAX:
            predictions = [predictions]
        # 将预测结果添加到所有预测结果列表中
        all_predictions.extend(predictions)
    # 返回所有预测结果
    return all_predictions


# 将预测结果与框匹配
def match_predictions_to_boxes(encodings, predictions, metadata, sample_lengths, layoutlm_model) -> List[List[BlockType]]:
    # 断言编码、预测结果和样本长度的长度相等
    assert len(encodings) == len(predictions) == sum(sample_lengths)
    assert len(metadata) == len(sample_lengths)

    # 初始化页面起始索引和页面块类型列表
    page_start = 0
    page_block_types = []
    # 返回页面块类型列表
    return page_block_types

.\marker\marker\settings.py

import os
from typing import Optional, List, Dict

from dotenv import find_dotenv
from pydantic import computed_field
from pydantic_settings import BaseSettings
import fitz as pymupdf
import torch

# 定义一个设置类,继承自BaseSettings
class Settings(BaseSettings):
    # General
    TORCH_DEVICE: Optional[str] = None

    # 计算属性,返回TORCH_DEVICE_MODEL
    @computed_field
    @property
    def TORCH_DEVICE_MODEL(self) -> str:
        # 如果TORCH_DEVICE不为None,则返回TORCH_DEVICE
        if self.TORCH_DEVICE is not None:
            return self.TORCH_DEVICE

        # 如果CUDA可用,则返回"cuda"
        if torch.cuda.is_available():
            return "cuda"

        # 如果MPS可用,则返回"mps"
        if torch.backends.mps.is_available():
            return "mps"

        # 否则返回"cpu"
        return "cpu"

    INFERENCE_RAM: int = 40 # 每个GPU的VRAM量(以GB为单位)。
    VRAM_PER_TASK: float = 2.5 # 每个任务分配的VRAM量(以GB为单位)。 峰值标记VRAM使用量约为3GB,但工作程序的平均值较低。
    DEFAULT_LANG: str = "English" # 我们假设文件所在的默认语言,应该是TESSERACT_LANGUAGES中的一个键

    SUPPORTED_FILETYPES: Dict = {
        "application/pdf": "pdf",
        "application/epub+zip": "epub",
        "application/x-mobipocket-ebook": "mobi",
        "application/vnd.ms-xpsdocument": "xps",
        "application/x-fictionbook+xml": "fb2"
    }

    # PyMuPDF
    TEXT_FLAGS: int = pymupdf.TEXTFLAGS_DICT & ~pymupdf.TEXT_PRESERVE_LIGATURES & ~pymupdf.TEXT_PRESERVE_IMAGES

    # OCR
    INVALID_CHARS: List[str] = [chr(0xfffd), "�"]
    OCR_DPI: int = 400
    TESSDATA_PREFIX: str = ""
    TESSERACT_LANGUAGES: Dict = {
        "English": "eng",
        "Spanish": "spa",
        "Portuguese": "por",
        "French": "fra",
        "German": "deu",
        "Russian": "rus",
        "Chinese": "chi_sim",
        "Japanese": "jpn",
        "Korean": "kor",
        "Hindi": "hin",
    }
    TESSERACT_TIMEOUT: int = 20 # 何时放弃OCR
    # 定义拼写检查语言对应的字典
    SPELLCHECK_LANGUAGES: Dict = {
        "English": "en",
        "Spanish": "es",
        "Portuguese": "pt",
        "French": "fr",
        "German": "de",
        "Russian": "ru",
        "Chinese": None,
        "Japanese": None,
        "Korean": None,
        "Hindi": None,
    }
    # 是否在每一页运行 OCR,即使可以提取文本
    OCR_ALL_PAGES: bool = False
    # 用于 OCR 的并行 CPU 工作线程数
    OCR_PARALLEL_WORKERS: int = 2
    # 使用的 OCR 引擎,可以是 "tesseract" 或 "ocrmypdf",ocrmypdf 质量更高但速度较慢
    OCR_ENGINE: str = "ocrmypdf"

    # Texify 模型相关参数
    TEXIFY_MODEL_MAX: int = 384 # Texify 的最大推理长度
    TEXIFY_TOKEN_BUFFER: int = 256 # Texify 的 token 缓冲区大小
    TEXIFY_DPI: int = 96 # 渲染图像的 DPI
    TEXIFY_BATCH_SIZE: int = 2 if TORCH_DEVICE_MODEL == "cpu" else 6 # Texify 的批处理大小,CPU 上较低因为使用 float32
    TEXIFY_MODEL_NAME: str = "vikp/texify"

    # Layout 模型相关参数
    BAD_SPAN_TYPES: List[str] = ["Caption", "Footnote", "Page-footer", "Page-header", "Picture"]
    LAYOUT_MODEL_MAX: int = 512
    LAYOUT_CHUNK_OVERLAP: int = 64
    LAYOUT_DPI: int = 96
    LAYOUT_MODEL_NAME: str = "vikp/layout_segmenter"
    LAYOUT_BATCH_SIZE: int = 8 # 最大 512 个 token 意味着较高的批处理大小

    # Ordering 模型相关参数
    ORDERER_BATCH_SIZE: int = 32 # 可以较高,因为最大 token 数为 128
    ORDERER_MODEL_NAME: str = "vikp/column_detector"

    # 最终编辑模型相关参数
    EDITOR_BATCH_SIZE: int = 4
    EDITOR_MAX_LENGTH: int = 1024
    EDITOR_MODEL_NAME: str = "vikp/pdf_postprocessor_t5"
    ENABLE_EDITOR_MODEL: bool = False # 编辑模型可能会产生误报
    EDITOR_CUTOFF_THRESH: float = 0.9 # 忽略概率低于此阈值的预测

    # Ray 相关参数
    RAY_CACHE_PATH: Optional[str] = None # 保存 Ray 缓存的路径
    RAY_CORES_PER_WORKER: int = 1 # 每个 worker 分配的 CPU 核心数

    # 调试相关参数
    DEBUG: bool = False # 启用调试日志
    # 调试数据文件夹路径,默认为 None
    DEBUG_DATA_FOLDER: Optional[str] = None
    # 调试级别,范围从 0 到 2,2 表示记录所有信息
    DEBUG_LEVEL: int = 0
    
    # 计算属性,返回是否使用 CUDA
    @computed_field
    @property
    def CUDA(self) -> bool:
        return "cuda" in self.TORCH_DEVICE
    
    # 计算属性,返回模型数据类型
    @computed_field
    @property
    def MODEL_DTYPE(self) -> torch.dtype:
        if self.TORCH_DEVICE_MODEL == "cuda":
            return torch.bfloat16
        else:
            return torch.float32
    
    # 计算属性,返回用于转换的数据类型
    @computed_field
    @property
    def TEXIFY_DTYPE(self) -> torch.dtype:
        return torch.float32 if self.TORCH_DEVICE_MODEL == "cpu" else torch.float16
    
    # 类配置
    class Config:
        # 从环境文件中查找 local.env 文件
        env_file = find_dotenv("local.env")
        # 额外配置,忽略错误
        extra = "ignore"
# 创建一个 Settings 对象实例
settings = Settings()

.\marker\scripts\verify_benchmark_scores.py

# 导入 json 模块和 argparse 模块
import json
import argparse

# 验证分数的函数,接收一个文件路径作为参数
def verify_scores(file_path):
    # 打开文件并加载 JSON 数据
    with open(file_path, 'r') as file:
        data = json.load(file)

    # 获取 multicolcnn.pdf 文件的分数
    multicolcnn_score = data["marker"]["files"]["multicolcnn.pdf"]["score"]
    # 获取 switch_trans.pdf 文件的分数
    switch_trans_score = data["marker"]["files"]["switch_trans.pdf"]["score"]

    # 如果其中一个分数小于等于 0.4,则抛出 ValueError 异常
    if multicolcnn_score <= 0.4 or switch_trans_score <= 0.4:
        raise ValueError("One or more scores are below the required threshold of 0.4")

# 如果当前脚本被直接执行
if __name__ == "__main__":
    # 创建 ArgumentParser 对象,设置描述信息
    parser = argparse.ArgumentParser(description="Verify benchmark scores")
    # 添加一个参数,指定文件路径,类型为字符串
    parser.add_argument("file_path", type=str, help="Path to the json file")
    # 解析命令行参数
    args = parser.parse_args()
    # 调用 verify_scores 函数,传入文件路径参数
    verify_scores(args.file_path)

标签:return,text,self,源码,Marker,import,model,解析,page
From: https://www.cnblogs.com/apachecn/p/18061413

相关文章

  • 原型模式解析
    本文是什么在前段时间学习了一些设计模式,里面的一些内容还是需要去值得思考的。其中有一种创建型模式叫做原型模式。在之前看的一些文章中,单例模式写的和拷贝构造函数差不多。但经过几天的思考,感觉还是和拷贝构造函数有着明显的区别,故写下此文表达我的想法。如果有不同的思考,可以......
  • springboot 2.4.0 启动源码分析
    SpringBoot启动的基本配置SpringBoot启动的配置主要有以下两个部分添加依赖最基本的springboot依赖只需要添加一个springboot启动器的依赖即可,如果是web服务则添加web的启动器<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"......
  • Spring-Framework6.1.x源码阅读环境搭建
    Spring-FrameWork6.1.x源码阅读环境搭建1.代码获取https://github.com/spring-projects/spring-framework.git。2.进入目录,修改目录下gradle/wrapper/gradle-wrapper.properties,看一下这里指定的gradle版本,distributionUrl=https://services.gradle.org/distributions/gradle-......
  • drf源码剖析----认证
    点击查看代码classRequest:def__init__(self,request,authenticators=None):self._request=requestself.authenticators=authenticatorsor()@propertydefuser(self):#类Request()中没有'_user',访问self._authenticate()......
  • React设计原理解密及核心源码解读
    一、React基础回顾1、react介绍React是一个用于构建用户界面的JavaScript库,它只负责应用的视图层,帮助开发人员构建快速且交互式的web应用程序。React使用组件的方式构建用户界面。2、JSX语法回顾在React中使用JSX语法描述用户界面,它是一种JavaScript语法扩展......
  • Sharding-JDBC源码解析与vivo的定制开发
    作者:vivoIT平台团队-XiongHuanxinSharding-JDBC是在JDBC层提供服务的数据库中间件,在分库分表场景具有广泛应用。本文对Sharding-JDBC的解析、路由、改写、执行、归并五大核心引擎进行了源码解析,并结合业务实践经验,总结了使用Sharding-JDBC的一些痛点问题并分享了对应的定......
  • 【源码日记】type cast
    TheSQLselectxmin::text::int8;grammara_expr:a_exprTYPECASTTypename{$$=makeTypeCast($1,$3,@2);}staticNode*makeTypeCast(Node*arg,TypeName*typename,intlocation){ TypeCast*n=makeNode(TypeCast); n->arg=arg; n->typeName=......
  • dolphinscheduler master实现去中心化源码分析
    dolphinschedulerMaster服务是去中心化的,也就是没有master和slave之分,每个master都参与工作,那么它是如何每个Master服务去取任务执行时,每个Master都取到不同的任务,并且不会漏掉,不会重复的呢,下面从源码角度来分析这个问题MasterServer.java/***runmasterserver*/@PostC......
  • 前端水印相关解析
    一、问题背景为了防止信息泄露或知识产权被侵犯,在web的世界里,对于页面和图片等增加水印处理是十分有必要的,水印的添加根据环境可以分为两大类,前端浏览器环境添加和后端服务环境添加,根据可见性,网页水印可以分为可见水印和不可见水印(盲水印/隐水印),简单对比一下这两种方式的特点:前......
  • ChatGLM3 源码解析(三)
    RotaryEmbedding#旋转位置嵌入,应用于每一层Q和KclassRotaryEmbedding(nn.Module):def__init__(self,dim,rope_ratio=1,original_impl=False,device=None,dtype=None):super().__init__()#除法项定义inv_freq=1.0/(10000**(to......