Nextflow 2

More complex scripts

We can feed the channel generated by a process to another process in the workflow definition. The variables used by AWK need to be escaped, otherwise they will be considered as proper Nextflow variables and thus produce an error. Every special character, e.g., $, needs to be escaped ($). It can be tedeous when writing long one liners; therefore, it is recommended to make a small shell script and call it as an executable. It has to be placed in a folder named bin inside the pipeline folder to be automatically considered from Nextflow as a tool in the path. This script can be seen at /test1/test1.nf

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */
 
nextflow.enable.dsl=2

/* 
 * HERE YOU HAVE THE COMMMENTS
 * NextFlow example from their website 
 */
 
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"	

// the "file method" returns a file system object given a file path string  
sequences_file = file(params.inputfile)				

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}" 


/*
 * split a fasta file in multiple files
 */
 
process splitSequences {

    input:
    path sequencesFile // nextflow creates links to the original files in a temporary folder
 
    output:
    path ('seq_*')    // send output files to a new output channel (in this case is a collection)
 
    // awk command for splitting a fasta files in multiple files
    
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """ 
}


/*
 * Simple reverse the sequences
 */
 
process reverseSequence {

    // during the execution prints the indicated variable for follow-up
    tag { "${seq}" }  					

    input:
    path seq 

    output:
    path "all.rev" 
 
    script:
    """
    cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow {
    splitted_seq	= splitSequences(sequences_file)
    
    // Here you have the output channel as a collection
    splitted_seq.view()
    
    // Here you have the same channel reshaped to send separately each value 
    splitted_seq.flatten().view()
    
    // DLS2 allows you to reuse the channels! In past you had to create many identical
    // channels for sending the same kind of data to different processes
    
    rev_single_seq	= reverseSequence(splitted_seq)
}

Here we have two simple processes:

  • the former splits the input fasta file into single sequences.

  • the latter is able to reverse the position of the sequences.

The input path is fed as a parameter using the script parameters ${seq}

params.inputfile

Note

The file “test.fa” is available in the github repository of the course

This value can be overridden when calling the script:

nextflow run test1.nf --inputfile another_input.fa

The workflow part connects the two processes so that the output of the first process becomes an input of the second process.

During the execution, Nextflow creates a number of temporary folders and also a soft link to the original input file. It will then store output files locally.

The output file is then linked in other folders for being used as input from other processes. This avoids clashes, and each process is isolated from the other.

nextflow run test1.nf -bg

N E X T F L O W  ~  version 20.07.1
Launching `test1.nf` [sad_newton] - revision: 82e66714e4
[09/53e071] Submitted process > splitSequences
[/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3]
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3
[fe/0a8640] Submitted process > reverseSequence ([seq_1, seq_2, seq_3])

We can inspect the content of work/09/53e071* generated by the process splitSequences:

ls -l work/09/53e071*
total 24
-rw-r--r--  1 lcozzuto  staff  29 Oct  8 19:16 seq_1
-rw-r--r--  1 lcozzuto  staff  33 Oct  8 19:16 seq_2
-rw-r--r--  1 lcozzuto  staff  27 Oct  8 19:16 seq_3
lrwxr-xr-x  1 lcozzuto  staff  69 Oct  8 19:16 test.fa -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/testdata/test.fa

File test.fa is a soft link to the original input.

If we inspect work/fe/0a8640* that is generated by the process reverseSequence, we see that the files generated by splitSequences are now linked as input.

ls -l work/fe/0a8640*

total 8
-rw-r--r--  1 lcozzuto  staff  89 Oct  8 19:16 all.rev
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_1 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_2 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
lrwxr-xr-x  1 lcozzuto  staff  97 Oct  8 19:16 seq_3 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3

At this point we can make two different workflows to demonstrate how the new DSL allows reusing the code (script name test1_b.nf).

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    	cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}

The first workflow will just run like the previous script, while the second will “flatten” the output of the first process and will launch the second process on each single sequence.

The reverseSequence process of the second workflow will run in parallel if you have enough processors, or if you are running the script in a cluster environment, with a scheduler supported by Nextflow.

nextflow run test1_b.nf -bg

C02WX1XFHV2Q:nextflow lcozzuto$ N E X T F L O W  ~  version 20.07.1
Launching `test1.nf` [insane_plateau] - revision: d33befe154
[bd/f4e9a6] Submitted process > flow1:splitSequences
[37/d790ab] Submitted process > flow2:splitSequences
[33/a6fc72] Submitted process > flow1:reverseSequence ([seq_1, seq_2, seq_3])
[87/54bfe8] Submitted process > flow2:reverseSequence (seq_2)
[45/86dd83] Submitted process > flow2:reverseSequence (seq_1)
[93/c7b1c6] Submitted process > flow2:reverseSequence (seq_3)

Exercise

Optimize the previus pipeline to avoid running the process splitSequences twice.

Solution
#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    	cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: splitted_seq

    main:
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: splitted_seq

    main:
    rev_single_seq      = reverseSequence(splitted_seq.flatten())
}

workflow {
   splitted_seq        = splitSequences(sequences_file)
   flow1(splitted_seq)
   flow2(splitted_seq)
}


Resuming a pipeline

You can resume the execution after the code modification using the parameter -resume.

nextflow run test1.nf -bg -resume

N E X T F L O W  ~  version 20.07.1
Launching `test1.nf` [determined_celsius] - revision: eaf5b4d673
[bd/f4e9a6] Cached process > flow1:splitSequences
[37/d790ab] Cached process > flow2:splitSequences
[93/c7b1c6] Cached process > flow2:reverseSequence (seq_3)
[45/86dd83] Cached process > flow2:reverseSequence (seq_1)
[87/54bfe8] Cached process > flow2:reverseSequence (seq_2)
[33/a6fc72] Cached process > flow1:reverseSequence ([seq_1, seq_2, seq_3])
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/33/a6fc72786d042cacf733034d501691/all.rev

Note

IMPORTANT: Nextflow parameters are provided using one hyphen (-resume) while a pipeline parameters, two hyphens (--inputfile).

Sometimes you might want to resume a previous run of your pipeline.

To do so you need to extract the job id of that run. You can do this by using the command nextflow log.

nextflow log
TIMESTAMP               DURATION        RUN NAME                STATUS  REVISION ID     SESSION ID                              COMMAND
2020-10-06 14:49:09     2s              agitated_avogadro       OK      61a595c5bf      4a7a8a4b-9bdb-4b15-9cc6-1b2cabe9a938    nextflow run test1.nf
2020-10-08 19:14:38     2.8s            sick_edison             OK      82e66714e4      4fabb863-2038-47b4-bac0-19e71f93f284    nextflow run test1.nf -bg
2020-10-08 19:16:03     3s              sad_newton              OK      82e66714e4      2d13e9f8-1ba6-422d-9087-5c6c9731a795    nextflow run test1.nf -bg
2020-10-08 19:30:59     2.3s            disturbed_wozniak       OK      d33befe154      0a19b60d-d5fe-4a26-9e01-7a63d0a1d300    nextflow run test1.nf -bg
2020-10-08 19:35:52     2.5s            insane_plateau          OK      d33befe154      b359f32c-254f-4271-95bb-6a91b281dc6d    nextflow run test1.nf -bg
2020-10-08 19:56:30     2.8s            determined_celsius      OK      eaf5b4d673      b359f32c-254f-4271-95bb-6a91b281dc6d    nextflow run test1.nf -bg -resume

You can then resume the state of your execution using the SESSION ID:

nextflow run -resume 0a19b60d-d5fe-4a26-9e01-7a63d0a1d300 test1.nf

Nextflow’s cache can be disabled for a specific process by setting the directive cache to false. You can also choose among the three caching methods:

cache = true // (default) Cache keys are created indexing input files meta-data information (name, size and last update timestamp attributes).

cache = 'deep' // Cache keys are created indexing input files content.

cache = 'lenient' // (Best in HPC and shared file systems) Cache keys are created indexing input files path and size attributes

IMPORTANT: On some shared file systems you might have inconsistent file timestamps. Thus cache lenient prevents you from unwanted restarting of cached processes.

Directives

The directives are declaration blocks that can provide optional settings for a process.

For example, they can affect the way a process stages in and out the input and output files (stageInMode and stageOutMode), or specify a particular resource such as the number of cpus , the memory, and the time

Here an example:

process my_process {
    time '1h'
    memory '2 GB'
    cpus 8

    """
    Some execution
    """
}

We can also indicate what to do if a process fails.

The default is to stop the pipeline and to raise an error. But we can also skip the process using the errorStrategy directive:

process my_process {
    time '1h'
    memory '2 GB'
    cpus 8
    errorStrategy 'ignore'

    """
    Some execution
    """
}

or retry a number of times changing the available memory or the maximum execution time, using the foolowing directives:

process my_process {
    memory { 1.GB * task.attempt }
    time { 1.hour * task.attempt }
    errorStrategy 'retry'
    maxRetries 3
}

EXERCISE

Make the previous pipeline resilient to the process failing and save the results so the process execution would be skipped when the pipeline is launched again.

First, make the process reverseSequence to fail by introducing a typo in the command line, then add the directive to the process.

Solution

The solution is at sol3.nf. In particular the change is here:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
* Broken process
*/

 process reverseSequence {

    tag { "${seq}" }

    errorStrategy 'ignore'

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    cat ${seq} | AAAAAAA '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}


Write the first workflow using pipes. Nextflow DLS2 allows you to use pipes for connecting channels via input / output.

See the documentation on pipes.

Solution

The solution is at sol4.nf. Here is the change:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    	cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitSequences(sequences) | reverseSequence | view()
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}


Decoupling resources, parameters and nextflow script

When making a complex pipelines it is convenient to keep the definition of resources needed, the default parameters and the main script separately from each other. This can be achieved using two additional files:

  • nextflow.config

  • params.config

The nextflow.config file allows to indicate resources needed for each class of processes. This is achieved labeling processes in the nextflow.config file:

includeConfig "$baseDir/params.config"

process {
    memory='0.6G'
    cpus='1'
    time='6h'

    withLabel: 'onecpu' {
        memory='0.6G'
        cpus='1'
    }

    withLabel: 'bigmem' {
        memory='0.7G'
        cpus='1'
    }
}

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"

The first row indicates to use the information stored in the params.config file (described later). Then follows the definition of the default resources for a process:

Then we specify resources needed for a class of processes labeled bigmem (i.e., the default options will be overridden for these processes):

withLabel: 'bigmem' {
    memory='0.7G'
    cpus='1'
}

In the script /test2/test2.nf file, there are two processes to run two programs:

  • fastQC - a tool that calculates a number of quality control metrics on single fastq files;

  • multiQC - an aggregator of results from bioinformatics tools and samples for generating a single html report.

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "ouptut_fastqc"
multiqcOutputFolder   = "ouptut_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different channels.
 * The queue is consumed by channels, so you cannot re-use a channel for different processes. 
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromPath( params.reads )  											 // read the files indicated by the wildcard                            
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
    .set {reads_for_fastqc} 											 // make the channel "reads_for_fastqc"


/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			// where (and whether) to publish the results
    tag { "${reads}" }  							// during the execution prints the indicated variable for follow-up
    label 'big_mem' 

    input:
    path reads   							// it defines the input of the process. It sets values from a channel

    output:									// It defines the output of the process (i.e. files) and send to a new channel
   	path "*_fastqc.*"

    script:									// here you have the execution of the script / program. Basically is the command line
    """
        fastqc ${reads} 
    """
}

/*
 * Process 2. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					// do not send the results to any channel

    script:
    """
    multiqc .
    """
}

workflow {
	fastqc_out = fastQC(reads_for_fastqc)
	multiQC(fastqc_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

You can see that the process fastQC is labeled ‘bigmem’.

The last two rows of the config file indicate which containers to use. In this example, – and by default, if the repository is not specified, – a container is pulled from the DockerHub. In case of using a singularity container, you can indicate where to store the local image using the singularity.cacheDir option:

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"

Let’s now launch the script test2.nf.

     cd test2;
     nextflow run test2.nf

     N E X T F L O W  ~  version 20.07.1
     Launching `test2.nf` [distracted_edison] - revision: e3a80b15a2
     BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
     =============================================
     reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
     executor >  local (2)
     [df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [  0%] 0 of 2
     [-        ] process > multiQC                            -
     Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'

     Caused by:
       Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)

     Command executed:

       fastqc B7_H3K4me1_s_chr19.fastq.gz

     Command exit status:
       127

     executor >  local (2)
     [df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [100%] 2 of 2, failed: 2 ✘
     [-        ] process > multiQC                            -
     Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'

     Caused by:
       Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)

     Command executed:

       fastqc B7_H3K4me1_s_chr19.fastq.gz

     Command exit status:
       127

     Command output:
       (empty)

     Command error:
       .command.sh: line 2: fastqc: command not found

     Work dir:
       /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/work/c5/18e76b2e6ffd64aac2b52e69bedef3

     Tip: when you have fixed the problem you can continue the execution adding the option `-resume` to the run command line

We will get a number of errors since no executable is found in our environment / path. This is because the executables are stored in our docker image and we have to tell Nextflow to use the docker image, using the -with-docker parameter.

     nextflow run test2.nf -with-docker

     N E X T F L O W  ~  version 20.07.1
     Launching `test2.nf` [boring_hamilton] - revision: e3a80b15a2
     BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
     =============================================
     reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
     executor >  local (3)
     [22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
     [1a/cfe63b] process > multiQC                              [  0%] 0 of 1
     executor >  local (3)
     [22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
     [1a/cfe63b] process > multiQC                              [100%] 1 of 1 ✔

This time it worked because Nextflow used the image specified in the nextflow.config file and containing the executables.

Now let’s take a look at the params.config file:

params {

	reads		= "$baseDir/../../testdata/*.fastq.gz"
	email		= "myemail@google.com"

}

As you can see, we indicated two pipeline parameters, reads and email; when running the pipeline, they can be overridden using --reads and --email.

Now, let’s examine the folders generated by the pipeline.

ls  work/2a/22e3df887b1b5ac8af4f9cd0d88ac5/

total 0
drwxrwxr-x 3 ec2-user ec2-user  26 Apr 23 13:52 .
drwxr-xr-x 2 root     root     136 Apr 23 13:51 multiqc_data
drwxrwxr-x 3 ec2-user ec2-user  44 Apr 23 13:51 ..

We observe that Docker runs as “root”. This can be problematic and generates security issues. To avoid this we can add this line of code within the process section of the config file:

containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}

This will tell Nextflow that if it is run with Docker, it has to produce files that belong to a user rather than the root.

Publishing final results

The script test2.nf generates two new folders, output_fastqc and output_multiQC, that contain the result of the pipeline output. We can indicate which process and output can be considered the final output of the pipeline using the publishDir directive that has to be specified at the beginning of a process.

In our pipeline we define these folders here:

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "ouptut_fastqc"
multiqcOutputFolder   = "ouptut_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different channels.
 * The queue is consumed by channels, so you cannot re-use a channel for different processes. 
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromPath( params.reads )  											 // read the files indicated by the wildcard                            
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
    .set {reads_for_fastqc} 											 // make the channel "reads_for_fastqc"


/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			// where (and whether) to publish the results
    tag { "${reads}" }  							// during the execution prints the indicated variable for follow-up
    label 'big_mem' 

    input:
    path reads   							// it defines the input of the process. It sets values from a channel

    output:									// It defines the output of the process (i.e. files) and send to a new channel
   	path "*_fastqc.*"

    script:									// here you have the execution of the script / program. Basically is the command line
    """
        fastqc ${reads} 
    """
}

/*
 * Process 2. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					// do not send the results to any channel

    script:
    """
    multiqc .
    """
}

workflow {
	fastqc_out = fastQC(reads_for_fastqc)
	multiQC(fastqc_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

You can see that the default mode to publish the results in Nextflow is soft linking. You can change this behaviour specifying the mode as indicated in the multiQC process.

Note

IMPORTANT: You can also “move” the results but this is not suggested for files that will be needed for other processes. This will likely disrupt your pipeline

To access the output files via the web they can be copied to your S3 bucket . Your bucket is mounted in /mnt:

ls /mnt

/mnt/nf-class-bucket-1

Note

In this class, each student has its own bucket, with the number correponding to the number of the AWS instance.

Let’s copy the multiqc_report.html file in the S3 bucket and change the privileges:

cp output_multiQC/multiqc_report.html /mnt/nf-class-bucket-1

sudo chmod 775 /mnt/nf-class-bucket-1/multiqc_report.html

Now you will be able to see this html file via the browser (change the bucket number to correspond to your instance):

http://nf-class-bucket-1.s3.eu-central-1.amazonaws.com/multiqc_report.html

Adding help section to a pipeline

Here we describe another good practice: the use of the --help parameter. At the beginning of the pipeline we can write:

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "ouptut_fastqc"
multiqcOutputFolder   = "ouptut_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different channels.
 * The queue is consumed by channels, so you cannot re-use a channel for different processes. 
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromPath( params.reads )  											 // read the files indicated by the wildcard                            
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
    .set {reads_for_fastqc} 											 // make the channel "reads_for_fastqc"


/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			// where (and whether) to publish the results
    tag { "${reads}" }  							// during the execution prints the indicated variable for follow-up
    label 'big_mem' 

    input:
    path reads   							// it defines the input of the process. It sets values from a channel

    output:									// It defines the output of the process (i.e. files) and send to a new channel
   	path "*_fastqc.*"

    script:									// here you have the execution of the script / program. Basically is the command line
    """
        fastqc ${reads} 
    """
}

/*
 * Process 2. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					// do not send the results to any channel

    script:
    """
    multiqc .
    """
}

workflow {
	fastqc_out = fastQC(reads_for_fastqc)
	multiQC(fastqc_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

so that launching the pipeline with --help will show you just the parameters and the help.

nextflow run test2.nf --help

N E X T F L O W  ~  version 20.07.1
Launching `test2.nf` [mad_elion] - revision: e3a80b15a2
BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
=============================================
reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
This is the Biocore's NF test pipeline
Enjoy!

EXERCISE

  • Look at the very last EXERCISE of the day before. Change the script and the config file using the label for handling failing processes.

Solution

The process should become:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    publishDir "output"
    label 'ignorefail'
    
    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    	cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}

and the nextflow.config file would become:

process {
    withLabel: 'ignorefail' {
        errorStrategy = 'ignore'
    }
}


  • Now look at test2.nf.

Change this script and the config file using the label for handling failing processes by retrying 3 times and incrementing time.

You can specify a very low time (5, 10 or 15 seconds) for the fastqc process so it would fail at beginning.

Solution

The code should become:

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "ouptut_fastqc"
multiqcOutputFolder   = "ouptut_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different channels.
 * The queue is consumed by channels, so you cannot re-use a channel for different processes. 
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromPath( params.reads )  											 // read the files indicated by the wildcard                            
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
    .set {reads_for_fastqc} 											 // make the channel "reads_for_fastqc"


/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			// where (and whether) to publish the results
    tag { "${reads}" }  							// during the execution prints the indicated variable for follow-up
    label 'keep_trying'

    input:
    path reads   							// it defines the input of the process. It sets values from a channel

    output:									// It defines the output of the process (i.e. files) and send to a new channel
    path "*_fastqc.*"

    script:									// here you have the execution of the script / program. Basically is the command line
    """
        fastqc ${reads} 
    """
}


/*
 * Process 2. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					// do not send the results to any channel

    script:
    """
       multiqc .
    """
}

workflow {
	fastqc_out = fastQC(reads_for_fastqc)
	multiQC(fastqc_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

while the nextflow.config file would be:

includeConfig "$baseDir/params.config"

process {
    memory='0.6G'
    cpus='1'
    time='6h'

    withLabel: 'keep_trying' { 
        time = { 10.second * task.attempt }
        errorStrategy = 'retry' 
        maxRetries = 3	
    } 	

}

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"