首页 > 其他分享 >Nextflow系列 入门

Nextflow系列 入门

时间:2022-11-04 22:45:15浏览次数:75  
标签:系列 入门 process nextflow Nextflow data Channel 定义

一、Nextflow

1、Nextflow 介绍

Nextflow是西班牙巴塞罗那的生物医学和基因组学研究中心CRG开发的开源workflow引擎。是基于Groovy语言的一种工作流框架,能够大大简化复杂计算流程的编排工作,支持部署在本地计算机、集群、云端,同时也支持与Conda、Docker、Singularity等结合使用将流程在不同的平台之间进行迁移。

Nextflow 支持几乎所有的批处理调度程序(集群管理系统),包括:

  • Sun Grid Engine (SGE)
  • Open grid engine
  • Univa grid engine
  • Platform LSF
  • Linux SLURM
  • PBS Works
  • Torque
  • HTCondor

例子:如果想在SGE集群中执行搭建的流程,我们只需在流程代码中加入executor关键字参数便能轻松的指定集群类型。

如果要在集群上运行流程,在项目目录下nextflow.config 配置文件,类似这样:

process {
  executor = 'sge'

queue='<your execution queue>'
  penv = 'smp'
  clusterOptions = { "-V -S /bin/bash " }
  }

在本地运行,则:

executor {
  cpus = 10 /* max cpus on local machine */
}

云平台支持:
目前Nextflow支持的云平台如下:

  • AWS Batch
  • Azure Batch
  • Google Cloud Life Sciences
  • Kubernetes

容器支持:
Nextflow 支持Docker和Singularity容器,容器间的切换也可以直接通过在 process 中指定关键字container即可:

process samtools {
 container 'biocontainers/samtools:1.3.1'

 """
 samtools --version
 """
}

插个题外话:

DolphinNext: A graphical user interface for creating Nextflow pipelines

DolphinNext能够极大限制的简化Nextflow流程的构建过程。

DolphinNext提供:

  1. 拖放式用户界面,该界面抽象化管道并允许用户创建管道,而无需熟悉底层编程语言。
  2. 监视管道执行的用户界面,允许在中间步骤重新初始化管道。
  3. 具有版本跟踪的可复制管道和可以独立运行的独立版本。
  4. 无缝移植到分布式计算环境,例如高性能集群或云计算环境。

2、Nextflow 安装

nextflow基于JAVA, 安装有两种方式:

方式1:
curl -s https://get.nextflow.io | bash

// 这将在当前目录中创建 nextflow 主可执行文件。

方式2,使用conda进行安装:
conda install -c bioconda nextflow

安装之后还需要用 nexflow run hello 测试是否安装成功。

备注,Nextflow默认的配置文件及相关文件默认放在 $HOME/.nextflow

可以使用以下命令来更新到最新版:

nextflow self-update

Nextflow的一些使用坑(陷阱):

  1. work目录内某些文件权限为000,甚至为root所有,流程出错

    解决方案:首先看好work目录及其所在上级目录的权限(一般不是这个问题);不要使用root运行nextflow;尽量使用singularity而不是docker去解决环境问题。

  2. 某个process超时导致的流程失败

    解决方案:先排查确认不是使用上的问题(文件输入错误或者参数设置得不合理等等)。如果不是,证明是计算量太大或者算力不足导致的。流程内是有可能规定单个process执行的时间[2]的。

  3. 我明明设置可以使用很多CPU,但是只用了X个?

    解决方案:很多流程出于某些目的,可能在自身配置中就限制了最大CPU个数[3]。

最后推荐一组singularity的配置:

singularity {
  enabled = true
  autoMounts = true
  pullTimeout = 1.h
}

可以把这些内容另存为文件 nextflow.config,放在工作目录下,再运行nextflow的时候就会自动加载这些配置了。

3、Nextflow Tower 监控流程程序运行状态

Nextflow Tower 注册链接:https://tower.nf/

4、Nextflow的语法与使用

nextflow 官方文档:https://www.nextflow.io/docs/latest/

nextflow使用总结来说,用户需要设置参数,并将文件以参数或Channel的形式输入到Process中处理,在不同的Process中以Channel相互连接,进行文件的输入输出。

参考:

1. Nextflow scripting

参考见:https://www.nextflow.io/docs/latest/script.html

(1)隐式变量

以下变量在脚本全局执行范围内隐式定义:

  • baseDir : 基本目录。主工作流脚本所在的目录。(弃用,从20.04.0开始推荐使用projectDir)
  • launchDir:启动目录。运行工作流的目录(需要 20.04.0 或更高版本)。
  • moduleDir:模块目录。对于 DSL2 模块,模块脚本所在的目录或与非模块脚本的 projectDir 相同(需要版本 20.04.0 或更高版本)。
  • nextflow:下一个流。表示 nextflow 运行时信息的类似字典对象。
  • params:参数。类似字典的对象用于保存在配置文件中指定的工作流参数或作为命令行选项。
  • projectDir:项目目录。主脚本所在目录(需要20.04.0或更高版本)。
  • workDir:工作目录。创建的任务临时文件的目录。
  • workflow:工作流程。表示工作流运行时信息的类似字典对象。

Nextflow 配置文件(nextflow.config)中隐式定义了以下变量:

  • baseDir : 基本目录。主工作流脚本所在的目录。(弃用,从20.04.0开始推荐使用projectDir)
  • launchDir:启动目录。运行工作流的目录(需要 20.04.0 或更高版本)。
  • projectDir:项目目录。主脚本所在目录(需要20.04.0或更高版本)。

2. Processes 和 workflow

示例:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

process foo {
    output:
      path 'foo.txt'

    script:
      """
      echo 'Hello World' > foo.txt
      """
}

process bar {
    input:
      path x

    output:
      path 'bar.txt'

    script:
      """
      cat $x > bar.txt
      """
}

workflow {
    data = foo()
    bar(data)
}

代码说明:
nextflow.enable.dsl=2 表示我们将使用DSL的扩展版本2,推荐小伙伴们使用DSL 2。

nextflow 代码分为2部分,process 和 workflow。

(1) process

process 构成流程的基本单元,可以将process理解为流程图中的节点,而channel理解为流程图中的箭头。process可以包括 directives, input, output, when clause, script 等五部分,语法如下:

process < name > {

   [ directives ]

   input:
    <input qualifier> <input name> [from <source channel>] [attributes]

   output:
    < process outputs >

   when:
    < condition >

   [script|shell|exec]:
   < user script to be executed >

}
  • input 定义了process从哪个channel接收数据。①只能包含1个输入块,输入块中可以包含1或多个输入声明。②输入限定符(qualifier)声明了待接收数据的类型,包括val, file, path, env, stdin, tuple, each等。③直接影响产生的任务数量,process会逐项消耗channel中的数据,并依次产生新任务,直到数据被完全消耗。Nextflow会为每个任务创建一个运行目录,存放相关的文件、日志、输出等。

  • output 定义了哪些process结果数据传输到哪个channel。①只能包含1个输出块,输出块中可以包含1或多个输出声明。②输出限定符包括val, file, path, env, stdout, tuple等。

  • script 以字符串的形式定义了需要执行的命令。①只能包含1个脚本块,脚本块中可以包含1或多行字符串。②字符串可以通过使用单或双引号定义,而多行字符串是由三个单或双引号定义。除了直接定义,还可以通过关键字script,shell,exec实现更加灵活的定义。③脚本块作为bash脚本运行。

  • directives(指令) 影响process执行的可选设置,如输出相关publishDir;调度和资源相关executor,queue, cpus, memory, time;异常重试相关errorStrategy, maxErrors, maxRetries等。例如:

tag "${prefix}"
cpus 8
publishDir params.outdir, mode: 'copy'

说明:

  • tag:给每一个过程执行命名,方便在执行日志中查看
  • cpus:此过程运行时的CPU数量
  • publishDir:结果发布路径,运行完成后将最终的结果(由output定义)拷贝('copy')到该路径

(2)script 块

process 中 script 块以字符串的形式定义了需要执行的命令。script字符串默认解析为Bash脚本,以 Bash 脚本运行在主机环境。

注意,由于 Nextflow 对字符串中的变量替换使用相同的 Bash 语法(即 $name 这样形式),因此在字符串内插值时要评估是 Nextflow 变量还是 Bash 变量。

当您需要访问脚本中的系统环境变量时,您有两种选择(方式)。

如果您不需要访问任何 Nextflow 变量,则可以使用单引号定义脚本块:

process printPath {
  '''
  echo The path is: $PATH
  '''
}

否则,您可以使用双引号定义脚本并用反斜杠\字符作为前缀转义系统环境变量,例如:

// 下面例子,$DB是一个 Bash 环境变量,而 Bash 会在执行过程中将其替换为实际值
process doOtherThings {
  """
  blastp -db \$DB -query query.fa -outfmt 6 > blast_result
  cat blast_result | head -n $MAX | cut -f 2 > top_hits
  blastdbcmd -db \$DB -entry_batch top_hits > sequences
  """
}

shell 块是一个字符串表达式,它定义了process执行的脚本。它是 script 定义的替代方案,但有一个重要区别:它使用感叹号 !字符,而不是通常的美元 $ 字符,来表示 Nextflow 变量。

// 在下面的示例中,$USER 被视为 Bash 变量,而 !{str} 被视为 Nextflow 变量。
process myTask {
    input:
    val str

    shell:
    '''
    echo "User $USER says !{str}"
    '''
}

workflow {
    Channel.of('Hello', 'Hola', 'Bonjour') | myTask
}

注意,shell 脚本定义需要使用单引号 ' 分隔的字符串。当使用双引号 " 分隔的字符串时,$变量像往常一样被解释为 Nextflow 变量。以 ! 为前缀的变量必须始终用大括号括起来,即 !{str} 是有效变量,而 !str 被忽略。

(3)input

input块允许您定义process的输入通道,类似于函数参数。 一个进程最多可以有一个输入块,并且它必须包含至少一个输入。
input 语法:

input:
  <input qualifier> <input name>

输入定义由 限定符 和 名称 组成。 输入限定符定义要接收的数据的类型。

输入限定符包括:

  • val 在流程脚本中按名称访问输入值。
  • file (已弃用)将输入值作为文件处理,并在执行上下文中正确暂存。
  • path 将输入值作为路径处理,在执行上下文中正确暂存文件。
  • env 使用输入值在流程脚本中设置环境变量。
  • stdin 将输入值转发给进程 stdin特殊文件。
  • tuple 处理具有上述任何限定符的一组输入值。
  • each 对输入集合中的每个元素执行该过程。

(4)output

output块允许您定义流程的输出通道,类似于函数输出。 一个进程最多可以有一个输出块,并且它必须至少包含一个输出。

输出块遵循如下所示的语法:

output:
  <output qualifier> <output name> [, <option>: <option value>]

输出定义由 限定符 和 名称 组成。 还可以指定一些可选属性。
调用流程时,每个流程输出都作为通道返回。

输出限定符包括:

  • val 输出具有指定名称的变量。
  • file (已弃用)输出由具有指定名称的进程生成的文件。
  • path 输出由具有指定名称的进程生成的文件。
  • env 以指定的名称输出进程环境中定义的变量。
  • stdout 输出 stdout的执行过程。
  • tuple 输出多个值。

(5)when 块

when块允许您定义一个必须满足的条件才能执行该过程。 条件可以是任何返回布尔值的表达式。

process find {
  input:
  path proteins
  val dbtype

  when:
  proteins.name =~ /^BB11.*/ && dbtype == 'nr'

  script:
  """
  blastp -query $proteins -db nr
  """
}

(6)Directives

Directives 是影响当前进程执行的可选设置。 它们必须在 process 主体的顶部输入,在任何其他声明块之前( input, output, etc), 并具有以下语法:

name value [, value2 [,..]]

常用的指令有:

  • accelerator:accelerator指令允许您指定任务执行的硬件加速器要求,例如GPU 处理器。例如:accelerator 4, type: 'nvidia-tesla-k80'
  • beforeScript:beforeScript 指令允许您在运行主进程脚本之前执行自定义(Bash)片段。这对于初始化底层集群环境或其他自定义初始化可能很有用。例如:beforeScript 'source /cluster/bin/setup'
  • afterScript:afterScript 指令允许您在主进程运行后立即执行自定义(Bash)片段。这可能有助于清理暂存区。
  • cache:cache指令允许您将处理结果存储到本地缓存。当启用缓存并使用 resume 选项启动管道时,任何后续执行进程的尝试以及相同的输入都将导致进程执行被跳过,将存储的数据作为实际结果生成。 默认情况下启用缓存,您可以通过将缓存指令设置为 false 来为特定进程禁用它。 例如:cache false
  • executor:executor 定义了执行进程的底层系统。默认情况下,进程使用在 nextflow.config 文件中全局定义的执行器。当指定 executor 时,覆盖默认配置,process执行在指定的底层系统。常用的值:local、sge、condor、lsf、k8s 等。
  • queue:queue 设置调度作业的队列。
  • memory:memory 指令允许您定义允许进程使用多少内存。支持单位有 B、KB、MB、GB、TB。
  • cpus:cpus 指令允许您定义进程任务所需的(逻辑)CPU 数量。
  • time:time 指令允许您定义允许进程运行多长时间。单位:ms、s、m、h、d
  • penv:penv 指令允许您定义在向 SGE 资源管理器提交并行任务时要使用的并行环境。
  • label:label 指令给process起一个 label 名称,方便管理。备注,label必须由字母数字字符或 _ 组成,必须以字母字符开头并且必须以字母数字字符结尾。

更多指令说明,见:https://www.nextflow.io/docs/latest/process.html#directives

示例:

process grid_job {
    queue 'short,long,cn-el6'
    executor 'sge'
    cpus 8
    memory '4 GB'
    time '1h'
    label 'sge_job'

    """
    your task script here
    """
}

(7) workflow

workflow 中定义流程该如何执行,你可能会在脚本中写入N多个不同的process,但是只有在workflow中调用的process才会被执行。

例如,上述的例子:

workflow {
    data = foo()
    bar(data)
}

然后,程序执行,输入下面的命令:

   nextflow run example.nf

运行完成后,会在程序所在的执行目录中生成一个work目录,并且会存在两个文件夹,文件夹中就保存了程序执行过程中生成的foo.txt和bar.txt文件。

(8)Pipeline parameters

管道参数可以在命令行通过 --paramName 指定。其值会通过在变量名前面加上前缀 params 来简单地声明,用 . 点字符分隔。
例如:

nextflow run tutorial.nf --str 'Bonjour le monde'

而 --str 参数在Nextflow代码里是这样调用,例如:

params.str = 'Hello world!'

process splitLetters {
  output:
    path 'chunk_*'

  """
  printf '${params.str}' | split -b 6 - chunk_
  """
}

2. Nextflow中的Channel类型

1. Channel 管道

Channel是nextflow中process之间进行数据传递的通道。
Channel主要有两个属性:

  • 采用异步操作的方式进行数据传递(发送数据);
  • 数据接收采用block的形式,是一个阻塞操作,直到数据流信息传递过来之后才会“转交”给接收的process;

channel 分为2种:Queue channel、Value channel

例如:

ch_reads = Channel
    .fromFilePairs(params.read_path + '/**{1,2}.f*q*', flat: true)

ch_reads.view()  // .view()函数会将channel元素打印在标准输出
  • fromFilePair专门为fastq打造,可以直接将不同的样本以列表形式分组

  • '/**{1,2}.f*q*'定义了文件匹配方式,**表示递归地检索文件,{1,2}.f*q*跟bash本身的文件匹配一致,这里会匹配结尾为1.fastq.gz,2.fastq,1.fq,2.fq.gz的文件

  • flat: ture使返回的双端序列和匹配ID存储为一个列表中。下面是一个例子:

    $ 1s/data/reads/
    
    SRR1950772_1.fastq.gz SRR1950772_2.fastq.gz SRR1950773_1.fastq.gz
    
    SRR1950773_2.fastq. gz
    
    $./main.nf--read_path/data/reads/ NEXTFLOW~version 19.09.0-edge
    
    Launching `./main.nf` [elegant_volta] - revision: bb88634790
    
    WARN:DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
    
    [SRR1950773,/data/reads/SRR1950773_1.fastq.gz,/data/reads/SRR1950773_2.fastq.gz] [SRR1950772,/data/reads/SRR1950772_1.fastq.gz,/data/reads/SRR1950772_2.fastq.gz] 
    

Channel方法介绍(用于显式创建Channel):

(1)of 方法

of 方法可以创建任何类型的数据

ch = Channel.of( 1, 3, 5, 7 )
ch.subscribe { println "value: $it" }

(2)fromList 方法

fromList 用于产生一个含有元素的列表的管道。

示例:

Channel
    .fromList( ['a', 'b', 'c', 'd'] )
    .view { "value: $it" }

(3)fromPath 方法

fromPath 方法可以将路径字符串指定为参数来创建一个产生一个或多个文件路径的通道,通道返回的是List。例如:

myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )

或者:

myFileChannel = Channel.fromPath( '/data/big/*.txt' )

备注,fromPath 不会检查文件是否存在。

(4)fromFilePairs方法

fromFilePairs 方法创建一个通道,该通道产生与用户提供的 glob 模式匹配的文件对。匹配文件作为元组发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:

Channel
    .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
    .view()

它将产生类似于以下的输出:

[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]

备注,glob 模式必须至少包含一个 * 通配符。

(5)value 方法

value 方法用于产生value值的channel。例如:

expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )

(6)watchPath 方法

watchPath 方法监测文件夹中与指定模式匹配的一个或多个文件。只要有文件满足指定条件,就会通过 watchPath 方法返回的通道产生新的元素。默认情况下,它只监视在指定文件夹中创建的新文件。

可选地,可以提供第二个参数来指定要观看的事件。支持的事件有:

  • create A new file is created (default)
  • modify A file is modified
  • delete A file is deleted

例如:

Channel
    .watchPath( '/path/*.fa', 'create,modify' )
    .subscribe { println "File created or modified: $it" }

channel 官方文档说明:https://www.nextflow.io/docs/latest/channel.html

2. Channel operators

operators 说明:https://www.nextflow.io/docs/latest/operator.html

3. 命令的修改并恢复

Nextflow跟踪管道中执行的所有进程。如果修改脚本的某些部分,则只有实际更改的进程才会重新执行。将跳过未更改的进程的执行,而是使用已经缓存的结果。

这在我们测试和修改时十分有用,不必从头重新执行程序。

重新运行时,如果加上 -resume 参数,那么就可以使用缓存。

备注,管道结果默认缓存在 $PWD/work 目录中。根据您的脚本,此文件夹可能会占用大量磁盘空间。只要您知道不需要恢复任何管道运行,定期清理此文件夹是个好主意。

4. 配置文件 Configuration file

启动管道脚本时,Nextflow 会在多个位置查找配置文件。由于每个配置文件都可能包含冲突的设置,因此对源进行排名以决定应用哪些设置。下面报告了所有可能的配置源,按优先级顺序列出(由高到低):

  • 在命令行通过 --something value 选项指定的参数;
  • 通过 -params-file 选项指定的参数;
  • 通过 -c my_config 选项指定的config文件;
  • 当前目录下名为 nextflow.config 的配置文件;
  • 工作流项目目录中名为 nextflow.config 的配置文件;
  • 在HOME目录下的 $HOME/.nextflow/config 的配置文件;
  • 在管道脚本本身中定义的值(例如 main.nf);

配置文件可以包含一个或多个profiles的定义。profiles 是一组配置属性,可以在管道执行时使用 -profile 命令行选项启动激活/选择。

示例:

profiles {

    standard {
        process.executor = 'local'
    }

    cluster {
        process.executor = 'sge'
        process.queue = 'long'
        process.memory = '10GB'
    }

    cloud {
        process.executor = 'cirrus'
        process.container = 'cbcrg/imagex'
        docker.enabled = true
    }

}

可以通过用逗号分隔配置文件名称来指定多个配置配置文件,例如:

nextflow run <your script> -profile standard,cloud

config file 设置,更多见:https://www.nextflow.io/docs/latest/config.html

标签:系列,入门,process,nextflow,Nextflow,data,Channel,定义
From: https://www.cnblogs.com/xiezh/p/16859336.html

相关文章