
Upgraded DNA variant confidence assessment (now supporting capture, amplicon, and UMI data)


2022-11-20 12:09:06, Sunday

Purpose

I previously wrote a post on DNA germline variant confidence assessment (evidence collection). Based on parsing the BAM file with pysam, it collects a set of statistics from the reads related to a variant, then adds or subtracts points according to manually chosen thresholds for each metric, and uses the final score to judge how credible the variant is. Over the past year the code has been continuously optimized and upgraded; this post describes those changes.
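
The core idea can be sketched in a few lines (a simplified illustration, not the interface of the script below): fetch the reads overlapping the variant position with pysam, count total depth and the reads supporting the alternate base, and feed counters like these into a hand-tuned score. The function name count_snv_support and its thresholds are made up for this example.

import pysam

def count_snv_support(bam_path, chrom, pos, alt, min_mapq=30):
    """Toy evidence collector: count reads covering a 1-based SNV position
    and how many of them carry the alternate base (illustration only)."""
    dp = ad = 0
    with pysam.AlignmentFile(bam_path, "rb") as bam:
        for read in bam.fetch(chrom, pos - 1, pos):
            if read.is_unmapped or read.is_duplicate:
                continue
            for qpos, rpos in read.get_aligned_pairs():
                if rpos == pos - 1:  # 0-based reference coordinate
                    dp += 1
                    if qpos is not None and read.mapping_quality >= min_mapq \
                            and read.query_sequence[qpos] == alt:
                        ad += 1
                    break
    return dp, ad

# a naive score would then add or subtract per-metric terms, e.g.
# score = 100 * ad / dp if dp else 0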

What's new

  • Added assessment of amplicon variants
  • Added assessment of variants from UMI (molecular barcode) samples
  • Improved the rule for deciding whether a variant sits on high-quality reads
  • Added detection and collection of soft-clipped segments on reads, plus a check of whether these soft-clipped sequences are highly similar to each other
  • Added collection of the start and end positions of variant-supporting reads as a scoring item (except for amplicon data)
  • Fixed loss of variant-supporting reads for delins variants when the start of a read is soft-clipped
  • Added automatic left alignment of the input variant, shifting it toward the start of the reference genome (see the sketch after this list)
  • Added a check of whether the variant lies in a repeat region
  • Added a comparison between the repeat unit and the variant bases
  • Added collection of multi-allelic variants and of depth at the variant position
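
As a rough illustration of the left-alignment item above: an indel is shifted toward the start of the reference as long as the sequence left over after applying it is unchanged. The sketch below works on a plain string; left_shift_deletion and its arguments are illustrative, not the script's actual interface, which fetches flanking bases from the genome via pysam.FastaFile.

def left_shift_deletion(ref_seq, start, length):
    """Shift a deletion of `length` bases (0-based `start`) to the left while
    the sequence remaining after the deletion stays identical."""
    while start > 0 and ref_seq[start - 1] == ref_seq[start + length - 1]:
        start -= 1
    return start, ref_seq[start:start + length]

# deleting "AG" from C-AGAGAG-T should be reported at the leftmost copy:
print(left_shift_deletion("CAGAGAGT", 5, 2))   # -> (1, 'AG')

The same idea applies to insertions by comparing the inserted allele against the reference bases immediately to its left.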

Limitations

  • For variants longer than 50 bp, the evidence may not be collected completely
  • If left alignment shifts the variant by more than 50 bp, the actual variant may not be detected
  • For samples with base-conversion artifacts (G>A, C>T), artifact calls may be indistinguishable from real variants

Implementation


import re
import sys
import math
import attr
import yaml
import pysam
import logging
import pathlib
import argparse
from Bio import pairwise2
from Bio.Seq import Seq
from pprint import pprint
from collections import Counter, defaultdict

logging.basicConfig(
    level = logging.INFO,
    format = "[%(asctime)s - %(levelname)-4s - %(module)s : line %(lineno)s] %(message)s",
    datefmt = "%Y-%m-%d %H:%M:%S"
)


@attr.s
class Var_Info:
    var = attr.ib(type=str, default=attr.Factory(str))    # variant in actual coordinates, not VCF-style coordinates
    Bam = attr.ib(type=str, default=attr.Factory(str))
    Fasta = attr.ib(type=str, default=attr.Factory(str))
    # a region repeated more than this many times is treated as a simple repeat
    dup_num = attr.ib(type=int, default=attr.Factory(int))

    def __attrs_post_init__(self):
        # the minimum repeat count for a simple-repeat region defaults to 4
        if not self.dup_num:
            self.dup_num = 4
        self.var = re.sub('[ \:\t_]+', ' ', self.var.strip())
        target = re.findall('\d\-\d', self.var)
        if target:
            new_str = target[0].replace('-', ' ')
            self.var = self.var.replace(target[0], new_str)
        self.fasta = pysam.FastaFile(self.Fasta)
        try:
            self.Bam_reader = pysam.AlignmentFile(self.Bam, "rb")
        except StopIteration:
            logging.error(f'no reads at the variant position [{self.var}]')
            sys.exit(1)
        try:
            self.var_Chr, self.var_Start, self.var_End, \
                self.var_Ref, self.var_Alt = self.var.split(' ')
        except ValueError:
            logging.error(f'malformed variant [{self.var}]')
            sys.exit(1)
        self.var_Start: int = int(self.var_Start)
        self.var_End: int = int(self.var_End)
        self.__set_Type_Len()
        self.DP: int = 0
        self.AD: int = 0
        self.End_AD: int = 0
        self.Is_reads: int = 0
        self.XA_reads_num: int = 0
        self.all_XA_num: int = 0
        self.conflict_pair_reads = 0
        self.ref_dup_num = 0
        self.ref_left_dup_num = 0
        self.ref_right_dup_num = 0
        self.var_MQ: list = []
        self.other_MQ: list = []
        self.var_BQ: list = []
        self.var_reads_name: list = []
        self.normal_reads_name: list = []
        self.soft_clip_reads: list = []
        self.complex: bool = True
        self.reads_direction: set = set()
        self.var_read_s_e: dict = defaultdict(str)
        self.var_read_s: dict = defaultdict(int)
        self.var_read_e: dict = defaultdict(int)
        self.all_var = defaultdict(int)
        self.clip_pos = defaultdict(int)
        self.pos_cov_num = defaultdict(int)
        self.var_cov_num = defaultdict(int)
        self.ref_right_dup_base = ''
        self.ref_left_dup_base = ''
        self.score = 0
        self.level = 'False'
        self.Clean_reads = True
        self.Soft_num = 0
        self.Allele_var_score = 0
        self.Same_Start_end_score = 0
        self.Same_insert_score = 0
        self.Same_clip_pos = 0
        self.Allele_var_num = 1
        self.header = [
            'level', 'score',
            'var_Type', 'AD', 'End_AD', 'DP', 'AF', 'Is_reads', 'XA_reads_num',
            'all_XA_num', 'conflict_pair_reads', 'ref_left_dup_num',
            'ref_right_dup_num', 'complex', 'high_MQ', 'Double_direction',
            'high_BQ', 'Clean_reads', 'Soft_num', 'Allele_var_score', 
            'Same_Start_end_score', 'Same_clip_pos', 'Same_insert_score',
        ]

    def __set_Type_Len(self) -> None:
        """ Determine and set the variant type and length """
        if self.var_Ref == '-':
            self.var_Type = 'INS'
            self.var_Len = len(self.var_Alt)
            # fix insertions whose start/end are described inconsistently
            if self.var_Start == self.var_End:
                self.var_End += 1
        elif self.var_Alt == '-':
            self.var_Type = 'DEL'
            self.var_Len = len(self.var_Ref)
        elif len(self.var_Alt) == len(self.var_Ref) == 1:
            self.var_Type: str = 'SNV'
            self.var_Len: int = 1
        else:
            if not self.var_Alt or not self.var_Ref:
                logging.error(f'invalid variant: <{self.var}>')
                sys.exit(1)
            self.var_Type = 'DELINS'
            self.var_Len = len(self.var_Alt) if len(self.var_Alt) > len(self.var_Ref) else len(self.var_Ref)
        # if self.var_Len > 50:
        #     logging.error(f'variant longer than 50 bp, not suitable for this script [{self.var}]')
            # sys.exit(1)

    def check_high_MQ(self) -> bool:
        """ Decide whether the variant sits on reads with high mapping quality """
        # print((self.var_MQ))
        # print((self.other_MQ))
        if len(self.other_MQ) == 0 and len(self.var_MQ) != 0:
            if sum(self.var_MQ)/len(self.var_MQ) > 30:
                return True
            else:
                return False
        if sum(self.other_MQ) == 0 and self.var_MQ:
            if sum(self.var_MQ)/len(self.var_MQ) > 30:
                return True
            else:
                return False
        if len(self.var_MQ) == 0:
            return True
        perc = 0.75
        # print(sum(self.var_MQ)/len(self.var_MQ))
        # print(sum(self.other_MQ)/len(self.other_MQ))
        # mean mapping quality of variant reads must exceed 0.75x that of non-variant reads
        return (sum(self.var_MQ)/len(self.var_MQ)) > ((sum(self.other_MQ)/len(self.other_MQ)) * perc)

    def check_high_BQ(self) -> bool:
        """ Decide whether the variant bases are high quality (fail when fewer than 50% of base qualities exceed 30 and the mean base quality is below 25) """
        if self.var_BQ:
            if len([v for v in self.var_BQ if v > 30]) < len(self.var_BQ) * 0.5 \
                and sum(self.var_BQ)/len(self.var_BQ) < 25:
                return False
        return True

    def Clean_reads_check(self) -> bool:
        """
        Check whether the soft-clipped sequences on the variant-supporting reads are highly similar (returns True if they are not)
        """
        seq_count = Counter(self.soft_clip_reads)
        keys = list(seq_count.keys())
        n = len(keys)
        if n == 1:
            return True
        similarity_score = []
        for i in range(n):
            for j in range(i, n):
                if i == j:
                    continue
                similarity_score.append(
                    # best local alignment similarity, weighted by how often the two clip sequences occur
                    self.calculate_rel_score(keys[i], keys[j]) * (seq_count[keys[i]] * seq_count[keys[j]])
                )
        if not similarity_score:
            return True
        threshold = 70 - len(seq_count)
        if threshold < 20:
            threshold = 20
        return sum(similarity_score)/len(similarity_score) < threshold

    @staticmethod
    def calculate_rel_score(seq1, seq2):
        """
        Compute a similarity score between two base sequences.
        Args:
            seq1: first base sequence as a string
            seq2: second base sequence as a string
        Returns:
            the similarity score between the two sequences
        """
        # use Biopython's pairwise2 module to get the best local alignment
        alignments = pairwise2.align.localxx(Seq(seq1), Seq(seq2), one_alignment_only=True)
        if len(alignments) == 0:
            return 0
        # take only the highest score
        return round(alignments[0][2] / alignments[0][4] * 100, 2)

    @staticmethod
    def min_dup(base: str):
        """ Find the smallest repeating unit of the input bases, i.e. the minimal tandem-repeat unit """
        for i in range(len(base)):
            if base.count(base[0:i+1]) == len(base) / (i + 1):
                return base[0:i+1]
        else:
            return base

    def var_inside(self, Start, End) -> bool:
        """ Is the variant entirely inside the given interval? """
        # the variant lies within the aligned region
        if self.var_Start - 1 >= Start and self.var_End <= End:
            return True
        return False

    def var_near(self, Start, End) -> bool:
        """ Does the variant directly border the given positions? """
        # the variant touches the boundary of the aligned region
        if self.var_Start == End or self.var_End == Start:
            return True
        return False

    def check_clip_read(self, record):
        """ Detect soft-clipped segments within a read """
        s = e = 0
        for (i, pos) in record.get_aligned_pairs():
            if not pos:
                if not s:
                    s = e = i
                else:
                    e = i
        if e - s > 4:
            self.clip_pos[record.reference_start + s] += 1
            self.soft_clip_reads.append(record.query_sequence[s:e+1])

    def add_SNV(self, record):
        """ Process one alignment record for an SNV variant """
        # index of the point mutation on the read
        for (i, pos) in record.get_aligned_pairs():
            if pos == self.var_Start - 1:
                # the position is deleted in this read
                if i is None:
                    if record.query_name in self.var_reads_name:
                        self.conflict_pair_reads += 1
                    else:
                        self.normal_reads_name.append(record.query_name)
                    self.other_MQ.append(record.mapping_quality)
                    return
                snv_index: int = i
                break
        else:
            logging.error(f'SNV position [{self.var_Start}] not found in read [{record.query_name}]')
            sys.exit(1)
        #if record.query_name == 'FT100012174L1C004R00401550706':
        #    print(record.query_sequence[snv_index])
        # the variant is present on this read
        if record.query_sequence[snv_index] == self.var_Alt:
            if record.is_reverse:
                self.var_cov_num['reverse'] += 1
            else:
                self.var_cov_num['forward'] += 1
            # print(record.get_aligned_pairs())
            self.check_clip_read(record)
            # read has alternative alignment positions (XA tag)
            if record.has_tag('XA'):
                self.XA_reads_num += 1
            self.AD += 1
            # start/end alignment positions of the read (insert) carrying the variant
            self.update_s_e(record.query_name, record.reference_start, record.reference_end, record.isize)
            s = False
            e = 0
            for (i, pos) in record.get_aligned_pairs():
                if not s and pos:
                    self.var_read_s[pos] += 1
                    s = True
                if pos:
                    e = pos
            self.var_read_e[e] += 1
            # the variant sits at the very end of the read
            if snv_index == record.get_aligned_pairs()[0][0] or snv_index == record.get_aligned_pairs()[-1][0]:
                self.End_AD += 1
            # var MQ
            self.var_MQ.append(record.mapping_quality)
            self.var_BQ.append(record.query_alignment_qualities.tolist()[snv_index-record.query_alignment_start])
            # count read pairs in which both mates carry the variant
            if record.query_name in self.var_reads_name:
                self.Is_reads += 1
            else:
                if record.query_name in self.normal_reads_name:
                    self.conflict_pair_reads += 1
                self.var_reads_name.append(record.query_name)
        else:
            self.normal_reads_name.append(record.query_name)
            if record.query_name in self.var_reads_name:
                self.conflict_pair_reads += 1
            self.other_MQ.append(record.mapping_quality)

    def add_INS(self, record):
        """ Process one alignment record for an INS variant """
        # index of the INS on the read; the inserted bases actually start right after the anchor base
        for (i, pos) in record.get_aligned_pairs():
            if pos == self.var_Start:
                ins_index: int = i
                break
        else:
            logging.error(f'INS position [{self.var_Start}] not found in read [{record.query_name}]')
            sys.exit(1)
        # whether the variant is present on this record's read
        var_in_reads: bool = False
        if ins_index:
            # after left alignment, the inserted bases must match the repeat unit on the left
            if self.ref_left_dup_num and self.ref_left_dup_base == self.var_dup_base:
                check_len = (self.ref_left_dup_num)*len(self.ref_left_dup_base) + 1
                ref_seq: str = self.fasta.fetch(
                    region=f'{self.var_Chr}:{self.var_Start + 1}-{self.var_Start + check_len}'
                ).upper()
                real_seq = record.query_sequence[ins_index-self.var_Len:ins_index+check_len]
                if f'{self.var_Alt}{ref_seq}' == real_seq:
                    var_in_reads = True
            elif record.query_sequence[ins_index-self.var_Len:ins_index] == self.var_Alt:
                var_in_reads = True
        if var_in_reads:
            if record.is_reverse:
                self.var_cov_num['reverse'] += 1
            else:
                self.var_cov_num['forward'] += 1
            # 其他比对位置
            if record.has_tag('XA'):
                self.XA_reads_num += 1
            self.check_clip_read(record)
            # 变异所在 reads 的起止比对位置
            self.update_s_e(record.query_name, record.reference_start, record.reference_end, record.isize)
            s = False
            e = 0
            for (i, pos) in record.get_aligned_pairs():
                if not s and pos:
                    self.var_read_s[pos] += 1
                    s = True
                if pos:
                    e = pos
            self.var_read_e[e] += 1
            self.AD += 1
            # var MQ
            self.var_MQ.append(record.mapping_quality)
            try:
                _ = [self.var_BQ.append(record.query_alignment_qualities.tolist()[ins_index-i-record.query_alignment_start]) \
                    for i in range(self.var_Len) if ins_index-i-record.query_alignment_start > 0]
            except Exception:
                print(record.query_alignment_qualities.tolist())
                print(len(record.query_alignment_qualities.tolist()))
                print(ins_index-i-record.query_alignment_start)
            # 测穿的 reads 对数量
            if record.query_name in self.var_reads_name:
                self.Is_reads += 1
            else:
                if record.query_name in self.normal_reads_name:
                    self.conflict_pair_reads += 1
                self.var_reads_name.append(record.query_name)
        else:
            self.normal_reads_name.append(record.query_name)
            if record.query_name in self.var_reads_name:
                self.conflict_pair_reads += 1
            self.other_MQ.append(record.mapping_quality)

    def add_DEL(self, record):
        """ Process one alignment record for a DEL variant """
        pos_dict: dict = {v: k for (k, v) in record.get_aligned_pairs()}
        # the DEL is on this read if the flanking positions before and after it are aligned with query indices (when a repeat region is present the whole repeat must be read through), while the deleted positions themselves have no query index
        if ((self.var_Start - 2 in pos_dict and pos_dict[self.var_Start - 2]) and \
            (self.var_End in pos_dict and pos_dict[self.var_End])) and \
            all(not pos_dict[i] for i in range(self.var_Start - 1, self.var_End)):
            # self.check_clip_read(record)
            # 其他比对位置
            if record.is_reverse:
                self.var_cov_num['reverse'] += 1
            else:
                self.var_cov_num['forward'] += 1
            if record.has_tag('XA'):
                self.XA_reads_num += 1
            # 变异所在 reads 的起止比对位置
            self.update_s_e(record.query_name, record.reference_start, record.reference_end, record.isize)
            s = False
            e = 0
            for (i, pos) in record.get_aligned_pairs():
                if not s and pos:
                    self.var_read_s[pos] += 1
                    s = True
                if pos:
                    e = pos
            self.var_read_e[e] += 1
            self.AD += 1
            # var MQ
            self.var_MQ.append(record.mapping_quality)
            # 测穿的 reads 对数量
            if record.query_name in self.var_reads_name:
                self.Is_reads += 1
            else:
                if record.query_name in self.normal_reads_name:
                    self.conflict_pair_reads += 1
                self.var_reads_name.append(record.query_name)
        else:
            self.normal_reads_name.append(record.query_name)
            if record.query_name in self.var_reads_name:
                self.conflict_pair_reads += 1
            self.other_MQ.append(record.mapping_quality)

    def add_DELINS(self, record):
        """ Process one alignment record for a DELINS variant """
        # the DELINS is on this read if the positions flanking it align with query indices while the deleted positions have none
        pre_var_seq: str = ''
        ins_base = ''
        # iis = ''
        # sis = ''
        for (i, pos) in record.get_aligned_pairs():
            if pos:
                if self.var_Start - 2 <= pos <= self.var_End:
                    if i:
                        pre_var_seq = f'{pre_var_seq}{ins_base}{record.query_sequence[i]}'
                        # iis = f'{iis}{sis}{i}'
                    ins_base = ''
            else:
                ins_base = f'{ins_base}{record.query_sequence[i]}'
                # sis = f'{sis}{i}'
        ref_seq: str = self.fasta.fetch(region=f'{self.var_Chr}:{self.var_Start-1}-{self.var_End+1}').upper()
        var_seq: str = ref_seq.replace(self.var_Ref, self.var_Alt, 1)
        if var_seq == pre_var_seq:
            if record.is_reverse:
                self.var_cov_num['reverse'] += 1
            else:
                self.var_cov_num['forward'] += 1
            # 其他比对位置
            if record.has_tag('XA'):
                self.XA_reads_num += 1
            # 变异所在 reads 的起止比对位置
            self.update_s_e(record.query_name, record.reference_start, record.reference_end, record.isize)
            s = False
            e = 0
            for (i, pos) in record.get_aligned_pairs():
                if not s and pos:
                    self.var_read_s[pos] += 1
                    s = True
                if pos:
                    e = pos
            self.var_read_e[e] += 1
            self.AD += 1
            # var MQ
            self.var_MQ.append(record.mapping_quality)
            # 测穿的 reads 对数量
            if record.query_name in self.var_reads_name:
                self.Is_reads += 1
            else:
                if record.query_name in self.normal_reads_name:
                    self.conflict_pair_reads += 1
                self.var_reads_name.append(record.query_name)
        else:
            self.normal_reads_name.append(record.query_name)
            if record.query_name in self.var_reads_name:
                self.conflict_pair_reads += 1
            self.other_MQ.append(record.mapping_quality)

    def update_s_e(self, name, S, E, isize):
        """ Record the start/end positions of the insert (template) carrying this variant read """
        if name in self.var_read_s_e:
            return
        if isize < 0:
            self.var_read_s_e[name] = f'{E - isize + 1}_{E}'
        elif isize == 0:
            self.var_read_s_e[name] = f'{S}_{E}'
        else:
            self.var_read_s_e[name] = f'{S}_{S + isize}'
        # self.var_read_s_e[name].extend([int(S), int(E)])
        # self.var_read_s_e[name].sort()
        # if name in self.var_read_s_e:
        #     L = sorted([*self.var_read_s_e[name], int(S), int(E)])
        #     self.var_read_s_e[name] = [L[0], L[-1]]
        # else:
        #     self.var_read_s_e[name] = [int(S), int(E)]

    def left_norm(self):
        """ 
        Left-align DEL and INS variants, taking care to update the bases accordingly
        """
        # left alignment across a repeat region
        if self.ref_left_dup_num:
            # print(self.var_dup_base)
            # print(self.ref_left_dup_base)
            if self.var_Type == 'DEL' and self.var_dup_base == self.ref_left_dup_base:
                # print(self.var_Start)
                # print(self.ref_left_dup_num)
                # print(self.var_dup_base)
                self.var_Start = self.var_Start - self.ref_left_dup_num * len(self.var_dup_base)
                self.var_End = self.var_End - self.ref_left_dup_num * len(self.var_dup_base)
                # self.var_Ref = self.fasta.fetch(region=f'{self.var_Chr}:{self.var_Start}-{self.var_End}').upper()
                # print(self.var_Start)
            elif self.var_Type == 'INS' and self.var_dup_base == self.ref_left_dup_base:
                self.var_Start = self.var_Start - self.ref_left_dup_num * len(self.var_dup_base)
                self.var_End = self.var_End - self.ref_left_dup_num * len(self.var_dup_base)
        # left alignment across identical flanking bases
        if self.var_Type == 'DEL':
            # print(self.var_Start)
            left_seq: str = self.fasta.fetch(
                region=f'{self.var_Chr}:{self.var_Start-150}-{self.var_Start-1}'
            ).upper()[::-1]
            mv_len = 0
            for i, base in enumerate(self.var_Ref[::-1]*5):
                if base == left_seq[i]:
                    mv_len += 1
                    continue
                break
            # print(left_seq)
            # print(self.var_Ref[::-1]*3)
            # print(f'移动的距离: {mv_len}')
            if mv_len:
                self.var_Start = self.var_Start - mv_len
                self.var_End = self.var_End - mv_len
                self.var_Ref = self.fasta.fetch(region=f'{self.var_Chr}:{self.var_Start}-{self.var_End}').upper()
        if self.var_Type == 'INS':
            left_seq: str = self.fasta.fetch(
                region=f'{self.var_Chr}:{self.var_Start-150}-{self.var_Start}'
            ).upper()[::-1]
            mv_len = 0
            for i, base in enumerate(self.var_Alt[::-1]):
                if base == left_seq[i]:
                    mv_len += 1
                    continue
                break
            if mv_len:
                self.var_Start = self.var_Start - mv_len
                self.var_End = self.var_End - mv_len
                self.var_Alt = f'{left_seq[:mv_len][::-1]}{self.var_Alt[:-mv_len]}'
        if self.var_Type == 'DELINS':
            if len(self.var_Ref) > len(self.var_Alt):
                left_seq: str = self.fasta.fetch(
                    region=f'{self.var_Chr}:{self.var_Start-150}-{self.var_Start - 1}'
                ).upper()[::-1]
                loc_ref = self.var_Ref[:-len(self.var_Alt)]
                add_b = ''
                for a, b in zip(loc_ref[::-1], left_seq):
                    if a != b:
                        break
                    add_b = f'{a}{add_b}'
                if add_b:
                    self.var_Start = self.var_Start - len(add_b)
                    self.var_Ref = add_b + self.var_Ref
                    self.var_Alt = add_b + self.var_Alt

    def complex_check(self):
        """ Decide whether the variant lies in a repeat region """
        # repeat unit of the variant itself
        self.var_dup_base = self.min_dup(self.var_Alt) if self.var_Type == 'INS' else self.min_dup(self.var_Ref)
        # is the right flank a repeat region?
        local_End = self.var_End if self.var_Type == 'INS' else self.var_End + 1
        ref_right_seq: str = self.fasta.fetch(
            region=f'{self.var_Chr}:{local_End}-{local_End + 150}'
        ).upper()
        self.check_ref_dup(ref_right_seq)
        # is the left flank a repeat region?
        local_start = self.var_Start if self.var_Type == 'INS' else self.var_Start - 1
        ref_left_seq: str = self.fasta.fetch(
            region=f'{self.var_Chr}:{local_start-150}-{local_start}'
        ).upper()
        self.check_ref_dup(ref_left_seq, True)
        self.ref_dup_num = self.ref_right_dup_num + self.ref_left_dup_num

    def check_ref_dup(self, ref_seq, reverse=False):
        """ Return the number of repeats found in the flanking sequence """
        max_num = 0
        # print(ref_seq)
        for i in range(0,4):
            num = 0
            local_ref = ref_seq[::-1] if reverse else ref_seq
            # first check whether the flank repeats the variant's own repeat unit
            if i == 0:
                base = self.var_dup_base[::-1] if reverse else self.var_dup_base
            else:
                base = local_ref[:i]
            local_len = len(base)
            while True:
                if local_ref.startswith(base):
                    num += 1
                    local_ref = local_ref[local_len:]
                else:
                    break
            if num > max_num:
                max_num = num
                if reverse:
                    self.ref_left_dup_base = base[::-1]
                    self.ref_left_dup_num = max_num
                    # print('>>>>', base, self.var_Alt, max_num)
                    if num > self.dup_num and (self.var_dup_base == base[::-1] or \
                        (self.var_Type == 'SNV' and base[::-1].startswith(self.var_Alt))):
                        self.complex = False
                else:
                    self.ref_right_dup_base = base
                    self.ref_right_dup_num = max_num
                    # print('----', base, self.var_Alt, max_num)
                    if num > self.dup_num and (self.var_dup_base == base or \
                        (self.var_Type == 'SNV' and base[::-1].startswith(self.var_Alt))):
                        self.complex = False

    def allele_var(self, record):
        """
            Allele detection
            SNV: the base differs from the reference
            INS: query index i present, reference pos absent
            DEL: reference pos present, query index i absent
        """
        local_Start = self.var_Start
        if self.var_Type != 'INS':
            local_Start -= 1
        # determine what kind of variant each read carries in the target region (to detect multi-allelic sites)
        var_type = ''
        seq = ''
        not_ready =  True
        for (i, pos) in record.get_aligned_pairs():
            if pos and pos < local_Start:
                # if the position just before the variant is not properly aligned, do not parse this read's variant
                if pos == local_Start - 1:
                    not_ready = False
                    if not i or not pos:
                        break
                continue
            if not_ready:
                continue
            if var_type == 'INS':
                if i and not pos:
                    seq += record.query_sequence[i]
                    continue
                break
            if var_type == 'DEL':
                if not i and pos:
                    seq += self.fasta.fetch(region=f'{self.var_Chr}:{pos+1}-{pos+1}').upper()
                    continue
                break
            if i and pos:
                if self.fasta.fetch(region=f'{self.var_Chr}:{pos+1}-{pos+1}').upper() != record.query_sequence[i]:
                    var_type = 'SNV'
                    seq += record.query_sequence[i]
                break
            elif i:
                var_type = 'INS'
                seq += record.query_sequence[i]
            else:
                var_type = 'DEL'
                seq += self.fasta.fetch(region=f'{self.var_Chr}:{pos+1}-{pos+1}').upper()
        if var_type:
            self.all_var[f'{var_type}-{seq}'] += 1

    def res_line(self):
        """ Build the fixed output columns """
        line = ''
        for key in self.header:
            line = f'{line}{self.__dict__[key]}\t'
        self.result_line = line.strip() + '\n'

    def run(self, data_type='target'):
        """ Collect evidence from the BAM records covering the variant """
        # print(self.var)
        self.complex_check()
        self.left_norm()
        # print(f"{self.var_Type},{self.var_Chr}:{self.var_Start}-{self.var_End} {self.var_Ref} {self.var_Alt}")
        for record in self.Bam_reader.fetch(region=f"{self.var_Chr}:{self.var_Start}-{self.var_End}"):
            # drop unmapped and PCR-duplicate reads (the unpaired-read filter below is disabled)
            # if record.is_unmapped or not record.is_paired or record.is_duplicate:
            if record.is_unmapped or record.is_duplicate:
                continue
            # if record.query_name == 'FT100014559L1C007R00402151783':
            #     print(record.get_aligned_pairs())
            #     print(record.is_unmapped, record.is_paired, record.is_duplicate, record.reference_start, record.reference_end, record.isize)
            # continue
            # keep only reads whose aligned region contains the variant
            if self.var_inside(record.pos, record.aend):
                if record.is_reverse:
                    self.pos_cov_num['reverse'] += 1
                else:
                    self.pos_cov_num['forward'] += 1
                if record.has_tag('XA'):
                    self.all_XA_num += 1
                # record what variant this read carries at the target position
                self.allele_var(record)
                # read orientation
                self.reads_direction.add(record.is_reverse)
                self.DP += 1
                # dispatch by variant type
                if self.var_Type == 'SNV':
                    self.add_SNV(record)
                elif self.var_Type == 'INS':
                    self.add_INS(record)
                elif self.var_Type == 'DEL':
                    self.add_DEL(record)
                else:
                    self.add_DELINS(record)
        # pprint(self.all_var)
        self.var_read_s_num = len(self.var_read_s)
        self.var_read_e_num = len(self.var_read_e)
        self.Double_direction = False if len(self.reads_direction) == 1 else True
        self.AF = self.AD / self.DP if self.AD else 0
        self.high_MQ = self.check_high_MQ()
        self.high_BQ = self.check_high_BQ()
        if self.AD:
            if data_type == 'amplicon':
                self.call_amplicon_level()
            elif data_type == 'UMI':
                self.Clean_reads = self.Clean_reads_check()
                self.call_UMI_level()
            else:
                self.Clean_reads = self.Clean_reads_check()
                self.call_level()
        self.res_line()

    def call_level(self):
        """ Judge variant confidence from the collected metrics
        Levels: True    Likely_true    Unsure     Likely_false    False
        """
        self.score = self.call_score()
        if self.score <= 0:
            self.level = 'False'
        elif self.score < 60:
            self.level = 'Likely_false'
        elif self.score < 100:
            self.level = 'Unsure'
        elif self.score < 150:
            self.level = 'Likely_true'
        else:
            self.level = 'True'

    def call_score(self):
        """ Accumulate the score from the individual metrics """
        score = 0
        # AD
        if self.AD == 0:
            logging.error(f"the specified variant was not detected {self.var}")
            return 0
        if self.AD < 20:
            score += (-800 / self.AD) + 40
        elif self.AD < 80:
            score += 7 * math.sqrt(self.AD-20)
        else:
            score += 100
        # print(f'基础得分  {score}')
        # penalty proportional to the fraction of variant reads where the variant sits at a read end
        score -= self.End_AD * 100 / self.AD if self.End_AD else 0
        # penalty for inserts sharing identical start/end positions
        judge_score = 0
        for p, nu in Counter(self.var_read_s_e.values()).items():
            if nu > 1:
                judge_score -= (nu * 3000 / (self.AD *self.AD))
                # print(p, nu, judge_score)
        if judge_score < -200:
            judge_score = -200
        self.Same_insert_score = judge_score
        score += judge_score
        # print(f'起止位置  {score}')
        # penalty for soft clips breaking at the same position
        judge_score = 0
        for p, nu in self.clip_pos.items():
            if nu > 3:
                judge_score -= (nu * 2000 / (self.AD *self.AD))
                # print(p, nu, judge_score)
        if judge_score < -200:
            judge_score = -200
        self.Same_clip_pos = judge_score
        score += judge_score
        # print(f'after soft-clip penalty  {score}')
        #-- old
        judge_score = 0
        for ddict in [self.var_read_s, self.var_read_e]:
            # pprint(ddict)
            for _, nu in ddict.items():
                if nu >= 3:
                    if nu / self.AD > 0.1:
                        judge_score -= (nu * 100 / self.AD)
                    elif nu > 20:
                        judge_score -= (nu * 50 / self.AD)
                elif nu < 3:
                    judge_score += 1
        if judge_score < -200:
            judge_score = -200
        elif judge_score > 20:
            judge_score = 20
        self.Same_Start_end_score = judge_score
        score += judge_score
        # print(f'after start/end-position penalty  {score}')
        # DP
        if self.DP < 20:
            score += (-200 / self.DP) + 10
        elif self.DP < 80:
            score += 5 * math.sqrt(self.DP-20)
        else:
            score += 60
        # AF penalty, roughly 50 points at most
        if self.AF < 0.001:
            score -= 50
        elif self.AF < 0.05:
            score -= 500 * (0.1 - self.AF)
        # bonus when close to homozygous
        elif self.AF > 0.9:
            score += 20
        # read pairs sequenced through the variant (Is_reads)
        # print(f'突变频率  {score}')
        score += self.Is_reads ** 2 if self.Is_reads < 8 else 60
        # multi-allele penalty
        judge_score = 0
        # pprint(self.all_var)
        if self.var_Type == 'INS':
            key = [f'{self.var_Type}-{self.var_Alt}']
        elif self.var_Type == 'DEL':
            key = [f'{self.var_Type}-{self.var_Ref}']
        elif self.var_Type == 'DELINS':
            key = [f'{self.var_Type}-{self.var_Ref}', f'SNV-{self.var_Alt[:1]}']
        else:
            key = [f'{self.var_Type}-{self.var_Ref}', f'{self.var_Type}-{self.var_Alt}']
        for k, num in self.all_var.items():
            if k in key:
                continue
            if num > 5:
                if num / self.AD > 0.08:
                    judge_score -= 110 if num > 10 else 10 * num
                else:
                    judge_score -= 20
                if num > self.AD * 0.1:
                    self.Allele_var_num += 1
        # simple-repeat region but only one variant type observed: confidence goes up
        if len(self.all_var) == 1 and not self.complex:
            judge_score += 120
        if judge_score < -130:
            judge_score = -130
        if judge_score > 0:
            judge_score = 0
        self.Allele_var_score = judge_score
        score += judge_score
        # print(f'多等位基因  {score}')
        # conflicting read pairs (mates disagreeing about the variant)
        score -= self.conflict_pair_reads * 8 if self.conflict_pair_reads < 20 else 180 
        # flanking base repeats
        score -= ((self.ref_left_dup_num + 1)**2) if self.ref_left_dup_num < 12 else 160
        score -= ((self.ref_right_dup_num + 1)**2) if self.ref_right_dup_num < 12 else 160
        score += 50 if self.complex else -50
        # fraction of reads that also align elsewhere (XA)
        if self.XA_reads_num:
            score -= 150 * (self.XA_reads_num / self.AD - self.all_XA_num / self.DP)
            # score -= self.XA_reads_num * 50 / self.AD
            # score -= 150 - self.all_XA_num * 150 / self.DP
            # # 两个占比接近,说明 XA 是这个区域的常态,予以恢复一定的分数
            # score += 30 if abs(self.XA_reads_num/self.AD - self.all_XA_num/self.DP) < 0.3 else 0
        # print(self.XA_reads_num * 50 / self.AD)
        # print(150 - self.all_XA_num * 150 / self.DP)
        # print(f'XA  {score}', self.XA_reads_num/self.AD - self.all_XA_num/self.DP)
        # variant observed on reads from both strands
        score -= 0 if self.Double_direction else 40
        # high-mapping-quality reads
        if self.high_MQ:
            score += 80
        else:
            if self.var_Type == 'DEL':
                score -= 150/len(self.var_Ref)
            else:
                score -= 200
        # high-quality bases
        score += 30 if self.high_BQ else -40
        # print(f'质控  {score}')
        # soft-clipped regions
        soft_num = sum(n for seq, n in Counter(self.soft_clip_reads).items() if len(seq) > 3 and seq != self.var_Alt)
        self.Soft_num = soft_num
        soft_score = (soft_num * (soft_num / self.AD) + 1) ** 2
        score -= soft_score if soft_score < 200 else 200
        # similar soft clips
        # score += 0 if self.Clean_reads else -50
        return round(score,2)

    def call_amplicon_level(self):
        """ Judge variant confidence from the collected metrics
        Levels: True    Likely_true    Unsure     Likely_false    False
        """
        self.score = self.call_amplicon_score()
        if self.score <= 0:
            self.level = 'False'
        elif self.score < 200:
            self.level = 'Likely_false'
        elif self.score < 400:
            self.level = 'Unsure'
        elif self.score < 500:
            self.level = 'Likely_true'
        else:
            self.level = 'True'
        # print(f'<<{self.level}>>')

    def call_amplicon_score(self):
        """ Accumulate the amplicon score from the individual metrics """
        score = 0
        # AD
        if self.AD == 0:
            logging.error(f"the specified variant was not detected {self.var}")
            return 0
        if self.AD < 20:
            score -= 1500 / self.AD
        elif self.AD < 80:
            score += 7 * math.sqrt(self.AD-20)
        else:
            score += 100
        # End_AD 占比扣分
        score -= self.End_AD * 150 / self.AD if self.End_AD else 0
        # AF 最高减分 40
        if self.AD > 40:
            score += 300 * self.AF
        # 测穿 Is_reads
        score += self.Is_reads ** 2 if self.Is_reads < 15 else 200
        # 等位基因扣分
        judge_score = 0
        # print(f'基础得分  {score}')
        # pprint(self.all_var)
        if self.var_Type == 'INS':
            key = [f'{self.var_Type}-{self.var_Alt}']
        elif self.var_Type == 'DEL':
            key = [f'{self.var_Type}-{self.var_Ref}']
        elif self.var_Type == 'DELINS':
            key = [f'{self.var_Type}-{self.var_Ref}', f'SNV-{self.var_Alt[:1]}']
        else:
            key = [f'{self.var_Type}-{self.var_Ref}', f'{self.var_Type}-{self.var_Alt}']
        for k, num in self.all_var.items():
            if k in key:
                continue
            if num > 15 and num > self.AD * 0.1:
                if  num > self.AD * 0.2:
                    judge_score -= 150 if num > 20 else 7 * num
                else:
                    judge_score -= 20
                if num/self.DP > 0.005:
                    self.Allele_var_num += 1
        if judge_score < -450:
            judge_score = -450
        if judge_score > 0:
            judge_score = 0
        # 简单区域,但是只检测到了一种突变,可信度增加
        if len(self.all_var) == 1:
            judge_score += 50
            if not self.complex:
                judge_score += 100
        elif self.Allele_var_num > 2 and self.var_Type != 'DELINS':
            judge_score -= 250
        self.Allele_var_score = judge_score
        score += judge_score
        # print(self.Allele_var_num)
        # print(f'多等位基因  {score}  {judge_score}')
        # 矛盾的配对 reads
        score -= self.conflict_pair_reads * 400 / self.AD
        # print(f'矛盾 reads  {score}')
        # 碱基重复区域
        score -= ((self.ref_left_dup_num + 1)**2) if self.ref_left_dup_num < 12 else 160
        score -= ((self.ref_right_dup_num + 1)**2) if self.ref_right_dup_num < 12 else 160
        score += 50 if self.complex else -50
        score += 200 if not self.complex and self.var_Type == 'SNV' else 0
        # print(f'重复区域  {score}')
        # 比对到其他位置 reads 占比
        if self.XA_reads_num:
            score -= self.XA_reads_num * 120 / self.AD
            score -= 150 - self.all_XA_num * 150 / self.DP
            # if the two fractions are close, XA hits are the norm for this region, so give some points back
            score += 130 if abs(self.XA_reads_num / self.AD - self.all_XA_num / self.DP) < 0.2 else 0
        # print(f'XA区域  {score}')
        # 双向 reads 测到
        score += 80 if self.Double_direction else -200
        # 高质量的 reads
        if self.high_MQ:
            score += 80
        else:
            if self.var_Type == 'DEL':
                score -= 150/len(self.var_Ref)
            else:
                score -= 200
        # 高质量碱基
        score += 80 if self.high_BQ else -350
        # print(f'质量  {score}')
        # strand bias
        if self.var_cov_num['forward'] > 5 and self.var_cov_num['reverse'] > 5:
            bias = abs(self.var_cov_num['forward']/self.pos_cov_num['forward'] - self.var_cov_num['reverse']/self.pos_cov_num['reverse'])
            # print(f'链偏差得分 {bias}: {1200 * bias - 50}')
            if self.var_Type != 'SNV':
                bias = bias * 0.7
            score -= 1200 * bias - 50
        # print(f'最终  {score}')
        return round(score,2)

    def call_UMI_level(self):
        """ Judge variant confidence from the collected metrics
        Levels: True    Likely_true    Unsure     Likely_false    False
        """
        self.score = self.call_UMI_score()
        if self.score <= 0:
            self.level = 'False'
        elif self.score < 80:
            self.level = 'Likely_false'
        elif self.score < 150:
            self.level = 'Unsure'
        elif self.score < 220:
            self.level = 'Likely_true'
        else:
            self.level = 'True'

    def call_UMI_score(self):
        """ Accumulate the UMI score from the individual metrics """
        score = 0
        # AD
        if self.AD == 0:
            logging.error(f"the specified variant was not detected {self.var}")
            return 0
        if self.var_Type == 'SNV':
            if self.AD < 10:
                score += (self.AD-10)*10
            elif self.AD < 30:
                score += self.AD*4
            else:
                score += 120
        else:
            if self.AD < 13:
                score += self.AD * 13
            else:
                score += 160
        # indels get a higher base score; they are credible even at low frequency
        # penalty proportional to the fraction of variant reads where the variant sits at a read end
        score -= self.End_AD * 100 / self.AD if self.End_AD else 0
        # print('基础得分', score)
        # 插入片段起始或终止位置 reads 扣分
        # print(len(set(self.var_read_s_e.values())), self.AD)
        if self.AD < 40:
            score -= 100 * (self.AD - len(set(self.var_read_s_e.values()))) / self.AD
            # print(f'起止位置  {score}')
        # 软截断相同位置断的扣分
        judge_score = 0
        for _, nu in self.clip_pos.items():
            if nu > 3:
                judge_score -= (nu * 2000 / (self.AD *self.AD))
                # print(p, nu, judge_score)
        if judge_score < -200:
            judge_score = -200
        # print('软截断扣分', score)
        self.Same_clip_pos = judge_score
        score += judge_score
        # DP
        if self.DP < 20:
            score += (-200 / self.DP) + 10
        elif self.DP < 80:
            score += 8 * math.sqrt(self.DP-20)
        else:
            score += 65
        # AF 最高减分 40
        if self.AF < 0.001:
            score -= 50
        # 接近纯合加分
        elif self.AF > 0.9:
            score += 30
        # print('深度', score)
        # 等位基因扣分
        judge_score = 0
        # pprint(self.all_var)
        if self.var_Type == 'INS':
            key = [f'{self.var_Type}-{self.var_Alt}']
        elif self.var_Type == 'DEL':
            key = [f'{self.var_Type}-{self.var_Ref}']
        elif self.var_Type == 'DELINS':
            key = [f'{self.var_Type}-{self.var_Ref}', f'SNV-{self.var_Alt[:1]}']
        else:
            key = [f'{self.var_Type}-{self.var_Ref}', f'{self.var_Type}-{self.var_Alt}']
        for k, num in self.all_var.items():
            if k in key:
                continue
            if num > 5:
                if num / self.AD > 0.08:
                    judge_score -= 110 if num > 10 else 10 * num
                else:
                    judge_score -= 20
                if num > self.AD * 0.1:
                    self.Allele_var_num += 1
        # 简单区域,但是只检测到了一种突变,可信度增加
        if len(self.all_var) == 1 and not self.complex:
            judge_score += 120
        if judge_score < -130:
            judge_score = -130
        if judge_score > 0:
            judge_score = 0
        self.Allele_var_score = judge_score
        score += judge_score
        # print('多等位基因', score)
        # 碱基重复区域
        score -= ((self.ref_left_dup_num + 1)**2) if self.ref_left_dup_num < 12 else 160
        score -= ((self.ref_right_dup_num + 1)**2) if self.ref_right_dup_num < 12 else 160
        score += 50 if self.complex else -50
        # 比对到其他位置 reads 占比
        if self.XA_reads_num:
            score -= 150 * (self.XA_reads_num / self.AD - self.all_XA_num / self.DP)
        # 高质量的 reads
        if self.high_MQ:
            score += 40
        else:
            if self.var_Type == 'DEL':
                score -= 70/len(self.var_Ref)
            else:
                score -= 100
        # 高质量碱基
        score += 30 if self.high_BQ else -60
        # print('质量', score)
        # 软截断区域
        soft_num = sum(n for seq, n in Counter(self.soft_clip_reads).items() if len(seq) > 3 and seq != self.var_Alt)
        self.Soft_num = soft_num
        soft_score = (soft_num * (soft_num / self.AD) + 1) ** 2
        score -= soft_score if soft_score < 200 else 200
        # print('软截断', score)
        #! penalize SNVs when more than 80-95% of supporting reads place the variant within ~20 bp of a read end
        if self.var_Type == 'SNV':
            # the threshold scales with AD: the lower the AD, the lower the required fraction
            if self.AD < 10:
                perc = 0.8
            elif self.AD < 100:
                perc = 0.8 + 0.0016 * (self.AD - 10)
            else:
                perc = 0.95
            distance = []
            for ddict in [self.var_read_s, self.var_read_e]:
                for p, nu in ddict.items():
                    distance.extend([abs(p - self.var_Start)] * nu)
            distance = sorted(distance)[:int(len(distance)/2)]
            mid = distance[int(len(distance)/2)]
            # print('中位数', mid)
            in_size = [i for i in distance if i < 20]
            in_perc = len(in_size)/len(distance)
            # print(distance)
            # print(in_perc)
            # print(perc)
            if in_perc > perc:
                score -= 230 * in_perc
            # above 60%, decide using the median distance instead
            elif in_perc > 0.6 and mid < 20:
                score -= 160 * in_perc
        # print('突变在末端', score)
        return round(score,2)


def parse_text(txt, bam, fasta, data_type):
    header = ''
    header_line = ''
    # column holding the variant position
    pos = 1
    # column holding the ref allele
    ref = 3
    # column holding the alt allele
    alt = 4
    outfile = txt.replace('.txt', '.check.txt')
    with open(txt) as fi, open(outfile, 'w') as fo:
        for line in fi.readlines():
            if not header_line:
                header_line = line.strip()
                continue
            line = line.rstrip()
            cells = line.split('\t')
            Chr, Start, End = cells[pos].replace(':', '-').split('-')
            var_info = Var_Info(
                f'{Chr} {Start} {End} {cells[ref]} {cells[alt]}',
                bam,
                fasta
            )
            if not header:
                header = header_line + "\t" + "\t".join(var_info.header) + "\n"
                fo.write(header)
            var_info.run(data_type)
            fo.write(f'{line}\t{var_info.result_line}')

def append_file(av, bam, fasta, data_type):
    """ Append the two confidence columns (conf_level, conf_score) to the file in place """
    results = ''
    with open(av) as fi:
        for n, line in enumerate(fi.readlines(), 1):
            if n % 50 == 0:
                logging.info(n)
            cells = line.strip().split('\t')
            var = " ".join(cells[:5])
            var_info = Var_Info(var, bam, fasta)
            var_info.run(data_type)
            results = f'{results}{line.strip()}\t{var_info.level}\t{var_info.score}\n'
    with open(av, 'w') as fo:
        fo.write("".join(results))

def parse_avinput(av, bam, fasta, data_type):
    header = False
    outfile = av.replace('_avinput.txt', '_var_confidence.txt')
    with open(av) as fi, open(outfile, 'w') as fo:
        for line in fi.readlines():
            line = line.rstrip()
            cells = line.split('\t')
            var_info = Var_Info(
                f'{" ".join(cells[:5])}',
                bam,
                fasta
            )
            if not header:
                header = f"Chr\tStart\tEnd\tRef\tAlt\t" + "\t".join(var_info.header) + "\n"
                fo.write(header)
            var_info.run(data_type)
            key = "\t".join(cells[:5])
            fo.write(f'{key}\t{var_info.result_line}')

if __name__ == '__main__':
    parse = argparse.ArgumentParser()
    parse.add_argument('--var', default='')
    parse.add_argument('--txt', default='')
    parse.add_argument('--avinput', default='')
    parse.add_argument(
        '--Bam', 
        default='PC-231001042-DNA.sorted.bam',
    )
    parse.add_argument(
        '--Fasta', 
        default='ucsc.hg19.fasta.gz'
    )
    parse.add_argument('--dup_num', default=4, type=int)
    parse.add_argument('--test', default=False, action="store_true")
    parse.add_argument('--append', default=False, action="store_true")
    parse.add_argument('--data_type', default='target', choices=['target', 'amplicon', 'UMI'])
    parse.add_argument('--example_yml', default="")
    man_doc = lambda *_: print(
        f"""
        \rpython3 {sys.argv[0]} [-h] --var VAR --Bam BAM [--Fasta FASTA]
        \r                    [--dup_num DUP_NUM] [--test]

        \rArguments:
        \r\t--var              variant, in the format "Chr Start End Ref Alt" (not VCF-style), e.g.:
        \r\t                        "1 12345 12345 C -", "X 123456 123456 - GTG"
        \r\t--txt              variant file in tsv format
        \r\t--avinput          variant file in avinput format
        \r\t--Bam              path to the bam file (a bai index must sit next to it)
        \r\t--Fasta            path to the fasta file (a fai index must sit next to it)
        \r\t--dup_num          minimum repeat count for calling a simple-sequence region, default 4 (variants in simple-sequence regions are less reliable), e.g.:
        \r\t                        "1 1234 1234 C -": if position 1234 onward is a run of five consecutive C bases, the variant is treated as lying in a simple-sequence region
        \r\t--test             check that a fixed set of variants behaves as expected
        \r\t--append           append two columns, conf_level and conf_score, to the file
        \r\t--data_type        type of data being analyzed; one of 'target', 'amplicon', 'UMI' (hybrid capture, amplicon, UMI)
        \r\t--example_yml      variant file used for testing
        """
    )
    parse.print_help=man_doc
    parse.print_usage=man_doc
    opts = parse.parse_args()
    if opts.test:
        if opts.example_yml:
            check_yml(opts.example_yml, opts.Fasta, opts.data_type)
        else:
            check_example()
    elif opts.append and opts.Bam and opts.Fasta:
        if opts.avinput:
            append_file(opts.avinput, opts.Bam, opts.Fasta, opts.data_type)
        else:
            parse.print_help()
    elif opts.txt and opts.Bam and opts.Fasta:
        parse_text(opts.txt, opts.Bam, opts.Fasta, opts.data_type)
    elif opts.avinput and opts.Bam and opts.Fasta:
        parse_avinput(opts.avinput, opts.Bam, opts.Fasta, opts.data_type)
    elif opts.var and opts.Bam and opts.Fasta:
        var_info = Var_Info(
            opts.var,
            opts.Bam,
            opts.Fasta
        )
        var_info.run(opts.data_type)
        print(f"""
        \r{'-'* 70}
        {var_info.var_Chr} {var_info.var_Start} {var_info.var_End} {var_info.var_Ref} {var_info.var_Alt}
        '{var_info.var}': [
            '{var_info.var_Type}',             # var_Type
            {var_info.AD},                     # AD
            {var_info.End_AD},                 # End_AD
            {var_info.DP},                     # DP
            {var_info.AF},                     # AF
            {var_info.Is_reads},               # Is_reads
            {var_info.XA_reads_num},           # XA_reads
            {var_info.all_XA_num},             # all_XA_num
            {var_info.conflict_pair_reads},    # conflict_pair_reads
            {var_info.ref_left_dup_num},       # ref_left_dup_num
            {var_info.ref_right_dup_num},      # ref_right_dup_num
            {var_info.complex},                # complex
            {var_info.high_MQ},                # high_MQ
            {var_info.Double_direction},       # Double_direction
            {var_info.high_BQ},                # high_BQ
            {var_info.Clean_reads},            # Clean_reads
            {var_info.Soft_num},               # Soft_num
            {var_info.Allele_var_score},       # Allele_var_score
            {var_info.Same_Start_end_score},   # Same_Start_end_score
            {var_info.Same_insert_score},      # Same_insert_score
            {var_info.Same_clip_pos},          # Same_clip_pos
            {var_info.score},                  # score
            {var_info.level},                  # level
            {var_info.result_line},            # line
        ],
        """)

    else:
        parse.print_help()


Notes

  • The script's runtime depends on the depth at the variant: below roughly 1000X a variant is usually assessed within seconds, but with very deep amplicon data and many variants to assess in one run it can take much longer; in that case it is advisable to downsample the BAM to a suitable depth before running the script (see the sketch after this list).
  • It can be embedded in an analysis pipeline to help filter out false variants and pick out highly credible ones. In most cases it judges variant confidence accurately, but the code has not yet been validated against a comprehensive set of variants and may have blind spots, so do not rely on its assessment too heavily.
  • The script is mainly meant to assist manual review of how credible a DNA variant is, and it is continuously refined and extended through day-to-day use; the longer-term plan is to build a database and replace the internal scoring rules with a machine-learning model, so the assessment stays accurate in more complex situations.
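
For the downsampling suggestion in the first note, a minimal pysam-based sketch follows; the file names and the sampling fraction are placeholders, and in practice samtools view -s (which keeps both mates of a pair) does the same job.

import random
import pysam

def downsample_bam(in_bam, out_bam, fraction, seed=42):
    """Randomly keep roughly `fraction` of the reads (sketch only; mates of a
    pair are sampled independently here, unlike samtools view -s)."""
    random.seed(seed)
    with pysam.AlignmentFile(in_bam, "rb") as fin, \
         pysam.AlignmentFile(out_bam, "wb", template=fin) as fout:
        for read in fin:
            if random.random() < fraction:
                fout.write(read)
    pysam.index(out_bam)

# downsample_bam("amplicon.sorted.bam", "amplicon.ds.bam", 0.1)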

