2 Nextflow

More complex scripts

Now we will introduce a more complex script, where the channel produced as output by one process feeds another process in the workflow definition.

Here we have two simple processes:

  • the former splits the input fasta file into single sequences;

  • the latter reverses each sequence.

/*
 * split a fasta file in multiple files
 */
 
process splitSequences {

    input:
    path sequencesFile // Nextflow creates links to the original files in a temporary folder
 
    output:
    path ('seq_*')    // send output files to a new output channel (in this case it is a collection)
 
    // awk command for splitting a fasta file into multiple files
    
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """ 
}
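
For instance, given a hypothetical input test.fa:

>chr1
ACGT
>chr2
TTTT

the awk command starts a new output file at every header line, producing seq_1 (containing >chr1 and ACGT) and seq_2 (containing >chr2 and TTTT).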


/*
 * Simply reverse the sequences
 */
 
process reverseSequence {

    // during the execution, print the indicated value in the log for easier tracking
    tag { "${seq}" }

    input:
    path seq 

    output:
    path "all.rev" 
 
    script:
    """
    cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}
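
With the same hypothetical input, reverseSequence keeps the header lines and pipes each sequence line through the rev command, so the content of seq_1:

>chr1
ACGT

ends up in all.rev as:

>chr1
TGCA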

Note

The variables used by AWK need to be escaped, otherwise they will be interpreted as Nextflow variables and produce an error. Every special character, e.g., $, needs to be escaped (\$). This script can be seen at /test1/test1.nf.
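
For instance, in the following minimal sketch, \$0 is escaped so that it reaches AWK untouched, while ${sequencesFile} is interpolated by Nextflow before the script is executed:

script:
"""
awk '{print \$0}' ${sequencesFile}
"""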

The input path is fed to the script via the pipeline parameter params.inputfile:

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"	

// the "file method" returns a file system object given a file path string  
sequences_file = file(params.inputfile)				

Note

The file “test.fa” is available in the GitHub repository of the course.

This value can be overridden when calling the script:

nextflow run test1.nf --inputfile another_input.fa

The workflow part connects the two processes so that the output of the first process becomes an input of the second process.

workflow {
    splitted_seq	= splitSequences(sequences_file)
    
    // Here you have the output channel as a collection
    splitted_seq.view()
    
    // Here you have the same channel reshaped to emit each value separately (see the sketch after this block)
    splitted_seq.flatten().view()
    
    // DSL2 allows you to reuse channels! In the past you had to create many identical
    // channels to send the same kind of data to different processes
    
    rev_single_seq	= reverseSequence(splitted_seq)
}
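
To see what flatten does in isolation, here is a minimal sketch with made-up values:

workflow {
    // the whole list is emitted as a single item
    Channel.of( ['seq_1', 'seq_2', 'seq_3'] ).view()

    // each element is emitted as a separate item
    Channel.of( ['seq_1', 'seq_2', 'seq_3'] ).flatten().view()
}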

During the execution, Nextflow creates several temporary folders and soft-links the original input file into them. The output files are then stored locally inside those folders.

The output files are then linked into the folders of downstream processes to be used as input. This avoids name clashes, and each process stays isolated from the others.

nextflow run test1.nf -bg

N E X T F L O W  ~  version 20.07.1
Launching `test1.nf` [sad_newton] - revision: 82e66714e4
[09/53e071] Submitted process > splitSequences
[/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3]
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3
[fe/0a8640] Submitted process > reverseSequence ([seq_1, seq_2, seq_3])

We can inspect the content of work/09/53e071* generated by the process splitSequences:

ls -l work/09/53e071*
total 24
-rw-r--r--  1 lcozzuto  staff  29 Oct  8 19:16 seq_1
-rw-r--r--  1 lcozzuto  staff  33 Oct  8 19:16 seq_2
-rw-r--r--  1 lcozzuto  staff  27 Oct  8 19:16 seq_3
lrwxr-xr-x  1 lcozzuto  staff  69 Oct  8 19:16 test.fa -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/testdata/test.fa

File test.fa is a soft link to the original input.

If we inspect work/fe/0a8640* that is generated by the process reverseSequence, we see that the files generated by splitSequences are now linked as input.

ls -l work/fe/0a8640*

total 8
-rw-r--r--  1 lcozzuto  staff  89 Oct  8 19:16 all.rev
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_1 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_2 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_3 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3

The “bin” folder

As we saw before, some characters need to be escaped. This can become very tedious when writing long one-liners, so it is recommended to put the command in a small shell script and call it as if it were an executable. The script has to be placed in a folder named bin inside the pipeline folder; Nextflow then automatically treats it as a tool available in the path.

You can see an example of this code by looking at the script test1_a.nf, where the one-liner is replaced by a shell script:

/*
 * split a fasta file in multiple files
 */
 
process splitSequences {

    input:
    path sequencesFile // Nextflow creates links to the original files in a temporary folder
 
    output:
    path ('seq_*')    // send output files to a new output channel (in this case it is a collection)
 
    // awk command for splitting a fasta file into multiple files
    
    script:
    """
    splitseq.sh ${sequencesFile}
    """ 
}

Here, instead, is the shell script bin/splitseq.sh:

#!/bin/bash
#
# This script will split a fasta file into multiple files


awk '/^>/ {f="seq_"++d} {print > f}' < $1
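
Note that the script must also have execute permission, otherwise the process will fail at runtime:

chmod +x bin/splitseq.sh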

Named workflows

At this point we can write two different workflows to demonstrate how the new DSL allows reusing the code (script name test1_b.nf).

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)

    emit:
    rev_single_seq

}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)

    emit:
    rev_single_seq

}

workflow {
   out1 = flow1(sequences_file)
   out2 = flow2(sequences_file)
   out2.view()

}

The first workflow will just run like the previous script, while the second will “flatten” the output of the first process and launch the second process on every single sequence.

The reverseSequence process of the second workflow will run in parallel if you have enough processors, or if you are running the script in a cluster environment with a scheduler supported by Nextflow.
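
For instance, the executor used by the processes can be selected in nextflow.config without touching the pipeline code; here is a minimal sketch, assuming a SLURM cluster:

process {
    executor = 'slurm'  // e.g., 'local', 'sge', 'slurm', 'awsbatch'
    cpus = 1
}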

This time our named workflows also have an output, which is specified using the emit keyword.
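
Emitted outputs can also be given an explicit name, which lets the calling workflow access them via .out; here is a minimal sketch based on flow1:

workflow flow1 {
    take: sequences

    main:
    splitted_seq   = splitSequences(sequences)
    rev_single_seq = reverseSequence(splitted_seq)

    emit:
    reversed = rev_single_seq    // named output
}

workflow {
    flow1(sequences_file)
    flow1.out.reversed.view()    // access the named output
}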

nextflow run test1_b.nf
N E X T F L O W  ~  version 23.08.1-edge
Launching `test1_b.nf` [suspicious_faggin] DSL2 - revision: d89b62c1d4
executor >  local (6)
[18/37c7cb] process > flow1:splitSequences                      [100%] 1 of 1 ✔
[cc/458cd1] process > flow1:reverseSequence (seq_1 seq_2 seq_3) [100%] 1 of 1 ✔
[eb/cf273d] process > flow2:splitSequences                      [100%] 1 of 1 ✔
[f7/ce4ed9] process > flow2:reverseSequence (seq_1)             [100%] 3 of 3 ✔
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/6f/e1ed9278d23363b740693e287a4167/all.rev
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/b2/ad05b00fcfca30fa7a4fbec4dad811/all.rev
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/f7/ce4ed995da248211f125e5b91ab762/all.rev

Exercise

Optimize the previous pipeline to avoid running the process splitSequences twice.

Solution
#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: splitted_seq

    main:
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: splitted_seq

    main:
    rev_single_seq      = reverseSequence(splitted_seq.flatten())
}

workflow {
   splitted_seq        = splitSequences(sequences_file)
   flow1(splitted_seq)
   flow2(splitted_seq)
}


Resuming a pipeline

You can resume the execution after a code modification using the -resume parameter.

nextflow run test1.nf -bg -resume

N E X T F L O W  ~  version 20.07.1
Launching `test1.nf` [determined_celsius] - revision: eaf5b4d673
[bd/f4e9a6] Cached process > flow1:splitSequences
[37/d790ab] Cached process > flow2:splitSequences
[93/c7b1c6] Cached process > flow2:reverseSequence (seq_3)
[45/86dd83] Cached process > flow2:reverseSequence (seq_1)
[87/54bfe8] Cached process > flow2:reverseSequence (seq_2)
[33/a6fc72] Cached process > flow1:reverseSequence ([seq_1, seq_2, seq_3])
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/33/a6fc72786d042cacf733034d501691/all.rev

Note

IMPORTANT: Nextflow parameters are provided with a single hyphen (-resume), while pipeline parameters take two hyphens (--inputfile).
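
Both kinds can be combined in a single command line:

nextflow run test1.nf -bg -resume --inputfile another_input.fa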

Sometimes you might want to resume a specific previous run of your pipeline.

To do so, you need to retrieve the session ID of that run. You can do this by using the command nextflow log.

nextflow log
TIMESTAMP               DURATION        RUN NAME                STATUS  REVISION ID     SESSION ID                              COMMAND
2020-10-06 14:49:09     2s              agitated_avogadro       OK      61a595c5bf      4a7a8a4b-9bdb-4b15-9cc6-1b2cabe9a938    nextflow run test1.nf
2020-10-08 19:14:38     2.8s            sick_edison             OK      82e66714e4      4fabb863-2038-47b4-bac0-19e71f93f284    nextflow run test1.nf -bg
2020-10-08 19:16:03     3s              sad_newton              OK      82e66714e4      2d13e9f8-1ba6-422d-9087-5c6c9731a795    nextflow run test1.nf -bg
2020-10-08 19:30:59     2.3s            disturbed_wozniak       OK      d33befe154      0a19b60d-d5fe-4a26-9e01-7a63d0a1d300    nextflow run test1.nf -bg
2020-10-08 19:35:52     2.5s            insane_plateau          OK      d33befe154      b359f32c-254f-4271-95bb-6a91b281dc6d    nextflow run test1.nf -bg
2020-10-08 19:56:30     2.8s            determined_celsius      OK      eaf5b4d673      b359f32c-254f-4271-95bb-6a91b281dc6d    nextflow run test1.nf -bg -resume

You can then resume the state of your execution using the SESSION ID:

nextflow run -resume 0a19b60d-d5fe-4a26-9e01-7a63d0a1d300 test1.nf

Nextflow’s cache can be disabled for a specific process by setting the directive cache to false. You can also choose among the three caching methods:

cache = true // (default) Cache keys are created indexing input files meta-data information (name, size and last update timestamp attributes).

cache = 'deep' // Cache keys are created indexing input files content.

cache = 'lenient' // (Best in HPC and shared file systems) Cache keys are created indexing input files path and size attributes.

IMPORTANT: On some shared file systems you might have inconsistent file timestamps, so cache = 'lenient' prevents unwanted re-execution of cached processes.
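
Like any directive, cache is set inside the process definition; here is a minimal sketch:

process my_process {
    cache 'lenient'

    """
    Some execution
    """
}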

Directives

Directives are declarations that provide optional settings for a process.

For example, they can affect the way a process stages input and output files in and out (stageInMode and stageOutMode), or specify resources such as the number of cpus, the memory, and the execution time.

Here is an example:

process my_process {
    time '1h'
    memory '2 GB'
    cpus 8

    """
    Some execution
    """
}

We can also indicate what to do if a process fails.

The default is to stop the pipeline and raise an error, but we can also skip the failing process using the errorStrategy directive:

process my_process {
    time '1h'
    memory '2 GB'
    cpus 8
    errorStrategy 'ignore'

    """
    Some execution
    """
}

or retry a number of times, changing the available memory or the maximum execution time, using the following directives:

process my_process {
    memory { 1.GB * task.attempt }
    time { 1.hour * task.attempt }
    errorStrategy 'retry'
    maxRetries 3
}
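
The two strategies can also be combined by making errorStrategy dynamic, retrying a few times and then ignoring the failure; here is a minimal sketch:

process my_process {
    memory { 1.GB * task.attempt }
    time { 1.hour * task.attempt }
    errorStrategy { task.attempt <= 3 ? 'retry' : 'ignore' }
    maxRetries 3

    """
    Some execution
    """
}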

EXERCISE

Make the previous pipeline resilient to process failure, and save the results so that the process execution is skipped when the pipeline is launched again.

First, make the process reverseSequence fail by introducing a typo in the command line, then add the directive to the process.

Solution

The solution is at sol3.nf. In particular, the change is here:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Broken process
 */

process reverseSequence {

    tag { "${seq}" }

    errorStrategy 'ignore'

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    cat ${seq} | AAAAAAA '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}


EXERCISE

Write the first workflow using pipes. Nextflow DSL2 allows you to use pipes to connect channels via input/output.

See the documentation on pipes.

Solution

The solution is at sol4.nf. Here is the change:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitSequences(sequences) | reverseSequence | view()
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}


Note

When a shell script fails, you might still get a 0 exit code, and if an output file is generated Nextflow will not recognize the error. This is because in a bash pipeline the exit code is the one from the last command. To change this behavior, you should add the following code at the beginning of your bash script:

set -euxo pipefail   # -e: exit on error; -u: fail on unset variables; -x: trace commands; -o pipefail: a pipeline fails if any command fails
BASH CODE HERE