4 Nextflow

Using Singularity

We recommend using Singularity instead of Docker in HPC environments. This can be done with the Nextflow command-line option -with-singularity, without changing the pipeline code.

Nextflow will take care of pulling, converting, and storing the image for you. This will be done only once and then Nextflow will use the stored image for further executions.
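If you prefer, the same behaviour can be enabled permanently in the nextflow.config file instead of passing -with-singularity on the command line. A minimal sketch (the cacheDir value mirrors the one used later in this course):

// nextflow.config: equivalent to always running with -with-singularity
singularity {
    enabled  = true
    cacheDir = "$baseDir/singularity"   // where pulled and converted images are stored
}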

On an AWS main node, both Docker and Singularity are available, while within the AWS Batch system only Docker is available.

nextflow run test2.nf -with-singularity -bg > log

tail -f log
N E X T F L O W  ~  version 20.10.0
Launching `test2.nf` [soggy_miescher] - revision: 5a0a513d38

BIOCORE@CRG - N F TESTPIPE  ~  version 1.0
=============================================
reads                           : /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/../../testdata/*.fastq.gz

Pulling Singularity image docker://biocorecrg/c4lwg-2018:latest [cache /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/singularity/biocorecrg-c4lwg-2018-latest.img]
[da/eb7564] Submitted process > fastQC (B7_H3K4me1_s_chr19.fastq.gz)
[f6/32dc41] Submitted process > fastQC (B7_input_s_chr19.fastq.gz)
...

Let’s inspect the singularity folder:

ls singularity/
biocorecrg-c4lwg-2018-latest.img

This Singularity image can be used to execute the code outside the pipeline in exactly the same way as inside it.
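For example, assuming you are in the test2 folder, you can call one of the tools shipped in the image directly (fastqc is installed there, since the pipeline uses it):

singularity exec singularity/biocorecrg-c4lwg-2018-latest.img fastqc --version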

Sometimes we may want to launch only a specific job, for example because it failed or because we want to test it. To do so, we can go to the corresponding temporary folder, for example one of the fastQC task folders:

cd work/da/eb7564*/

Inspecting the .command.run file shows us this piece of code:

...

nxf_launch() {
    set +u; env - PATH="$PATH" SINGULARITYENV_TMP="$TMP" SINGULARITYENV_TMPDIR="$TMPDIR" singularity exec /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/singularity/biocorecrg-c4lwg-2018-latest.img /bin/bash -c "cd $PWD; /bin/bash -ue /home/ec2-user/git/CoursesCRG_Containers_Nextflow_May_2021/nextflow/test2/work/da/eb756433aa0881d25b20afb5b1366e/.command.sh"
}
...

This means that Nextflow is running the code by using the singularity exec command.

Thus we can launch this command outside the pipeline (locally):

bash .command.run

Started analysis of B7_H3K4me1_s_chr19.fastq.gz
Approx 5% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 10% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 15% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 20% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 25% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 30% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 35% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 40% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 45% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 50% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 55% complete for B7_H3K4me1_s_chr19.fastq.gz
Approx 60% complete for B7_H3K4me1_s_chr19.fastq.gz
...

If you have to submit a job to an HPC cluster, you need to use the corresponding submission program: qsub if you have Sun Grid Engine, or sbatch if you have Slurm. Here is an example using SGE:

qsub .command.run
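And the equivalent on a Slurm cluster:

sbatch .command.run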

Adding more processes

We can build a pipeline incrementally by adding more and more processes. Nextflow will take care of the dependencies between inputs and outputs, and of the parallelization.

Let’s add two additional steps to the test2.nf pipeline: indexing of the reference genome and read alignment using Bowtie. For that, we will have to modify the test2.nf, params.config and nextflow.config files (the new script is available in the test3 folder).

In params.config, we have to add new parameters:

params {
    reads           = "$baseDir/../../testdata/*.fastq.gz"
    reference       = "$baseDir/../../testdata/chr19.fasta.gz"
    outdir          = "$baseDir"
    //outdir        = "s3://class-bucket-1/results"
    email           = "myemail@google.com"
}

In test3.nf, we have to add a new input for the reference sequence:

#!/usr/bin/env nextflow

/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
reference                       : ${params.reference}
outdir                          : ${params.outdir}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "${params.outdir}/output_fastqc"
alnOutputFolder       = "${params.outdir}/output_aln"
multiqcOutputFolder   = "${params.outdir}/output_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different processes.
 * The queue is consumed by the processes, so you cannot re-use a queue channel for different processes.
 * If you need the same data for different processes you need to create more channels.
 */
 
Channel
    .fromPath( params.reads )  											 
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } 
    .set {reads} 											 

reference = file(params.reference)

/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			
    tag { "${reads}" }  							

    input:
    path reads   							

    output:									
   	path "*_fastqc.*"

    script:									
    """
        fastqc ${reads} 
    """
}

/*
 * Process 2. Bowtie index
 */
process bowtieIdx {
    tag { "${ref}" }  							


    input:
    path ref   							

    output:									
   	tuple val("${ref}"), path ("${ref}*.ebwt")

    script:									
    """
        gunzip -c ${ref} > reference.fa
        bowtie-build reference.fa ${ref}
        rm reference.fa
    """
}

/*
 * Process 3. Bowtie alignment
 */
process bowtieAln {
    publishDir alnOutputFolder, pattern: '*.sam'

    tag { "${reads}" }  							
    label 'twocpus' 

    input:
    tuple val(refname), path (ref_files)
    path reads  							

    output:									
    path "${reads}.sam", emit: samples_sam
    path "${reads}.log", emit: samples_log

    script:									
    """
    bowtie -p ${task.cpus} ${refname} -q ${reads} -S > ${reads}.sam 2> ${reads}.log
    """
}

/*
 * Process 4. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (inputfiles)

    output:
    path("multiqc_report.html") 					

    script:
    """
    multiqc .
    """
}

workflow {
    reads.view()
    fastqc_out = fastQC(reads)
    bowtie_index = bowtieIdx(reference)
    bowtieAln(bowtie_index, reads)

    multiQC(fastqc_out.mix(bowtieAln.out.samples_log).collect())
}


workflow.onComplete {
    println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}

In this way, a singleton (value) channel called reference is created; unlike a queue channel, its content can be consumed any number of times. We also add paths specifying where to place the output files:

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "${params.outdir}/output_fastqc"
alnOutputFolder       = "${params.outdir}/output_aln"
multiqcOutputFolder   = "${params.outdir}/output_multiQC"

And we have to add two new processes. The first one is for indexing the reference genome (with bowtie-build):

/*
 * Process 2. Bowtie index
 */
process bowtieIdx {
    tag { "${ref}" }  							


    input:
    path ref   							

    output:									
   	tuple val("${ref}"), path ("${ref}*.ebwt")

    script:									
    """
        gunzip -c ${ref} > reference.fa
        bowtie-build reference.fa ${ref}
        rm reference.fa
    """
}

Since bowtie indexing requires an unzipped reference fasta file, we first gunzip it, then build the reference index, and finally remove the unzipped file.
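For this reference, bowtie-build produces six index files sharing the ${ref} prefix, along these lines:

chr19.fasta.gz.1.ebwt
chr19.fasta.gz.2.ebwt
chr19.fasta.gz.3.ebwt
chr19.fasta.gz.4.ebwt
chr19.fasta.gz.rev.1.ebwt
chr19.fasta.gz.rev.2.ebwt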

The output channel is organized as a tuple, i.e., an ordered list of elements: the first element is the name of the index (as a value), and the second is the list of files constituting the index. The former is needed for building the command line of the alignment step; the latter are the files required for performing the alignment.
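If you want to inspect this tuple during development, you can temporarily add a view() call in the workflow; it would print something along these lines (the actual paths depend on your work directory):

bowtie_index = bowtieIdx(reference)
bowtie_index.view()
// [chr19.fasta.gz, [/path/to/work/.../chr19.fasta.gz.1.ebwt, /path/to/work/.../chr19.fasta.gz.2.ebwt, ...]]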

The second new process, bowtieAln, performs the alignment step:

/*
 * Process 3. Bowtie alignment
 */
process bowtieAln {
    publishDir alnOutputFolder, pattern: '*.sam'

    tag { "${reads}" }  							
    label 'twocpus' 

    input:
    tuple val(refname), path (ref_files)
    path reads  							

    output:									
    path "${reads}.sam", emit: samples_sam
    path "${reads}.log", emit: samples_log

    script:									
    """
    bowtie -p ${task.cpus} ${refname} -q ${reads} -S > ${reads}.sam 2> ${reads}.log
    """
}

There are two different input channels: the index and the reads.

The index name, specified by refname, is used for building the command line, while the index files, indicated by ref_files, are staged (symlinked) into the task's working directory by the path qualifier.

We also produce two kinds of output, the alignments and the logs. The former is what we want to keep as a final result; for that, we specify the pattern parameter in publishDir.

publishDir alnOutputFolder, pattern: '*.sam'
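publishDir also accepts a mode option. By default the published files are symbolic links to the files in the work directory; if you want real copies of the alignments in the results folder, you can combine mode with pattern:

publishDir alnOutputFolder, pattern: '*.sam', mode: 'copy'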

The second output will be passed to the next process, multiQC. To distinguish the two outputs, we assign them different names using emit:

output:
    path "${reads}.sam", emit: samples_sam
    path "${reads}.log", emit: samples_log

These names allow us to connect the outputs directly to other processes when we call them in the workflow section:

workflow {
    reads.view()
    fastqc_out = fastQC(reads)
    bowtie_index = bowtieIdx(reference)
    bowtieAln(bowtie_index, reads)

    multiQC(fastqc_out.mix(bowtieAln.out.samples_log).collect())
}

As you can see, we pass the samples_log output to the multiQC process after mixing it with the output channel of the fastQC process.
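The mix operator merges the items emitted by several channels into one (the emission order is not guaranteed), and collect gathers all of them into a single list, so that multiQC runs once on all the files. A minimal, self-contained sketch of the two operators:

Channel.of('a', 'b')
    .mix( Channel.of('c') )
    .collect()
    .view()
// prints a single list such as [a, b, c]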

Profiles

To deploy a pipeline on a cluster or in the cloud, we need to indicate in the nextflow.config file what kind of executor to use.

In the Nextflow framework architecture, the executor determines which batch-queue system is used to submit jobs to an HPC cluster or to the cloud.

The executor is completely abstracted, so you can switch from SGE to Slurm just by changing this parameter in the configuration file.
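For example, moving this pipeline from SGE to Slurm only requires changing the executor (and, typically, the queue name, which is cluster-specific; the one below is hypothetical):

process {
    executor = 'slurm'   // instead of 'sge'
    queue    = 'normal'  // hypothetical queue name on the Slurm cluster
}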

You can group different classes of configuration, or profiles, within a single nextflow.config file.

Let’s inspect the nextflow.config file in the test3 folder. We can see three different profiles:

  • standard

  • cluster

  • cloud

The first profile indicates the resources needed for running the pipeline locally. They are quite small, since the test node has little memory and CPU power.

includeConfig "$baseDir/params.config"

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"


profiles {
  standard {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="local"
        memory='0.6G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            memory='0.6G'
            cpus='1'
        }
     }
   }

As you can see, we explicitly indicated the local executor. The local executor is in any case the default: it is the one used when no executor is specified in the configuration.

The second profile is for running the pipeline on a cluster; here, in particular, for a cluster using the Sun Grid Engine queuing system:

   cluster {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="SGE"
        queue = "smallcpus"

        memory='1G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            queue = "bigcpus"
            memory='4G'
            cpus='2'
        }
      }
   }

This profile indicates that the system uses Sun Grid Engine as a job scheduler and that we have different queues for small jobs and more intensive ones.
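A process opts into these per-label settings with the label directive, as bowtieAln already does:

process bowtieAln {
    label 'twocpus'   // picks up the withLabel: 'twocpus' resources of the active profile
    ...
}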

Deployment in the AWS cloud

The final profile is for running the pipeline in the Amazon cloud, known as Amazon Web Services or AWS. In particular, we will use AWS Batch, which allows the execution of containerized workloads on the Amazon cloud infrastructure. In the configuration below, NNNN is the number of your bucket, which you can see in the folder mounted under /mnt by typing the command df.

includeConfig "$baseDir/params.config"

process.container = 'biocorecrg/c4lwg-2018:latest'
singularity.cacheDir = "$baseDir/singularity"


profiles {
  standard {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="local"
        memory='0.6G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            memory='0.6G'
            cpus='1'
        }
     }
   }
   cluster {
     process {
        containerOptions = { workflow.containerEngine == "docker" ? '-u $(id -u):$(id -g)': null}
        executor="SGE"
        queue = "smallcpus"

        memory='1G'
        cpus='1'
        time='6h'

        withLabel: 'twocpus' {
            queue = "bigcpus"
            memory='4G'
            cpus='2'
        }
      }
   }

   cloud {
    workDir = 's3://nf-class-bucket-NNNN/work'
    aws.region = 'eu-central-1'
    aws.batch.cliPath = '/home/ec2-user/miniconda/bin/aws'
    
   process {
       executor = 'awsbatch'
       queue = 'spot'
       memory='1G'
       cpus='1'
       time='6h'

       withLabel: 'twocpus' {
           memory='2G'
           cpus='2'
       }
    }
  }
}

We indicate the AWS-specific parameters (region and cliPath) and the awsbatch executor. We also set the working directory, which must be a path in an S3 bucket; this is mandatory when running Nextflow with AWS Batch.

We can now launch the pipeline indicating -profile cloud:

nextflow run test3.nf -bg -with-docker -profile cloud > log

Note that there is no longer a work folder in the directory where test3.nf is located: when running in the AWS cloud, the working directory is inside the S3 bucket, which is mounted locally at /mnt/nf-class-bucket-NNNN/work (you can see the mounted folder, and the corresponding bucket number, by typing df).

The MultiQC report can be seen on the AWS webpage at https://nf-class-bucket-NNNN.s3.eu-central-1.amazonaws.com/results/output_multiQC/multiqc_report.html

But first you need to change the permissions of that file (where NNNN is the number of your bucket):

chmod 775 /mnt/nf-class-bucket-NNNN/results/output_multiQC/multiqc_report.html

Sometimes the Nextflow process itself is very memory intensive and the main node can run out of memory. To avoid this, you can reduce the memory used by the Nextflow Java virtual machine by setting an environment variable (here the initial heap size is set to 50 MB and the maximum to 500 MB):

export NXF_OPTS="-Xms50m -Xmx500m"

Again, we can copy the output files to the bucket manually.
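For example, with the AWS command line (the paths shown are illustrative; NNNN is the number of your bucket):

aws s3 cp output_multiQC/multiqc_report.html s3://nf-class-bucket-NNNN/results/output_multiQC/multiqc_report.html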

We can also tell Nextflow to copy the output files directly to the S3 bucket. To do so, change the outdir parameter in the params.config file (use the bucket corresponding to your AWS instance):

outdir = "s3://nf-class-bucket-NNNN/results"

EXERCISE

Modify the test3.nf file to make two sub-workflows:

  • one for the FastQC analysis of the fastq files and the Bowtie alignment;

  • one for the FastQC analysis of the aligned files produced by Bowtie.

For convenience, you can use the MultiQC configuration file called config.yaml in the multiQC process.

Solution
#!/usr/bin/env nextflow

/* 
 * This code enables the new dsl of Nextflow. 
 */

nextflow.enable.dsl=2


/* 
 * NextFlow test pipe
 * @authors
 * Luca Cozzuto <lucacozzuto@gmail.com>
 * 
 */

/*
 * Input parameters: read pairs
 * Params are stored in the params.config file
 */

version                 = "1.0"
// this prevents a warning of undefined parameter
params.help             = false

// this prints the input parameters
log.info """
BIOCORE@CRG - N F TESTPIPE  ~  version ${version}
=============================================
reads                           : ${params.reads}
reference                       : ${params.reference}
"""

// this prints the help in case you use --help parameter in the command line and it stops the pipeline
if (params.help) {
    log.info 'This is the Biocore\'s NF test pipeline'
    log.info 'Enjoy!'
    log.info '\n'
    exit 1
}

/*
 * Defining the output folders.
 */
fastqcOutputFolder    = "output_fastqc"
alnOutputFolder       = "output_aln"
multiqcOutputFolder   = "output_multiQC"


/* Reading the file list and creating a "Channel": a queue that connects different processes.
 * The queue is consumed by the processes, so you cannot re-use a queue channel for different processes.
 * If you need the same data for different processes you need to create more channels.
 */
 
Channel
    .fromPath( params.reads )  											 
    .ifEmpty { error "Cannot find any reads matching: ${params.reads}" } 
    .set {reads} 											 

reference = file(params.reference)
multiconf = file("config.yaml")

/*
 * Process 1. Run FastQC on raw data. A process is the element for executing scripts / programs etc.
 */
process fastQC {
    publishDir fastqcOutputFolder  			
    tag { "${reads}" }  							

    input:
    path reads   							

    output:									
   	path "*_fastqc.*"

    script:									
    """
        fastqc ${reads} 
    """
}

/*
 * Process 2. Bowtie index
 */
process bowtieIdx {
    tag { "${ref}" }  							

    input:
    path ref   							

    output:									
   	tuple val("${ref}"), path ("${ref}*.ebwt")

    script:									
    """
        gunzip -c ${ref} > reference.fa
        bowtie-build reference.fa ${ref}
        rm reference.fa
    """
}

/*
 * Process 3. Bowtie alignment
 */
process bowtieAln {
    publishDir alnOutputFolder, pattern: '*.sam'

    tag { "${reads}" }  							
    label 'twocpus' 

    input:
    tuple val(refname), path (ref_files)
    path reads  							

    output:									
    path "${reads}.sam", emit: samples_sam
    path "${reads}.log", emit: samples_log

    script:									
    """
    bowtie -p ${task.cpus} ${refname} -q ${reads} -S > ${reads}.sam 2> ${reads}.log
    """
}

/*
 * Process 4. Run multiQC on fastQC results
 */
process multiQC {
    publishDir multiqcOutputFolder, mode: 'copy' 	// this time do not link but copy the output file

    input:
    path (multiconf)
    path (inputfiles)

    output:
    path("multiqc_report.html") 					

    script:
    """
    multiqc . -c ${multiconf}
    """
}

workflow flow1 {
    take:
    reads
    ref

    main:
    fastqc_out = fastQC(reads)
    bowtie_index = bowtieIdx(ref)
    bowtieAln(bowtie_index, reads)

    emit:
    sam = bowtieAln.out.samples_sam
    logs = bowtieAln.out.samples_log
    fastqc_out
}


workflow flow2 {
    take:
    alns

    main:
    fastqc_out2 = fastQC(alns)

    emit:
    fastqc_out2
}

workflow {
    flow1(reads, reference)
    flow2(flow1.out.sam)
    multiQC(multiconf, flow1.out.fastqc_out.mix(flow1.out.logs).mix(flow2.out.fastqc_out2).collect())
}


workflow.onComplete {
    println ( workflow.success ? "\nDone! Open the following report in your browser --> ${multiqcOutputFolder}/multiqc_report.html\n" : "Oops .. something went wrong" )
}