3 Nextflow
Decoupling resources, parameters and Nextflow script
When making complex pipelines it is convenient to keep the definition of resources needed, the default parameters, and the main script separately from each other. This can be achieved using two additional files:
nextflow.config
params.config
The nextflow.config file allows to indicate resources needed for each class of processes. This is achieved by labeling processes in the nextflow.config file:
process {
memory='0.6G'
cpus='1'
time='6h'
withLabel: 'onecpu' {
memory='0.6G'
cpus='1'
}
withLabel: 'bigmem' {
memory='0.7G'
cpus='1'
}
}
The first part defines the “default” resources for a process:
process {
memory='0.6G'
cpus='1'
time='6h'
withLabel: 'onecpu' {
memory='0.6G'
cpus='1'
}
withLabel: 'bigmem' {
memory='0.7G'
cpus='1'
}
}
Then are specified the resources needed for a class of processes labeled bigmem. In brief, the default options will be overridden for the processes labeled bigmem and onecpu:
process {
memory='0.6G'
cpus='1'
time='6h'
withLabel: 'onecpu' {
memory='0.6G'
cpus='1'
}
withLabel: 'bigmem' {
memory='0.7G'
cpus='1'
}
}
Tip
You can add the default configuration for shell executions within to the nextflow.config file:
process {
shell = ['/bin/bash', '-euo', 'pipefail']
...
In the script /test2/test2.nf file, there are two processes to run two programs:
fastQC - a tool that calculates a number of quality control metrics on single fastq files;
multiQC - an aggregator of results from bioinformatics tools and samples for generating a single html report.
#!/usr/bin/env nextflow
/*
* This code enables the new dsl of Nextflow.
*/
nextflow.enable.dsl=2
/*
* NextFlow test pipe
* @authors
* Luca Cozzuto <lucacozzuto@gmail.com>
*
*/
/*
* Input parameters: read pairs
* Params are stored in the params.config file
*/
version = "1.0"
// this prevents a warning of undefined parameter
params.help = false
// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE ~ version ${version}
=============================================
reads : ${params.reads}
"""
// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
log.info 'This is the Biocore\'s NF test pipeline'
log.info 'Enjoy!'
log.info '\n'
exit 1
}
/*
* Defining the output folders.
*/
fastqcOutputFolder = "ouptut_fastqc"
multiqcOutputFolder = "ouptut_multiQC"
/* Reading the file list and creating a "Channel": a queue that connects different channels.
* The queue is consumed by channels, so you cannot re-use a channel for different processes.
* If you need the same data for different processes you need to make more channels.
*/
Channel
.fromPath( params.reads ) // read the files indicated by the wildcard
.ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
.set {reads_for_fastqc} // make the channel "reads_for_fastqc"
/*
* Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
*/
process fastQC {
publishDir fastqcOutputFolder // where (and whether) to publish the results
tag { "${reads}" } // during the execution prints the indicated variable for follow-up
label 'big_mem'
input:
path reads // it defines the input of the process. It sets values from a channel
output: // It defines the output of the process (i.e. files) and send to a new channel
path "*_fastqc.*"
script: // here you have the execution of the script / program. Basically is the command line
"""
fastqc ${reads}
"""
}
/*
* Process 2. Run multiQC on fastQC results
*/
process multiQC {
publishDir multiqcOutputFolder, mode: 'copy' // this time do not link but copy the output file
input:
path (inputfiles)
output:
path("multiqc_report.html") // do not send the results to any channel
script:
"""
multiqc .
"""
}
workflow {
fastqc_out = fastQC(reads_for_fastqc)
multiQC(fastqc_out.collect())
}
workflow.onComplete {
println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}
You can see that the process fastQC is labeled ‘bigmem’.
The last two rows of the config file indicate which containers to use. In this example, – and by default, if the repository is not specified, – a container is pulled from the DockerHub. In the case of using a singularity container, you can indicate where to store the local image using the singularity.cacheDir option:
process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"
Let’s now launch the script test2.nf.
cd test2;
nextflow run test2.nf
N E X T F L O W ~ version 20.07.1
Launching `test2.nf` [distracted_edison] - revision: e3a80b15a2
BIOCORE@CRG - N F TESTPIPE ~ version 1.0
=============================================
reads : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
executor > local (2)
[df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [ 0%] 0 of 2
[- ] process > multiQC -
Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'
Caused by:
Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)
Command executed:
fastqc B7_H3K4me1_s_chr19.fastq.gz
Command exit status:
127
executor > local (2)
[df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [100%] 2 of 2, failed: 2 ✘
[- ] process > multiQC -
Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'
Caused by:
Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)
Command executed:
fastqc B7_H3K4me1_s_chr19.fastq.gz
Command exit status:
127
Command output:
(empty)
Command error:
.command.sh: line 2: fastqc: command not found
Work dir:
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/work/c5/18e76b2e6ffd64aac2b52e69bedef3
Tip: when you have fixed the problem you can continue the execution adding the option `-resume` to the run command line
We will get a number of errors since no executable is found in our environment/path. This is because the executables are stored in our docker image and we have to tell Nextflow to use the docker image, using the -with-docker parameter.
nextflow run test2.nf -with-docker
N E X T F L O W ~ version 20.07.1
Launching `test2.nf` [boring_hamilton] - revision: e3a80b15a2
BIOCORE@CRG - N F TESTPIPE ~ version 1.0
=============================================
reads : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
executor > local (3)
[22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
[1a/cfe63b] process > multiQC [ 0%] 0 of 1
executor > local (3)
[22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
[1a/cfe63b] process > multiQC [100%] 1 of 1 ✔
This time it worked because Nextflow used the image specified in the nextflow.config file and containing the executables.
Now let’s take a look at the params.config file:
params {
reads = "$baseDir/../../testdata/*.fastq.gz"
email = "myemail@google.com"
}
As you can see, we indicated two pipeline parameters, reads and email; when running the pipeline, they can be overridden using --reads and --email.
This file is included thanks again to the nextflow.config file, here shown entirely
includeConfig "$baseDir/params.config"
process {
memory='0.6G'
cpus='1'
time='6h'
withLabel: 'onecpu' {
memory='0.6G'
cpus='1'
}
withLabel: 'bigmem' {
memory='0.7G'
cpus='1'
}
}
process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"
Now, let’s examine the folders generated by the pipeline.
ls work/2a/22e3df887b1b5ac8af4f9cd0d88ac5/
total 0
drwxrwxr-x 3 ec2-user ec2-user 26 Apr 23 13:52 .
drwxr-xr-x 2 root root 136 Apr 23 13:51 multiqc_data
drwxrwxr-x 3 ec2-user ec2-user 44 Apr 23 13:51 ..
We observe that Docker runs as “root”. This can be problematic and generates security issues. To avoid this we can add this line of code within the process section of the config file:
containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
This will tell Nextflow that if it is run with Docker, it has to produce files that belong to a user rather than the root.
Publishing final results
The script test2.nf generates two new folders, output_fastqc and output_multiQC, that contain the result of the pipeline output. We can indicate which process and output can be considered the final output of the pipeline using the publishDir directive that has to be specified at the beginning of a process.
In our pipeline, we define these folders here:
#!/usr/bin/env nextflow
/*
* This code enables the new dsl of Nextflow.
*/
nextflow.enable.dsl=2
/*
* NextFlow test pipe
* @authors
* Luca Cozzuto <lucacozzuto@gmail.com>
*
*/
/*
* Input parameters: read pairs
* Params are stored in the params.config file
*/
version = "1.0"
// this prevents a warning of undefined parameter
params.help = false
// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE ~ version ${version}
=============================================
reads : ${params.reads}
"""
// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
log.info 'This is the Biocore\'s NF test pipeline'
log.info 'Enjoy!'
log.info '\n'
exit 1
}
/*
* Defining the output folders.
*/
fastqcOutputFolder = "ouptut_fastqc"
multiqcOutputFolder = "ouptut_multiQC"
/* Reading the file list and creating a "Channel": a queue that connects different channels.
* The queue is consumed by channels, so you cannot re-use a channel for different processes.
* If you need the same data for different processes you need to make more channels.
*/
Channel
.fromPath( params.reads ) // read the files indicated by the wildcard
.ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
.set {reads_for_fastqc} // make the channel "reads_for_fastqc"
/*
* Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
*/
process fastQC {
publishDir fastqcOutputFolder // where (and whether) to publish the results
tag { "${reads}" } // during the execution prints the indicated variable for follow-up
label 'big_mem'
input:
path reads // it defines the input of the process. It sets values from a channel
output: // It defines the output of the process (i.e. files) and send to a new channel
path "*_fastqc.*"
script: // here you have the execution of the script / program. Basically is the command line
"""
fastqc ${reads}
"""
}
/*
* Process 2. Run multiQC on fastQC results
*/
process multiQC {
publishDir multiqcOutputFolder, mode: 'copy' // this time do not link but copy the output file
input:
path (inputfiles)
output:
path("multiqc_report.html") // do not send the results to any channel
script:
"""
multiqc .
"""
}
workflow {
fastqc_out = fastQC(reads_for_fastqc)
multiQC(fastqc_out.collect())
}
workflow.onComplete {
println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}
You can see that the default mode to publish the results in Nextflow is soft linking. You can change this behavior by specifying the mode as indicated in the multiQC process.
Note
IMPORTANT: You can also “move” the results but this is not suggested for files that will be needed for other processes. This will likely disrupt your pipeline
Adding help section to a pipeline
Here we describe another good practice: the use of the --help parameter. At the beginning of the pipeline, we can write:
#!/usr/bin/env nextflow
/*
* This code enables the new dsl of Nextflow.
*/
nextflow.enable.dsl=2
/*
* NextFlow test pipe
* @authors
* Luca Cozzuto <lucacozzuto@gmail.com>
*
*/
/*
* Input parameters: read pairs
* Params are stored in the params.config file
*/
version = "1.0"
// this prevents a warning of undefined parameter
params.help = false
// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE ~ version ${version}
=============================================
reads : ${params.reads}
"""
// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
log.info 'This is the Biocore\'s NF test pipeline'
log.info 'Enjoy!'
log.info '\n'
exit 1
}
/*
* Defining the output folders.
*/
fastqcOutputFolder = "ouptut_fastqc"
multiqcOutputFolder = "ouptut_multiQC"
/* Reading the file list and creating a "Channel": a queue that connects different channels.
* The queue is consumed by channels, so you cannot re-use a channel for different processes.
* If you need the same data for different processes you need to make more channels.
*/
Channel
.fromPath( params.reads ) // read the files indicated by the wildcard
.ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
.set {reads_for_fastqc} // make the channel "reads_for_fastqc"
/*
* Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
*/
process fastQC {
publishDir fastqcOutputFolder // where (and whether) to publish the results
tag { "${reads}" } // during the execution prints the indicated variable for follow-up
label 'big_mem'
input:
path reads // it defines the input of the process. It sets values from a channel
output: // It defines the output of the process (i.e. files) and send to a new channel
path "*_fastqc.*"
script: // here you have the execution of the script / program. Basically is the command line
"""
fastqc ${reads}
"""
}
/*
* Process 2. Run multiQC on fastQC results
*/
process multiQC {
publishDir multiqcOutputFolder, mode: 'copy' // this time do not link but copy the output file
input:
path (inputfiles)
output:
path("multiqc_report.html") // do not send the results to any channel
script:
"""
multiqc .
"""
}
workflow {
fastqc_out = fastQC(reads_for_fastqc)
multiQC(fastqc_out.collect())
}
workflow.onComplete {
println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}
so that launching the pipeline with --help will show you just the parameters and the help.
nextflow run test2.nf --help
N E X T F L O W ~ version 20.07.1
Launching `test2.nf` [mad_elion] - revision: e3a80b15a2
BIOCORE@CRG - N F TESTPIPE ~ version 1.0
=============================================
reads : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
This is the Biocore's NF test pipeline
Enjoy!
EXERCISE
Look at the very last EXERCISE of the day before. Change the script and the config file using the label for handling failing processes.
Solution
The process should become:
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"
/*
* Process 1 for splitting a fasta file in multiple files
*/
process splitSequences {
input:
path sequencesFile
output:
path ('seq_*')
// simple awk command
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Process 2 for reversing the sequences
*/
process reverseSequence {
tag { "${seq}" }
publishDir "output"
label 'ignorefail'
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
workflow flow1 {
take: sequences
main:
splitted_seq = splitSequences(sequences)
rev_single_seq = reverseSequence(splitted_seq)
}
workflow flow2 {
take: sequences
main:
splitted_seq = splitSequences(sequences).flatten()
rev_single_seq = reverseSequence(splitted_seq)
}
workflow {
flow1(sequences_file)
flow2(sequences_file)
}
and the nextflow.config file would become:
process {
withLabel: 'ignorefail' {
errorStrategy = 'ignore'
}
}
Now look at test2.nf.
Change this script and the config file using the label for handling failing processes by retrying 3 times and incrementing time.
You can specify a very low time (5, 10 or 15 seconds) for the fastqc process so it would fail at the beginning.
Solution
The code should become:
#!/usr/bin/env nextflow
/*
* This code enables the new dsl of Nextflow.
*/
nextflow.enable.dsl=2
/*
* NextFlow test pipe
* @authors
* Luca Cozzuto <lucacozzuto@gmail.com>
*
*/
/*
* Input parameters: read pairs
* Params are stored in the params.config file
*/
version = "1.0"
// this prevents a warning of undefined parameter
params.help = false
// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE ~ version ${version}
=============================================
reads : ${params.reads}
"""
// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
log.info 'This is the Biocore\'s NF test pipeline'
log.info 'Enjoy!'
log.info '\n'
exit 1
}
/*
* Defining the output folders.
*/
fastqcOutputFolder = "ouptut_fastqc"
multiqcOutputFolder = "ouptut_multiQC"
/* Reading the file list and creating a "Channel": a queue that connects different channels.
* The queue is consumed by channels, so you cannot re-use a channel for different processes.
* If you need the same data for different processes you need to make more channels.
*/
Channel
.fromPath( params.reads ) // read the files indicated by the wildcard
.ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
.set {reads_for_fastqc} // make the channel "reads_for_fastqc"
/*
* Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
*/
process fastQC {
publishDir fastqcOutputFolder // where (and whether) to publish the results
tag { "${reads}" } // during the execution prints the indicated variable for follow-up
label 'keep_trying'
input:
path reads // it defines the input of the process. It sets values from a channel
output: // It defines the output of the process (i.e. files) and send to a new channel
path "*_fastqc.*"
script: // here you have the execution of the script / program. Basically is the command line
"""
fastqc ${reads}
"""
}
/*
* Process 2. Run multiQC on fastQC results
*/
process multiQC {
publishDir multiqcOutputFolder, mode: 'copy' // this time do not link but copy the output file
input:
path (inputfiles)
output:
path("multiqc_report.html") // do not send the results to any channel
script:
"""
multiqc .
"""
}
workflow {
fastqc_out = fastQC(reads_for_fastqc)
multiQC(fastqc_out.collect())
}
workflow.onComplete {
println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}
while the nextflow.config file would be:
includeConfig "$baseDir/params.config"
process {
memory='0.6G'
cpus='1'
time='6h'
withLabel: 'keep_trying' {
time = { 10.second * task.attempt }
errorStrategy = 'retry'
maxRetries = 3
}
}
process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"