3 Nextflow

Decoupling resources, parameters and Nextflow script

When building complex pipelines it is convenient to keep the definition of the required resources, the default parameters, and the main script separate from each other. This can be achieved using two additional files:

  • nextflow.config

  • params.config

The nextflow.config file allows you to specify the resources needed for each class of processes. This is achieved by labeling processes in the nextflow.config file:

process {
    memory='0.6G'
    cpus='1'
    time='6h'

    withLabel: 'onecpu' {
        memory='0.6G'
        cpus='1'
    }

    withLabel: 'bigmem' {
        memory='0.7G'
        cpus='1'
    }
}

The first part defines the “default” resources for a process:

process {
    memory='0.6G'
    cpus='1'
    time='6h'
    ...
}

Then the resources needed for each labeled class of processes are specified. In brief, the default options are overridden for the processes labeled onecpu or bigmem:

process {
    ...
    withLabel: 'onecpu' {
        memory='0.6G'
        cpus='1'
    }

    withLabel: 'bigmem' {
        memory='0.7G'
        cpus='1'
    }
}

Tip

You can set the default shell configuration for process executions within the nextflow.config file. Here Bash is invoked with -e, -u and -o pipefail, so scripts fail fast on errors, unset variables and broken pipes:

process {
        shell = ['/bin/bash', '-euo', 'pipefail']
        ...
}

The script test2/test2.nf contains two processes that run two programs:

  • fastQC - a tool that calculates a number of quality control metrics on single fastq files;

  • multiQC - an aggregator of results from bioinformatics tools and samples that generates a single HTML report.

#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "output_fastqc"
multiqcOutputFolder   = "output_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different processes.
 * The queue is consumed by processes, so you cannot re-use a channel for different processes.
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromPath( params.reads )                                            // read the files indicated by the wildcard
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } // if empty, complains
    .set { reads_for_fastqc }                                            // make the channel "reads_for_fastqc"


/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder               // where (and whether) to publish the results
    tag { "${reads}" }                          // during the execution prints the indicated variable for follow-up
    label 'bigmem'

    input:
    path reads                                  // it defines the input of the process; values are taken from a channel

    output:                                     // it defines the output of the process (i.e. files) and sends it to a new channel
    path "*_fastqc.*"

    script:                                     // here goes the execution of the script / program; basically, the command line
    """
        fastqc ${reads}
    """
}

/*
 * Process 2. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					// do not send the results to any channel

    script:
    """
    multiqc .
    """
}

workflow {
	fastqc_out = fastQC(reads_for_fastqc)
	multiQC(fastqc_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

You can see that the process fastQC is labeled ‘bigmem’.

The last two lines of the config file indicate which container to use. In this example (and by default, when no registry is specified) the container is pulled from Docker Hub. When using a Singularity container, you can indicate where to store the local image using the singularity.cacheDir option:

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"

Let’s now launch the script test2.nf.

     cd test2;
     nextflow run test2.nf

     N E X T F L O W  ~  version 20.07.1
     Launching `test2.nf` [distracted_edison] - revision: e3a80b15a2
     BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
     =============================================
     reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
     executor >  local (2)
     [df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [  0%] 0 of 2
     [-        ] process > multiQC                            -
     Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'

     Caused by:
       Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)

     Command executed:

       fastqc B7_H3K4me1_s_chr19.fastq.gz

     Command exit status:
       127

     executor >  local (2)
     [df/2c45f2] process > fastQC (B7_input_s_chr19.fastq.gz) [100%] 2 of 2, failed: 2 ✘
     [-        ] process > multiQC                            -
     Error executing process > 'fastQC (B7_H3K4me1_s_chr19.fastq.gz)'

     Caused by:
       Process `fastQC (B7_H3K4me1_s_chr19.fastq.gz)` terminated with an error exit status (127)

     Command executed:

       fastqc B7_H3K4me1_s_chr19.fastq.gz

     Command exit status:
       127

     Command output:
       (empty)

     Command error:
       .command.sh: line 2: fastqc: command not found

     Work dir:
       /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/work/c5/18e76b2e6ffd64aac2b52e69bedef3

     Tip: when you have fixed the problem you can continue the execution adding the option `-resume` to the run command line

We will get a number of errors, since no executable is found in our environment / PATH. This is because the executables are stored in our Docker image, and we have to tell Nextflow to use that image via the -with-docker parameter.

     nextflow run test2.nf -with-docker

     N E X T F L O W  ~  version 20.07.1
     Launching `test2.nf` [boring_hamilton] - revision: e3a80b15a2
     BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
     =============================================
     reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
     executor >  local (3)
     [22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
     [1a/cfe63b] process > multiQC                              [  0%] 0 of 1
     executor >  local (3)
     [22/b437be] process > fastQC (B7_H3K4me1_s_chr19.fastq.gz) [100%] 2 of 2 ✔
     [1a/cfe63b] process > multiQC                              [100%] 1 of 1 ✔

This time it worked because Nextflow used the image specified in the nextflow.config file, which contains the executables.
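If you prefer not to type -with-docker on every run, you can also enable Docker directly in the configuration; this is a standard Nextflow option:

docker.enabled = true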

Now let’s take a look at the params.config file:

params {

	reads		= "$baseDir/../../testdata/*.fastq.gz"
	email		= "myemail@google.com"

}

As you can see, we indicated two pipeline parameters, reads and email; when running the pipeline, they can be overridden using --reads and --email.
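For example, with hypothetical values:

nextflow run test2.nf --reads "/path/to/other/*.fastq.gz" --email another.email@example.com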

This file is included from the nextflow.config file, shown here in its entirety:

includeConfig "$baseDir/params.config"

process {
    memory='0.6G'
    cpus='1'
    time='6h'

    withLabel: 'onecpu' {
        memory='0.6G'
        cpus='1'
    }

    withLabel: 'bigmem' {
        memory='0.7G'
        cpus='1'
    }
}

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"

Now, let’s examine the folders generated by the pipeline.

ls -la work/2a/22e3df887b1b5ac8af4f9cd0d88ac5/

total 0
drwxrwxr-x 3 ec2-user ec2-user  26 Apr 23 13:52 .
drwxr-xr-x 2 root     root     136 Apr 23 13:51 multiqc_data
drwxrwxr-x 3 ec2-user ec2-user  44 Apr 23 13:51 ..

We observe that Docker runs as "root", so the files it creates are owned by root. This can be problematic and create security issues. To avoid this, we can add this line within the process section of the config file:

containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}

This tells Nextflow that, when run with Docker, the container must produce files owned by the current user rather than by root.
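As a sketch, the directive sits inside the same process scope as the resource defaults (this is exactly how the test3 profiles shown later do it):

process {
    containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
    memory='0.6G'
    cpus='1'
    time='6h'
    ...
}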

Publishing final results

The script test2.nf generates two new folders, output_fastqc and output_multiQC, that contain the pipeline output. We can indicate which process outputs are to be considered the final output of the pipeline using the publishDir directive, which has to be specified at the beginning of a process.

In our pipeline, we define these folders here:

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "output_fastqc"
multiqcOutputFolder   = "output_multiQC"

...

process fastQC {
    publishDir fastqcOutputFolder               // where (and whether) to publish the results
    ...
}

process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' // this time do not link but copy the output file
    ...
}

You can see that the default mode for publishing results in Nextflow is soft (symbolic) linking. You can change this behavior by specifying the mode, as shown in the multiQC process.
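As a sketch, the common alternatives look like this (symlink is the default mode):

publishDir fastqcOutputFolder                   // default: symbolic link
publishDir fastqcOutputFolder, mode: 'copy'     // copy the output files
publishDir fastqcOutputFolder, mode: 'move'     // move them (see the note below)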

Note

IMPORTANT: You can also "move" the results, but this is not recommended for files that are needed by downstream processes, since it will likely break your pipeline.

Adding help section to a pipeline

Here we describe another good practice: the use of the --help parameter. At the beginning of the pipeline, we can write:

#!/usr/bin/env nextflow

...

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

...

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

so that launching the pipeline with --help will show you just the parameters and the help.

nextflow run test2.nf --help

N E X T F L O W  ~  version 20.07.1
Launching `test2.nf` [mad_elion] - revision: e3a80b15a2
BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
=============================================
reads                           : /home/ec2-user/git/CRG_Nextflow_Jun_2022/nextflow/nextflow/test2/../testdata/*.fastq.gz
This is the Biocore's NF test pipeline
Enjoy!

EXERCISE

  • Look at the very last EXERCISE of the previous day. Change the script and the config file, using a label, to handle failing processes.

Solution

The script should become:

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

// this can be overridden by using --inputfile OTHERFILENAME
params.inputfile = "$baseDir/../../../testdata/test.fa"

// the "file method" returns a file system object given a file path string
sequences_file = file(params.inputfile)

// check if the file exists
if( !sequences_file.exists() ) exit 1, "Missing genome file: ${sequences_file}"

/*
 * Process 1 for splitting a fasta file in multiple files
 */
process splitSequences {
    input:
    path sequencesFile

    output:
    path ('seq_*')

    // simple awk command
    script:
    """
    awk '/^>/{f="seq_"++d} {print > f}' < ${sequencesFile}
    """
}

/*
 * Process 2 for reversing the sequences
 */
process reverseSequence {
    tag { "${seq}" }

    publishDir "output"
    label 'ignorefail'
    
    input:
    path seq

    output:
    path "all.rev"

    script:
    """
    	cat ${seq} | awk '{if (\$1~">") {print \$0} else system("echo " \$0 " |rev")}' > all.rev
    """
}

workflow flow1 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences)
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow flow2 {
    take: sequences

    main:
    splitted_seq        = splitSequences(sequences).flatten()
    rev_single_seq      = reverseSequence(splitted_seq)
}

workflow {
   flow1(sequences_file)
   flow2(sequences_file)
}

and the nextflow.config file would become:

process {
    withLabel: 'ignorefail' {
        errorStrategy = 'ignore'
    }
}


  • Now look at test2.nf.

Change this script and the config file, using a label, so that failing processes are retried 3 times, with the allowed time increasing at each attempt.

You can specify a very low time limit (5, 10 or 15 seconds) for the fastQC process so that it fails on the first attempt.

Solution

Only the fastQC process changes: its label becomes keep_trying.

process fastQC {
    publishDir fastqcOutputFolder               // where (and whether) to publish the results
    tag { "${reads}" }                          // during the execution prints the indicated variable for follow-up
    label 'keep_trying'

    input:
    path reads

    output:
    path "*_fastqc.*"

    script:
    """
        fastqc ${reads}
    """
}

while the nextflow.config file would be:

includeConfig "$baseDir/params.config"

process {
    memory='0.6G'
    cpus='1'
    time='6h'

    withLabel: 'keep_trying' { 
        time = { 10.second * task.attempt }
        errorStrategy = 'retry' 
        maxRetries = 3	
    } 	

}

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"
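With this configuration, each attempt of a process labeled keep_trying is granted 10 seconds multiplied by the attempt number (10.second * task.attempt): 10 seconds for the first try, 20 for the retry, 30 for the next. Once maxRetries is exceeded, the process is finally reported as failed.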


Using Singularity

We recommend using Singularity instead of Docker in HPC environments. This can be done using the Nextflow parameter -with-singularity without changing the code.

Nextflow will take care of pulling, converting, and storing the image for you. This will be done only once and then Nextflow will use the stored image for further executions.

On the AWS main node, both Docker and Singularity are available, while within the AWS Batch system only Docker is available.

nextflow run test2.nf -with-singularity -bg > log

tail -f log
N E X T F L O W  ~  version 20.10.0
Launching `test2.nf` [soggy_miescher] - revision: 5a0a513d38

BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
=============================================
reads                           : /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/../../testdata/*.fastq.gz

Pulling Singularity image docker://biocorecrg/c4lwg-2018:latest [cache /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/singularity/biocorecrg-c4lwg-2018-latest.img]
[da/eb7564] Submitted process > fastQC (B7_H3K4me1_s_chr19.fastq.gz)
[f6/32dc41] Submitted process > fastQC (B7_input_s_chr19.fastq.gz)
...

Let’s inspect the folder singularity:

ls singularity/
biocorecrg-c4lwg-2018-latest.img

This Singularity image can be used to execute the code outside the pipeline, in exactly the same way as inside the pipeline.
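For example (a hypothetical quick check, run from the test2 folder), you could verify one of the bundled tools:

singularity exec singularity/biocorecrg-c4lwg-2018-latest.img fastqc --version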

Sometimes we may want to launch only a specific job, for example because it failed or for testing purposes. To do so, we can go to the corresponding temporary folder; for example, one of the fastQC work folders:

cd work/da/eb7564*/

Inspecting the .command.run file shows us this piece of code:

...

nxf_launch() {
    set +u; env - PATH="$PATH" SINGULARITYENV_TMP="$TMP" SINGULARITYENV_TMPDIR="$TMPDIR" singularity exec /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/singularity/biocorecrg-c4lwg-2018-latest.img /bin/bash -c "cd $PWD; /bin/bash -ue /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/work/da/eb756433aa0881d25b20afb5b1366e/.command.sh"
}
...

This means that Nextflow is running the code by using the singularity exec command.

Thus we can launch this command outside the pipeline (locally):

bash .command.run

Started analysis of B7_H3K4me1_s_chr19.fastq.gz
Approx 5% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 10% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 15% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 20% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 25% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 30% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 35% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 40% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 45% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 50% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 55% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 60% complete for B7_H3K4me1_s_chr19.fastq.gz
...

If you have to submit a job to an HPC cluster, you need to use the corresponding submission program, such as qsub if you have Sun Grid Engine or sbatch if you have Slurm. Here is an example using Slurm:

sbatch .command.run

Profiles

For deploying a pipeline to a cluster or to the cloud, we need to indicate in the nextflow.config file what kind of executor to use.

In the Nextflow framework architecture, the executor indicates which batch-queue system to use to submit jobs to an HPC cluster or to the cloud.

The executor is completely abstracted, so you can switch from SGE to SLURM just by changing this parameter in the configuration file.
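As a sketch, switching schedulers is a one-line change in nextflow.config:

process.executor = 'slurm'    // or 'sge', 'local', 'awsbatch', ...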

You can group different classes of configuration or profiles within a single nextflow.config file.

Let’s inspect the nextflow.config file in the test3 folder. We can see three different profiles:

  • standard

  • cluster

  • cloud

The first profile indicates the resources needed for running the pipeline locally. They are quite small, since the test node has little memory and CPU power.

includeConfig "$baseDir/params.config"

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"


profiles {
  standard {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="local"
        memory='0.6G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            memory='0.6G'
            cpus='1'
        }
   	  }
   }

As you can see, we explicitly indicated the local executor, which is also Nextflow's default executor. The standard profile is the one used when the pipeline is run without specifying a profile.

The second profile is for running the pipeline on a cluster; here, in particular, for a cluster running the SLURM Workload Manager queuing system:

   cluster {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="slurm"

        queue = "medium"
        memory='1G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            queue = "long"
            memory='4G'
            cpus='2'
        }
      }
   }

This profile indicates that the system uses SLURM as a job scheduler and that we have different queues for small jobs and more intensive ones.
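To use it, pass the profile name on the command line (assuming you are on a system where SLURM is available):

nextflow run test3.nf -profile cluster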

Note

IMPORTANT: You need to have either Docker or Singularity installed and running on your computing nodes to run the pipeline. On some HPC systems you might need to “load” the programs using environment modules. For this, we advise adding the command to your .bash_profile file.

vi $HOME/.bash_profile

#ADD THIS

module load apptainer

Deployment in the AWS cloud

The final profile is for running the pipeline in the Amazon cloud, known as Amazon Web Services or AWS. In particular, we will use AWS Batch, which allows the execution of containerized workloads in the Amazon cloud infrastructure (here and below, NNNN is the number of your bucket, which you can see in the mounted folder /mnt by typing the command df).

includeConfig "$baseDir/params.config"

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"


profiles {
  standard {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="local"
        memory='0.6G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            memory='0.6G'
            cpus='1'
        }
   	  }
   }
   cluster {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="slurm"

        queue = "medium"
        memory='1G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            queue = "long"
            memory='4G'
            cpus='2'
        }
      }
   }

   cloud {
    workDir = 's3://nf-class-bucket-NNNN/work'
    aws.region = 'eu-central-1'
    aws.batch.cliPath = '/home/ec2-user/miniconda/bin/aws'
    
   process {
       executor = 'awsbatch'
       queue = 'spot'
       memory='1G'
       cpus='1'
       time='6h'

       withLabel: 'twocpus' {
           memory='2G'
           cpus='2'
       }
    }
  }
}

We indicate the AWS-specific parameters (region and cliPath) and the awsbatch executor. Then we indicate the working directory, which must be in an S3 bucket. This is mandatory when running Nextflow in the cloud.

We can now launch the pipeline indicating -profile cloud:

nextflow run test3.nf -bg -with-docker -profile cloud > log

Note that there is no longer a work folder in the directory where test3.nf is located because, in the AWS cloud, the working directory lives in the bucket, mounted locally at /mnt/nf-class-bucket-NNNN/work (you can see the mounted folder, and the corresponding number, by typing df).

The multiQC report can be seen on the AWS webpage at https://nf-class-bucket-NNNN.s3.eu-central-1.amazonaws.com/results/output_multiQC/multiqc_report.html

But first you need to change the permissions of that file (where NNNN is the number of your bucket):

chmod 775 /mnt/nf-class-bucket-NNNN/results/output_multiQC/multiqc_report.html

Sometimes you can find that the Nextflow process itself is very memory intensive and the main node runs out of memory. To avoid this, you can reduce the memory used by Nextflow by setting an environment variable that limits the initial (-Xms) and maximum (-Xmx) heap size of the Java virtual machine:

export NXF_OPTS="-Xms50m -Xmx500m"

As before, we could copy the output file to the bucket by hand. However, we can also tell Nextflow to publish the output directly to the S3 bucket: to do so, change the parameter outdir in the params file (use the bucket corresponding to your AWS instance):

outdir = "s3://nf-class-bucket-NNNN/results"

Making a Nextflow pipeline for image processing

We can build a pipeline incrementally, adding more and more processes. Nextflow will handle the dependencies between inputs/outputs and the parallelization.

Let’s see the content of the folder nextflow/ex_alba:

ls
etl_peem.py  importUview_v3.py  step1_denoise.py  step2_normalize.py  step3_aggregate_and_save.py

We have some Python scripts, made by Fernán and Nicolas, that implement these steps:

  • step1_denoise.py: it will read an image file (“.dat”), denoise it, and save it as a NumPy array (“_den.npy”).

# command line
./step1_denoise.py my_image.dat
# output my_image_den.npy
  • step2_normalize.py: it will read the denoised NumPy array (“_den.npy”) output by the previous step and produce a normalized/denoised NumPy array (“_den_norm.npy”). It needs the original image file to be present in the current directory, even though it is not given on the command line.

ls ./
step2_normalize.py my_image_den.npy my_image.dat
# command line
./step2_normalize.py my_image_den.npy
# output my_image_den_norm.npy
  • step3_aggregate_and_save.py: it will read all the normalized/denoised NumPy arrays together and produce a NeXus/HDF5 file called example.nxs; its invocation is sketched below.
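Following the pattern of the previous steps (and matching the call used in the solution below), the invocation is:

# command line (reads every *_den_norm.npy in the given directory)
./step3_aggregate_and_save.py ./
# output example.nxs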

EXERCISE

Build the pipeline using the Docker/Singularity image biocorecrg/alba:0.1 hosted on Docker Hub. The images are at ../../testdata/test_images/, and the executor must be specified so the pipeline works with our infrastructure.

Solution
#!/usr/bin/env nextflow


/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: images
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
images                           : ${params.images}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
output_denoise     = "output_denoise"
output_aggregate   = "output_aggregate"


/* Reading the file list and creating a "Channel": a queue that connects different processes.
 * The queue is consumed by processes, so you cannot re-use a channel for different processes.
 * If you need the same data for different processes you need to make more channels.
 */
 
Channel
    .fromFilePairs( params.images, size:1 , checkIfExists: true)  											 
    .set {images} 			                								 


/*
 * Process 1. De-noise input image
 */
process denoise {
    publishDir output_denoise  			

    tag { "${image}" }  				
    label 'big_mem' 

    input:
    tuple val(id), path (image)   						

    output:								
    tuple val(id), path("*_den.npy")

    script:								
    """
		step1_denoise.py ${image}
    """
}

/*
 * Process 2. Normalize the denoised image
 */
process normalize {

    tag { "${image}" }  				
    label 'big_mem' 

    input:
    tuple val(id), path(image), path(denoised)    	// WE NEED THE IMAGE TO BE THERE BUT NOT IN THE COMMAND LINE					

    output:								
    path("*_den_norm.npy")

    script:								
    """
		step2_normalize.py ${denoised}
    """
}

/*
 * Process 3. Run aggregate on normalized files
 */
process aggregate {
    publishDir output_aggregate, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("*") 					

    script:
    """
    step3_aggregate_and_save.py ./
    """
}

workflow {

    // HERE WE DO DENOISING 
    denois_out = denoise(images)
    // HERE WE JOIN DENOISE OUTPUT WITH ORIGINAL IMAGES JUST FOR PRINTING PURPOSES
    images.join(denois_out).view()
    // HERE WE RUN NORMALIZE 
    normalized_out = normalize(images.join(denois_out))
    aggregate(normalized_out.collect())
}


workflow.onComplete { 
	println ( workflow.success ? "\nDone! The results are in --> ${output_aggregate}\n" : "Oops .. something went wrong" )
}
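Note that the join operator combines the images and denois_out channels by their shared id key (the first element of each tuple), yielding tuples of val(id), path(image), path(denoised), which is exactly the input shape the normalize process expects.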