首页 > 其他分享 >DNA突变合并

DNA突变合并

时间:2024-12-05 16:44:51浏览次数:7  
标签:DNA seq self 合并 len Start End var 突变

前言

工作中一直接触 DNA 突变,突变检出软件众多,包括 GATK、pisces、vardict、strelka 等,但经常会出现两个、甚至多个突变需要合并起来综合判断其功能等影响的情况;因此,希望能有一个便捷的工具对突变进行合并,而不需要人工从 igv 上进行去判读。

实现逻辑

  1. 将需要合并的突变进行标准化(左对齐)
  2. 按突变位置进行排序
  3. 根据突变位置顺序,依次整理合并出后的结果

输入输出

  • 输入需要参考基因组 fa 以对突变进行左对齐和确定合并后的突变 ref,alt 碱基序列
  • 合并完成后 Mergr 对象会多一个 result_var 属性,其是 Var 的实例化对象

完整代码

点击查看代码
import re
import pysam
from itertools import combinations


class Merge:
    """ 合并多个突变 """
    def __init__(self, fasta):
        self.fasta = fasta
        self.Fa = pysam.FastaFile(fasta)
        self.vars = []

    def 增加突变(self, in_var):
        """ 添加需要合并的单个突变,重复调用添加多个突变  """
        self.vars.append(Var(in_var, self.fasta))

    def _check_chr(self):
        """ 核对所有的突变是否在同一条染色体上 """
        Chrs = set(var.Chr for var in self.vars)
        if len(Chrs) > 1:
            msg = f'染色体不一致 {Chrs}'
            raise ValueError(msg)

    def _check_pos(self):
        """ 核对突变位置是否有交集 """
        for a, b in combinations(self.vars, 2):
            if a.with_same_pos(b):
                msg = f'两个突变存在共同位置,不能合并. {a} {b}'
                raise ValueError(msg)

    def _sort_var_by_pos(self):
        """ 根据位置顺序,排序变异 """
        # 插入和点图标的话,插入突变要在其后端
        pos_var = {}
        for var in self.vars:
            if var.Ref == '-':
                pos_var[f'{var.Start}_2ins'] = var
            else:
                pos_var[f'{var.Start}_1other'] = var 
        new_var_list = []
        for k in sorted([key for key in pos_var.keys()]):
            # print(k, pos_var[k])
            new_var_list.append(pos_var[k])
        self.vars = new_var_list

    def 合并(self) -> Var:
        """ 将输入的突变合并成新的复杂合并 """
        self._check_chr()
        self._check_pos()
        self._sort_var_by_pos()
        start_pos = self.vars[0].Start + 1 if self.vars[0].Ref == '-' else self.vars[0].Start
        # print(start_pos)
        ref_seq = self.Fa.fetch(region=f'{self.vars[0].Chr}:{start_pos}-{self.vars[-1].End}')
        base_dict = {p:b for p,b in zip(range(start_pos, self.vars[-1].End+1), ref_seq)}
        # print(base_dict)
        last_end = 0
        alt_seq = ''
        for n, var in enumerate(self.vars):
            if n > 0:
                end = var.Start + 1 if var.Ref == '-' else var.Start
                alt_seq += "".join(base_dict[i] for i in range(last_end+1, end))
            # print(var)
            alt_seq += var.Alt
            last_end = var.End
        # 去除缺失符号 -
        alt_seq = alt_seq.replace('-', '')
        if len(ref_seq) > len(alt_seq):
            loop = range(len(alt_seq))
        else:
            loop = range(len(ref_seq))
        # print(ref_seq)
        # print(alt_seq)
        # End 移动
        move = 0
        for i in loop:
            if ref_seq[::-1][i] != alt_seq[::-1][i]:
                break
            move += 1
        if move:
            ref_seq = ref_seq[:-move] if ref_seq[:-move] else '-'
            alt_seq = alt_seq[:-move] if alt_seq[:-move] else '-'
        End = self.vars[-1].End - move
        # Start 移动
        move = 0
        for i in loop:
            if ref_seq[i] != alt_seq[i]:
                break
            move += 1
        ref_seq = ref_seq[move:] if ref_seq[move:] else '-'
        alt_seq = alt_seq[move:] if alt_seq[move:] else '-'
        if ref_seq == '-':
            Start = start_pos+move - 1
            End = End + 1
        else:
            Start = start_pos+move
            End = End
        # print(f'{Start} {End} {ref_seq} {alt_seq}')
        o_var = f'{self.vars[0].Chr} {Start} {End} {ref_seq if len(ref_seq) < 10 else (ref_seq[:10] + "...")} {alt_seq if len(alt_seq)<10 else (alt_seq[:10] + "...")}'
        print(f'合并后的突变: [{o_var}]')
        self.result_var = Var(f'{self.vars[0].Chr} {Start} {End} {ref_seq} {alt_seq}', self.fasta)

    def __str__(self):
        return str(self.result_var)


class Var:
    """ 突变基础类,记录突变的 染色体、起始位置、终止位置、ref序列和突变序列 
    并将其进行左对齐,以进行比较
    """
    def __init__(self, var, fasta):
        self.in_var = var
        self.fasta = pysam.FastaFile(fasta)
        self.Chr, self.Start, self.End, self.Ref, self.Alt, self.Type = self.格式化(var)
        self.左对齐()

    def 格式化(self, var):
        """ # 处理 vcf 格式的插入缺失, 如: T/TC  TC/T """
        var = re.sub('[ \:\t_]+', ' ', var.strip())
        m = re.findall(r'(\d *?\- *?\d)', var)
        if m:
            var = var.replace(m[0], m[0].replace('-', ' '))
        var = re.sub(r' +', ' ', var)
        Chr, Start, End, Ref, Alt = var.strip().split(' ')
        Ref = Ref.upper()
        Alt = Alt.upper()
        Start = int(Start)
        End = int(End)
        if Ref.startswith(Alt):
            Ref = Ref[len(Alt):]
            Alt = '-'
            Start += len(Alt)
        elif Alt.startswith(Ref):
            Alt = Alt[len(Ref):]
            Ref = '-'
        elif Ref == '-':
            if Start == End - 1:
                End -= 1
        Chr = Chr.upper()
        if Chr.startswith('CHR'):
            Chr = Chr.replace('CHR', 'chr')
        else:
            Chr = f'chr{Chr}'
        if Ref == '' or Alt == '' or (Ref == '-' and Alt == '-'):
            raise ValueError(f'突变的碱基写法错误 {self.in_var}')
        if any([c not in 'ATCG-' for c in Ref+Alt]):
            raise ValueError(f'突变的碱基字母错误 {self.in_var}')
        if not any([f'chr{c}' == Chr for c in list(range(1,23)) + ['X', 'Y']]):
            raise ValueError(f'染色体有误 {Chr}')
        Type = self.突变类型判断(Ref, Alt)
        return Chr, Start, End, Ref, Alt, Type

    @staticmethod
    def 最小重复碱基(base: str):
        """ 找到输入碱基的最小重复区域,用于确定串联重复的最小单位 """
        for i in range(len(base)):
            if base.count(base[0:i+1]) == len(base) / (i + 1):
                return base[0:i+1]
        else:
            return base

    @staticmethod
    def 突变类型判断(Ref, Alt):
        """ 根据输入位置判断突变的类型 """
        if Ref == '-':
            return 'INS'
        elif Alt == '-':
            return 'DEL'
        elif len(Ref) > 1:
            return 'DELINS'
        else:
            return 'SNV'

    def 左对齐(self):
        """ DEL、INS 左对齐,注意碱基变化 """
        if self.Type in ['SNV', 'DELINS']:
            return
        var_dup_base = self.最小重复碱基(self.Alt) if self.Type == 'INS' else self.最小重复碱基(self.Ref)
        local_start = self.Start if self.Type == 'INS' else self.Start - 1
        ref_left_seq: str = self.fasta.fetch(
            region=f'{self.Chr}:{local_start-500}-{local_start}'
        ).upper()[::-1]
        dup_len = len(var_dup_base)
        反向重复碱基 = var_dup_base[::-1]
        for count in range(int(500//len(var_dup_base))):
            if ref_left_seq[count*dup_len:(count+1)*dup_len] != 反向重复碱基:
                break
        # 先对重复区域左移
        if count:
            move_len = count * dup_len
            self.Start = self.Start - move_len
            self.End = self.End - move_len
        # 相同碱基的左对齐
        if self.Type == 'DEL':
            # print(self.Start)
            left_seq: str = self.fasta.fetch(
                region=f'{self.Chr}:{self.Start-150}-{self.Start-1}'
            ).upper()[::-1]
            target_seq = self.Ref[::-1]
        elif self.Type == 'INS':
            left_seq: str = self.fasta.fetch(
                region=f'{self.Chr}:{self.Start-150}-{self.Start}'
            ).upper()[::-1]
            target_seq = self.Alt[::-1]
        mv_len = 0
        for i, base in enumerate(target_seq):
            if base == left_seq[i]:
                mv_len += 1
                continue
            break
        # 最终位置和碱基调整
        if mv_len:
            if self.Type == 'DEL':
                self.Start = self.Start - mv_len
                self.End = self.End - mv_len
                self.Ref = self.fasta.fetch(region=f'{self.Chr}:{self.Start}-{self.End}').upper()
            elif self.Type == 'INS':
                self.Start = self.Start - mv_len
                self.End = self.End - mv_len
                self.Alt = f'{left_seq[:mv_len][::-1]}{self.Alt[:-mv_len]}'

    def with_same_pos(self, in_var):
        """ 判断两个突变位置是否有交集 """
        # 插入突变不消耗位置
        if in_var.Ref == '-' or self.Ref == '-':
            return False
        return self.Start <= in_var.Start <= self.End or \
            self.Start <= in_var.End <= self.End or \
            (in_var.Start <= self.Start and \
                in_var.End >= self.End)

    def __str__(self):
        return f'[{self.Chr} {self.Start} {self.End} {self.Ref} {self.Alt}]'

    def tab_str(self):
        return f'{self.Chr}\t{self.Start}\t{self.End}\t{self.Ref}\t{self.Alt}'


使用示例

点击查看代码
import Merge
fasta = 'genome.fa'
m = Merge(fasta)
# 三个突变合并到一起
m.增加突变('chr6 117632269 117632270 - ACT')
m.增加突变('chr6 117632269 117632269 A T')
m.增加突变('chr6 117632270 117632270 G A')
m.合并()

# 运行后提示为 “合并后的突变: [chr6 117632269 117632270 AG TACTA]”
# 也可使用 print(m.result_var) 打印其内容

拓展

可结合先前开发的可信度评估工具,对检出的突变自动判断是否需要合并,然后自动判读合并的突变是否可信,再将可信度突变增加到检出结果内,随后续流程一并注释: https://www.cnblogs.com/magower/p/18062277

标签:DNA,seq,self,合并,len,Start,End,var,突变
From: https://www.cnblogs.com/magower/p/18588847

相关文章

  • SQL多行数据合并到一行中的一个字段
    在SQL中,将多行数据转换为一行数据通常涉及到使用聚合函数和字符串函数。这种转换在数据库中被称为“行转列”或“透视”操作。以下是一些常用的方法来实现多行转一行:1.使用GROUP_CONCAT()(MySQL)在MySQL中,可以使用GROUP_CONCAT()函数将多行数据合并为一行,以逗号或其他分隔......
  • 京东合并职级后的职级标准预测
    京东把原先的M(管理岗)、P(项目管理岗)、T(产品和技术岗)、S(服务岗,比如分拣员,配送员)序列合并为新的专业主序列P。京东调整职级序列体系这个调整后的职级能力矩阵会是如何的?不同专业的职级咋能合并呢?比较好奇,多轮询问O1-preview后,给出下面这个职级矩阵表:下面是这个职级能力矩阵的说......
  • 京东合并职级后的职级标准预测
    京东把原先的M(管理岗)、P(项目管理岗)、T(产品和技术岗)、S(服务岗,比如分拣员,配送员)序列合并为新的专业主序列P。京东调整职级序列体系这个调整后的职级能力矩阵会是如何的?不同专业的职级咋能合并呢?比较好奇,多轮询问O1-preview后,给出下面这个职级矩阵表:下面是这个职级能力矩阵的说......
  • Element-Plus表格:Table自定义合并行数据的最佳实践
    Element-Plus表格:Table自定义合并行数据的最佳实践“知行合一”——王阳明在开发项目中,我们时常会用到表格,许多需求可能会要求自定义特定的行或列。 接下来,我们将探讨在实际开发中如何应对这一挑战。本文案例采用的技术:名称版本Vue3^3.5.12element-plus^2.8.8......
  • 地块打散与合并
    一、地块打散(分割)如果要打散地块(比如将一个大的土地利用多边形分割成多个小多边形),在ArcGIS的“编辑”工具条处于开启状态下,选中要打散的地块多边形要素。然后使用“高级编辑”工具条(如果没有,需要通过“自定义-工具条-高级编辑”开启)中的“分割”工具。可以通......
  • XMES合并主分支的操作
    一、IDEA1、先切换到远程的main分支注意:如果切换的时候,提示本地有未提交的,你可以先暂存提交(gitstash) 2、执行gitpull。拉取最新的main分支代码。 3、选择develop分支的提交,优选合并到main分支 4、推送gitpush,将本地main分支代码,推送到远程 5、直接将本地代码打......
  • 网站视频如何下载,如何在最短的时间内下载m3u8文件中的ts文件,并将其合并成MP4格式的文
    在现实生活中,我是一个二次元爱好者,每当番剧更新时我总会第一时间去观看,但无论是哪个平台,加载视频总是很缓慢,如果你的网络并不是特别好,有时甚至看几秒就卡顿,非常影响观影体验。但有些网站并不支持离线下载,这时候你想要下载视频应该怎么办呢?下面推荐一个解析和下载视频的插件——万......
  • 华为机试HJ80 整型数组合并
    首先看一下题描述题目标题:将两个整型数组按照升序合并,并且过滤掉重复数组元素。输出时相邻两数之间没有空格。输入描述:输入说明,按下列顺序输入:1 输入第一个数组的个数2 输入第一个数组的数值3 输入第二个数组的个数4 输入第二个数组的数值输出描述:输出合并之......
  • 代码随想录算法训练营第三十一天|leetcode56. 合并区间、leetcode738.单调递增的数字
    1leetcode56.合并区间题目链接:56.合并区间-力扣(LeetCode)文章链接:代码随想录视频链接:贪心算法,合并区间有细节!LeetCode:56.合并区间哔哩哔哩bilibili思路:其实很清楚,跟之前的方法差不多,但是自己写的时候就是有地方不会了,会不知道接下来的思路是什么1.1视频后的思路卡壳......
  • idea的如何git代码进行合并
    案例,将dev-wsg的部分代码提交,合并到develop中 1、先使用命令,将分支切换到develop分支gitcheckoutdevelop 2、在idea底部,选中这个标签页 3、选中要合并的提交,选择优选。这样本地的代码,会合并到本地develop分支中 4、优选后,如果分支右侧显示还要推送,则要进行推送......