2 Nextflow
More complex scripts
Now, we will introduce a more complex script, where the channel produced as output by one process feeds another process in the workflow definition.
Here we have two simple processes:
the former splits the input fasta file into single sequences.
the latter reverses each sequence.
/*
* split a fasta file into multiple files
*/
process splitSequences {
input:
path sequencesFile // nextflow creates links to the original files in a temporary folder
output:
path ('seq_*') // send output files to a new output channel (in this case it is a collection)
// awk command for splitting a fasta file into multiple files
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Simply reverse the sequences
*/
process reverseSequence {
// during execution, print the indicated value in the log for easier follow-up
tag { "${seq}" }
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
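To see what the two one-liners do, here is a minimal sketch you can run directly in a terminal on a hypothetical two-sequence fasta file (outside a Nextflow script block, no escaping is needed):

cat > toy.fa <<'EOF'
>seq1
ACGT
>seq2
TTGCA
EOF
# each header starts a new output file: seq_1, seq_2, ...
awk '/^>/{f="seq_"++d} {print > f}' < toy.fa
# headers are printed as-is, every other line is piped through rev
cat seq_1 | awk '{if ($1~">") {print $0} else system("echo " $0 " |rev")}'
# >seq1
# TGCA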
Note
The variables used by AWK need to be escaped; otherwise, they will be interpreted as Nextflow variables and produce an error. Every special character, e.g. $, needs to be escaped (\$). This script can be seen at /test1/test1.nf
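Inside a Nextflow script block, the two kinds of variables look like this (a sketch based on the process above):

script:
"""
# ${sequencesFile} is resolved by Nextflow before the task runs,
# while \$1 is escaped, so it reaches awk untouched
awk '{print \$1}' ${sequencesFile}
"""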
The input path is fed to the script via the pipeline parameter params.inputfile:
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
Note
The file “test.fa” is available in the GitHub repository of the course.
This value can be overridden when calling the script:
nextflow run test1.nf --inputfile another_input.fa
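The file method also accepts a checkIfExists option that makes Nextflow stop immediately when the path does not exist; a minimal sketch:

sequences_file = file(params.inputfile, checkIfExists: true)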
The workflow part connects the two processes so that the output of the first process becomes an input of the second process.
workflow {
splitted_seq = splitSequences(sequences_file)
// Here you have the output channel as a collection
splitted_seq.view()
// Here you have the same channel reshaped to send separately each value
splitted_seq.flatten().view()
// DSL2 allows you to reuse the channels! In the past you had to create many identical
// channels to send the same kind of data to different processes
rev_single_seq = reverseSequence(splitted_seq)
}
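If the effect of flatten is not obvious, this standalone sketch shows how it reshapes a channel:

Channel.of( ['seq_1', 'seq_2', 'seq_3'] ).view()
// [seq_1, seq_2, seq_3]  <- a single item containing the whole list
Channel.of( ['seq_1', 'seq_2', 'seq_3'] ).flatten().view()
// seq_1
// seq_2
// seq_3                  <- three separate items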
During the execution, Nextflow creates several temporary folders and soft links to the original input files inside them. Output files are then stored locally in those folders.
Each output file is in turn linked into the folders of downstream processes that use it as input. This avoids clashes and keeps each process isolated from the others.
nextflow run test1.nf -bg
N E X T F L O W ~ version 20.07.1
Launching `test1.nf` [sad_newton] - revision: 82e66714e4
[09/53e071] Submitted process > splitSequences
[/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2, /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3]
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3
[fe/0a8640] Submitted process > reverseSequence ([seq_1, seq_2, seq_3])
We can inspect the content of work/09/53e071* generated by the process splitSequences:
ls -l work/09/53e071*
total 24
-rw-r--r-- 1 lcozzuto staff 29 Oct 8 19:16 seq_1
-rw-r--r-- 1 lcozzuto staff 33 Oct 8 19:16 seq_2
-rw-r--r-- 1 lcozzuto staff 27 Oct 8 19:16 seq_3
lrwxr-xr-x 1 lcozzuto staff 69 Oct 8 19:16 test.fa -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/testdata/test.fa
File test.fa is a soft link to the original input.
If we inspect work/fe/0a8640* that is generated by the process reverseSequence, we see that the files generated by splitSequences are now linked as input.
ls -l work/fe/0a8640*
total 8
-rw-r--r-- 1 lcozzuto staff 89 Oct 8 19:16 all.rev
lrwxr-xr-x 1 lcozzuto staff 97 Oct 8 19:16 seq_1 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_1
lrwxr-xr-x 1 lcozzuto staff 97 Oct 8 19:16 seq_2 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_2
lrwxr-xr-x 1 lcozzuto staff 97 Oct 8 19:16 seq_3 -> /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/09/53e071d286ed66f4020869c8977b59/seq_3
The “bin” folder
As we saw before, some characters need to be escaped. This can become very tedious when writing long one-liners, so it is recommended to wrap the command in a small shell script and call it as if it were an executable. The script has to be placed in a folder named bin inside the pipeline folder, so that Nextflow automatically treats it as a tool available in the path.
You can see an example of this code by looking at the script test1_a.nf, where the one-liner is replaced by a shell script:
/*
* split a fasta file into multiple files
*/
process splitSequences {
input:
path sequencesFile // nextflow creates links to the original files in a temporary folder
output:
path ('seq_*') // send output files to a new output channel (in this case it is a collection)
// the awk one-liner is now wrapped in a shell script placed in the bin folder
script:
"""
splitseq.sh ${sequencesFile}
"""
}
Here is the corresponding shell script, bin/splitseq.sh:
#!/bin/bash
#
# This script splits a fasta file into multiple files
awk '/^>/ {f="seq_"++d} {print > f}' < $1
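Remember that the script must have execute permission, otherwise the task will fail:

chmod +x bin/splitseq.sh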
Named workflows
At this point, we can write two different workflows to demonstrate how the new DSL allows code reuse (script name test1_b.nf).
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"
/*
* Process 1 for splitting a fasta file in multiple files
*/
process splitSequences {
input:
path sequencesFile
output:
path ('seq_*')
// simple awk command
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Process 2 for reversing the sequences
*/
process reverseSequence {
tag { "${seq}" }
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
workflow flow1 {
take: sequences
main:
splitted_seq = splitSequences(sequences)
rev_single_seq = reverseSequence(splitted_seq)
emit:
rev_single_seq
}
workflow flow2 {
take: sequences
main:
splitted_seq = splitSequences(sequences).flatten()
rev_single_seq = reverseSequence(splitted_seq)
emit:
rev_single_seq
}
workflow {
out1 = flow1(sequences_file)
out2 = flow2(sequences_file)
out2.view()
}
The first workflow runs just like the previous script, while the second “flattens” the output of the first process and launches the second process on every single sequence.
The reverseSequence process of the second workflow will run in parallel if you have enough processors, or if you are running the script in a cluster environment with a scheduler supported by Nextflow.
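As a sketch, assuming a SLURM cluster, the scheduler is selected with the executor setting, typically placed in the nextflow.config file so the pipeline code stays unchanged:

// nextflow.config
process {
executor = 'slurm' // use 'local' (the default) to run on the current machine
}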
This time our named workflows also have an output, which is specified using the emit keyword.
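Because the outputs are declared with emit, the calling workflow can also access them through the .out property; a sketch equivalent to the main workflow above:

workflow {
flow1(sequences_file)
flow2(sequences_file)
// the channel declared with `emit: rev_single_seq` in flow2
flow2.out.rev_single_seq.view()
}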
nextflow run test1_b.nf
N E X T F L O W ~ version 23.08.1-edge
Launching `test1_b.nf` [suspicious_faggin] DSL2 - revision: d89b62c1d4
executor > local (6)
[18/37c7cb] process > flow1:splitSequences [100%] 1 of 1 ✔
[cc/458cd1] process > flow1:reverseSequence (seq_1 seq_2 seq_3) [100%] 1 of 1 ✔
[eb/cf273d] process > flow2:splitSequences [100%] 1 of 1 ✔
[f7/ce4ed9] process > flow2:reverseSequence (seq_1) [100%] 3 of 3 ✔
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/6f/e1ed9278d23363b740693e287a4167/all.rev
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/b2/ad05b00fcfca30fa7a4fbec4dad811/all.rev
/Users/lcozzuto/aaa/nextflow-course-2023-fall/nextflow/test1/work/f7/ce4ed995da248211f125e5b91ab762/all.rev
Exercise
Optimize the previous pipeline to avoid running the process splitSequences twice.
Solution
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"
/*
* Process 1 for splitting a fasta file in multiple files
*/
process splitSequences {
input:
path sequencesFile
output:
path ('seq_*')
// simple awk command
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Process 2 for reversing the sequences
*/
process reverseSequence {
tag { "${seq}" }
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
workflow flow1 {
take: splitted_seq
main:
rev_single_seq = reverseSequence(splitted_seq)
}
workflow flow2 {
take: splitted_seq
main:
rev_single_seq = reverseSequence(splitted_seq.flatten())
}
workflow {
splitted_seq = splitSequences(sequences_file)
flow1(splitted_seq)
flow2(splitted_seq)
}
Resuming a pipeline
You can resume the execution after a code modification using the -resume parameter.
nextflow run test1.nf -bg -resume
N E X T F L O W ~ version 20.07.1
Launching `test1.nf` [determined_celsius] - revision: eaf5b4d673
[bd/f4e9a6] Cached process > flow1:splitSequences
[37/d790ab] Cached process > flow2:splitSequences
[93/c7b1c6] Cached process > flow2:reverseSequence (seq_3)
[45/86dd83] Cached process > flow2:reverseSequence (seq_1)
[87/54bfe8] Cached process > flow2:reverseSequence (seq_2)
[33/a6fc72] Cached process > flow1:reverseSequence ([seq_1, seq_2, seq_3])
/home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/work/33/a6fc72786d042cacf733034d501691/all.rev
Note
IMPORTANT: Nextflow parameters are provided with a single hyphen (-resume), while pipeline parameters take two hyphens (--inputfile).
Sometimes you might want to resume a previous run of your pipeline.
To do so, you need the session ID of that run, which you can retrieve with the command nextflow log.
nextflow log
TIMESTAMP DURATION RUN NAME STATUS REVISION ID SESSION ID COMMAND
2020-10-06 14:49:09 2s agitated_avogadro OK 61a595c5bf 4a7a8a4b-9bdb-4b15-9cc6-1b2cabe9a938 nextflow run test1.nf
2020-10-08 19:14:38 2.8s sick_edison OK 82e66714e4 4fabb863-2038-47b4-bac0-19e71f93f284 nextflow run test1.nf -bg
2020-10-08 19:16:03 3s sad_newton OK 82e66714e4 2d13e9f8-1ba6-422d-9087-5c6c9731a795 nextflow run test1.nf -bg
2020-10-08 19:30:59 2.3s disturbed_wozniak OK d33befe154 0a19b60d-d5fe-4a26-9e01-7a63d0a1d300 nextflow run test1.nf -bg
2020-10-08 19:35:52 2.5s insane_plateau OK d33befe154 b359f32c-254f-4271-95bb-6a91b281dc6d nextflow run test1.nf -bg
2020-10-08 19:56:30 2.8s determined_celsius OK eaf5b4d673 b359f32c-254f-4271-95bb-6a91b281dc6d nextflow run test1.nf -bg -resume
You can then resume the state of your execution using the SESSION ID:
nextflow run -resume 0a19b60d-d5fe-4a26-9e01-7a63d0a1d300 test1.nf
Nextflow’s cache can be disabled for a specific process by setting the cache directive to false. You can also choose among three caching methods:
cache = true // (default) Cache keys are created indexing input files meta-data information (name, size and last update timestamp attributes).
cache = 'deep' // Cache keys are created indexing input files content.
cache = 'lenient' // (Best in HPC and shared file systems) Cache keys are created indexing input file path and size attributes.
IMPORTANT: On some shared file systems, file timestamps might be inconsistent. In those cases, cache = 'lenient' prevents unwanted re-execution of cached processes.
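For example, the lenient mode can be enabled for every process from the nextflow.config file (a sketch):

// nextflow.config
process {
cache = 'lenient'
}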
Directives
Directives are declarations that provide optional settings for a process.
For example, they can affect the way a process stages in and out its input and output files (stageInMode and stageOutMode), or request specific resources such as the number of cpus, the memory, and the time.
Here is an example:
process my_process {
time '1h'
memory '2 GB'
cpus 8
"""
Some execution
"""
}
We can also indicate what to do if a process fails.
The default is to stop the pipeline and raise an error, but we can also skip the process using the errorStrategy directive:
process my_process {
time '1h'
memory '2 GB'
cpus 8
errorStrategy 'ignore'
"""
Some execution
"""
}
or retry a number of times, changing the available memory or the maximum execution time, using the following directives:
process my_process {
memory { 1.GB * task.attempt }
time { 1.hour * task.attempt }
errorStrategy 'retry'
maxRetries 3
script:
"""
echo "Some execution"
"""
}
Exercise
Make the previous pipeline resilient to a failing process, and save the results so that the execution of that process is skipped when the pipeline is launched again.
First, make the process reverseSequence fail by introducing a typo in the command line, then add the directive to the process.
Solution
The solution is at sol3.nf. In particular, the change is here:
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"
/*
* Process 1 for splitting a fasta file in multiple files
*/
process splitSequences {
input:
path sequencesFile
output:
path ('seq_*')
// simple awk command
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Broken process
*/
process reverseSequence {
tag { "${seq}" }
errorStrategy 'ignore'
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | AAAAAAA '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
workflow flow1 {
take: sequences
main:
splitted_seq = splitSequences(sequences)
rev_single_seq = reverseSequence(splitted_seq)
}
workflow flow2 {
take: sequences
main:
splitted_seq = splitSequences(sequences).flatten()
rev_single_seq = reverseSequence(splitted_seq)
}
workflow {
flow1(sequences_file)
flow2(sequences_file)
}
Exercise
Write the first workflow using pipes. Nextflow DSL2 allows you to use pipes for connecting channels via input/output.
See the documentation on pipes.
Solution
The solution is at sol4.nf. Here is the change:
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../testdata/test.fa"
// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)
// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"
/*
* Process 1 for splitting a fasta file in multiple files
*/
process splitSequences {
input:
path sequencesFile
output:
path ('seq_*')
// simple awk command
script:
"""
awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
"""
}
/*
* Process 2 for reversing the sequences
*/
process reverseSequence {
tag { "${seq}" }
input:
path seq
output:
path "all.rev"
script:
"""
cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
"""
}
workflow flow1 {
take: sequences
main:
splitSequences(sequences) | reverseSequence | view()
}
workflow flow2 {
take: sequences
main:
splitted_seq = splitSequences(sequences).flatten()
rev_single_seq = reverseSequence(splitted_seq)
}
workflow {
flow1(sequences_file)
flow2(sequences_file)
}
Note
When a shell script fails, you might still get a 0 exit code, and if an output file was generated Nextflow won’t detect the error. This is because in a bash pipeline the exit code is the one returned by the last command. To change this behavior, add the following at the beginning of your bash script:
set -euxo pipefail
BASH CODE HERE
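For example, the bin/splitseq.sh script shown earlier would become:

#!/bin/bash
set -euxo pipefail
#
# This script splits a fasta file into multiple files
awk '/^>/ {f="seq_"++d} {print > f}' < $1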