Skip to content
Snippets Groups Projects
main.nf 16.1 KiB
Newer Older
#!/usr/bin/env nextflow
/*
========================================================================================
                         nf-core/hic
========================================================================================
 nf-core/hic Analysis Pipeline.
 #### Homepage / Documentation
 https://github.com/nf-core/hic
----------------------------------------------------------------------------------------
*/


/**
 * Print the pipeline banner and the command-line usage to the Nextflow log.
 *
 * Takes no arguments; the text is emitted through log.info.
 * The input option is documented as --readPaths because the read channels
 * in this script are built from params.readPaths.
 */
def helpMessage() {
    // TODO nf-core: Add to this help message with new command line parameters
    log.info"""
    =======================================================
                                              ,--./,-.
              ___     __   __   __   ___     /,-._.--~\'
        |\\ | |__  __ /  ` /  \\ |__) |__         }  {
        | \\| |       \\__, \\__/ |  \\ |___     \\`-._,-`-,
                                              `._,._,\'

     nf-core/hic v${workflow.manifest.version}
    =======================================================

    This pipeline is a Nextflow version of the HiC-Pro pipeline for Hi-C data processing.
    See https://github.com/nservant/HiC-Pro for details.

    Usage:

    The typical command for running the pipeline is as follows:

    nextflow run nf-core/hic --readPaths '*_R{1,2}.fastq.gz' -profile docker

    Mandatory arguments:
      --readPaths                   Path to input data (must be surrounded with quotes)
      --genome                      Name of iGenomes reference
      -profile                      Configuration profile to use. Can use multiple (comma separated)
                                    Available: conda, docker, singularity, awsbatch, test and more.

    Options:

    References                      If not specified in the configuration file or you wish to overwrite any of the references.
      --fasta                       Path to Fasta reference
      --bwt2_index                  Path prefix of the Bowtie2 index (the file <prefix>.1.bt2 must exist)
      --restriction_fragment_bed    Restriction fragment file used to detect valid pairs
      --chromosome_size             Chromosome size file used to build the contact maps
      --ligation_motifs             Ligation motif(s) used to trim the reads left unmapped by the first mapping step

    Other options:
      --outdir                      The output directory where the results will be saved
      --email                       Set this parameter to your e-mail address to get a summary e-mail with details of the run sent to you when the workflow exits
      -name                         Name for the pipeline run. If not specified, Nextflow will automatically generate a random mnemonic.

    AWSBatch options:
      --awsqueue                    The AWSBatch JobQueue that needs to be set when running on AWSBatch
      --awsregion                   The AWS Region for your AWS Batch job to run on
    """.stripIndent()
}

/**********************************************************
 * SET UP CONFIGURATION VARIABLES
 */

// Show help message and stop with exit status 0 when --help is given
if (params.help){
    helpMessage()
    exit 0
}

// TODO nf-core: Add any reference files that are needed
// Configurable reference genomes
//fasta = params.genome ? params.genomes[ params.genome ].fasta ?: false : false
//if ( params.fasta ){
//    fasta = file(params.fasta)
//    if( !fasta.exists() ) exit 1, "Fasta file not found: ${params.fasta}"
//}


// Has the run name been specified by the user?
//  this has the bonus effect of catching both -name and --name
// A run name that does not look like <word>_<word> is used as given;
// otherwise fall back to params.name (which may be unset).
custom_runName = (workflow.runName ==~ /[a-z]+_[a-z]+/) ? params.name : workflow.runName


// The checks below only run when the profile is exactly 'awsbatch'
if( workflow.profile == 'awsbatch') {
  // AWSBatch sanity checking: both the job queue and the region must be supplied
  if (!params.awsqueue || !params.awsregion) exit 1, "Specify correct --awsqueue and --awsregion parameters on AWSBatch!"
  // NOTE(review): this looser 's3' prefix test looks redundant with the 's3:' test below — confirm and keep only one
  if (!workflow.workDir.startsWith('s3') || !params.outdir.startsWith('s3')) exit 1, "Specify S3 URLs for workDir and outdir parameters on AWSBatch!"
  // Check workDir/outdir paths to be S3 buckets if running on AWSBatch
  // related: https://github.com/nextflow-io/nextflow/issues/813
  if (!workflow.workDir.startsWith('s3:') || !params.outdir.startsWith('s3:')) exit 1, "Workdir or Outdir not on S3 - specify S3 Buckets for each to run on AWSBatch!"
}

// Stage config files: the MultiQC configuration and the output documentation
ch_multiqc_config = Channel.fromPath( params.multiqc_config )
ch_output_docs    = Channel.fromPath( "${baseDir}/docs/output.md" )




/**********************************************************
 * SET UP CHANNELS
 */

/*
 * Input read files
 */

// Paired files matched by params.readPaths; abort when nothing matches
Channel
    .fromFilePairs( params.readPaths )
    .ifEmpty { exit 1, "params.readPaths was empty - no input files supplied" }
    .set { raw_reads_pairs }

// Split every [ id, [ file1, file2 ] ] pair into two channels of [ id, file ]:
// the first file of each pair goes to raw_reads, the second to raw_reads_2
raw_reads   = Channel.create()
raw_reads_2 = Channel.create()
Channel
    .fromFilePairs( params.readPaths )
    .separate( raw_reads, raw_reads_2 ) { pair ->
        [ tuple(pair[0], pair[1][0]), tuple(pair[0], pair[1][1]) ]
    }


// Split fastq files
// https://www.nextflow.io/docs/latest/operator.html#splitfastq

/*
 * Other input channels
 */
// Bowtie2 Index: params.bwt2_index is the index prefix, so probe for its first index file
bwt2_file = file( "${params.bwt2_index}.1.bt2" )
if( !bwt2_file.exists() ) {
    exit 1, "Reference genome Bowtie 2 not found: ${params.bwt2_index}"
}
bwt2_index = Channel.value( "${params.bwt2_index}" )

// Restriction fragment (value channel, so it can be read by any number of tasks)
res_frag_file = Channel.value( "${params.restriction_fragment_bed}" )

// Chromosome size (value channel)
chr_size = Channel.value( "${params.chromosome_size}" )



/**********************************************************
 * SET UP LOGS
 */

// Header log info: banner followed by the pipeline version.
// The version line must not carry a trailing double quote, which would be printed verbatim.
log.info """=======================================================
                                          ,--./,-.
          ___     __   __   __   ___     /,-._.--~\'
    |\\ | |__  __ /  ` /  \\ |__) |__         }  {
    | \\| |       \\__, \\__/ |  \\ |___     \\`-._,-`-,
                                          `._,._,\'

nf-core/hic v${workflow.manifest.version}
======================================================="""
// Run summary: printed to the log below and reused by the completion report.
// Entries are printed in insertion order.
def summary = [:]
summary['Pipeline Name']  = 'nf-core/hic'
summary['Pipeline Version'] = workflow.manifest.version
summary['Run Name']     = custom_runName ?: workflow.runName
// TODO nf-core: Report custom parameters here
// Report the parameter the read channels are actually built from
summary['Reads']        = params.readPaths
summary['Fasta Ref']    = params.fasta
//summary['Data Type']    = params.singleEnd ? 'Single-End' : 'Paired-End'
summary['Max Memory']   = params.max_memory
summary['Max CPUs']     = params.max_cpus
summary['Max Time']     = params.max_time
summary['Output dir']   = params.outdir
summary['Working dir']  = workflow.workDir
summary['Container Engine'] = workflow.containerEngine
if(workflow.containerEngine) summary['Container'] = workflow.container
summary['Current home']   = "$HOME"
summary['Current user']   = "$USER"
summary['Current path']   = "$PWD"
summary['Script dir']     = workflow.projectDir
summary['Config Profile'] = workflow.profile
if(workflow.profile == 'awsbatch'){
   summary['AWS Region'] = params.awsregion
   summary['AWS Queue'] = params.awsqueue
}
if(params.email) summary['E-mail Address'] = params.email
// One "key: value" line per entry, keys padded to a common width
log.info summary.collect { k,v -> "${k.padRight(15)}: $v" }.join("\n")
log.info "========================================="


/**
 * Write the run summary as a MultiQC custom-content YAML file.
 *
 * @param summary map of label -> value; empty/null values are rendered as a grey "N/A"
 * @return the path of 'workflow_summary_mqc.yaml', created inside the work directory
 */
def create_workflow_summary(summary) {
    def yaml_file = workDir.resolve('workflow_summary_mqc.yaml')
    // Each summary entry becomes one <dt>/<dd> row of an HTML definition list.
    // The N/A placeholder is a <span>, so it must be closed with </span>.
    yaml_file.text  = """
    id: 'nf-core-hic-summary'
    description: " - this information is collected when the pipeline is started."
    section_name: 'nf-core/hic Workflow Summary'
    section_href: 'https://github.com/nf-core/hic'
    plot_type: 'html'
    data: |
        <dl class=\"dl-horizontal\">
${summary.collect { k,v -> "            <dt>$k</dt><dd><samp>${v ?: '<span style=\"color:#999999;\">N/A</span>'}</samp></dd>" }.join("\n")}
        </dl>
    """.stripIndent()

   return yaml_file
}


/*
 * Parse software version numbers
 */
// Record the version of the pipeline, of Nextflow and of each tool, then
// let scrape_software_versions.py turn them into a YAML file for MultiQC.
process get_software_versions {

    output:
    file 'software_versions_mqc.yaml' into software_versions_yaml

    script:
    // stderr is captured for python as well, because Python 2 prints its version there
    """
    echo $workflow.manifest.version > v_pipeline.txt
    echo $workflow.nextflow.version > v_nextflow.txt
    bowtie2 --version > v_bowtie2.txt
    python --version > v_python.txt 2>&1
    samtools --version > v_samtools.txt
    scrape_software_versions.py > software_versions_mqc.yaml
    """
}


/****************************************************
 * MAIN WORKFLOW
 */
/*
 * STEP 1 - Two-steps Reads Mapping
*/
// Send both mates through the same single-read mapping process:
// the items of raw_reads are followed by the items of raw_reads_2
raw_reads = raw_reads.concat( raw_reads_2 )

// First mapping step: align each read file with Bowtie2 using the
// params.bwt2_opts_end2end options.
// Emits [prefix, BAM] with the records kept by `samtools view -F 4`, and
// [prefix, FASTQ] with the reads written by Bowtie2's --un option.
process bowtie2_end_to_end {
   tag "$prefix"
   input:
        set val(sample), file(reads) from raw_reads
        val bt2_index from bwt2_index
 
   output:
	set val(prefix), file("${prefix}_unmap.fastq") into unmapped_end_to_end
     	set val(prefix), file("${prefix}.bam") into end_to_end_bam

   script:
	// prefix = read file name without its .fq / .fastq / .gz extension
	prefix = reads.toString() - ~/(\.fq)?(\.fastq)?(\.gz)?$/
        def bwt2_opts = params.bwt2_opts_end2end
	"""
        bowtie2 --rg-id BMG --rg SM:${prefix} \\
		${bwt2_opts} \\
		-p ${task.cpus} \\
		-x ${bt2_index} \\
		--un ${prefix}_unmap.fastq \\
	 	-U ${reads} | samtools view -F 4 -bS - > ${prefix}.bam
        """
}
// Trim the reads left unmapped by the end-to-end step at the ligation motif,
// so that they can be re-aligned in the second mapping step.
process trim_reads {
   tag "$prefix"
   input:
      set val(prefix), file(reads) from unmapped_end_to_end

   output:
      set val(prefix), file("${prefix}_trimmed.fastq") into trimmed_reads

   script:
      // params.ligation_motifs must be interpolated; without \${...} the literal
      // text "params.ligation_motifs" would be handed to --cutsite
      """
      cutsite_trimming --fastq $reads \\
                       --cutsite ${params.ligation_motifs} \\
                       --out ${prefix}_trimmed.fastq
      """
}

// Second mapping step: align the trimmed reads with Bowtie2 using the
// params.bwt2_opts_trimmed options. Emits [prefix, <prefix>_trimmed.bam].
process bowtie2_on_trimmed_reads {
   tag "$prefix"
   input:
      set val(prefix), file(reads) from trimmed_reads
      val bt2_index from bwt2_index

   output:
      set val(prefix), file("${prefix}_trimmed.bam") into trimmed_bam

   script:
      // prefix is recomputed from the read file name by stripping an optional
      // _trimmed suffix and the .fq / .fastq / .gz extension
      prefix = reads.toString() - ~/(_trimmed)?(\.fq)?(\.fastq)?(\.gz)?$/
      def bwt2_opts = params.bwt2_opts_trimmed
      """
      bowtie2 --rg-id BMG --rg SM:${prefix} \\
      	      ${bwt2_opts} \\
              -p ${task.cpus} \\
	      -x ${bt2_index} \\
	      -U ${reads} | samtools view -bS - > ${prefix}_trimmed.bam
      """
}

// Merge, per read file, the BAM of the end-to-end step with the BAM of the
// trimmed-read step, then sort the result by read name.
// Emits [sample, <prefix>_bwt2merged.bam], where sample is the prefix without
// a trailing _R1 / _R2 / _val_1 / _val_2.
process merge_mapping_steps{
   tag "$bam1 + $bam2"
   input:
      set val(prefix), file(bam1), file(bam2) from end_to_end_bam.join( trimmed_bam )

   output:
      set val(sample), file("${prefix}_bwt2merged.bam") into bwt2_merged_bam

   script:
      sample = prefix.toString() - ~/(_R1)?(_R2)?(_val_1)?(_val_2)?$/
      """
      samtools merge -@ ${task.cpus} \\
                     -f ${prefix}_bwt2merged.bam \\
                     ${bam1} ${bam2}

      samtools sort -@ ${task.cpus} -m 800M \\
                    -n -T /tmp/ \\
                    -o ${prefix}_bwt2merged.sorted.bam \\
                    ${prefix}_bwt2merged.bam

      mv ${prefix}_bwt2merged.sorted.bam ${prefix}_bwt2merged.bam
      """
}
// Pair the two single-read BAMs of a sample into one paired BAM with mergeSAM.py.
// Emits [sample, <sample>_bwt2pairs.bam].
process combine_mapped_files{
   tag "$sample = $r1_prefix + $r2_prefix"
   input:
      set val(sample), file(aligned_bam) from bwt2_merged_bam.groupTuple()

   output:
      set val(sample), file("${sample}_bwt2pairs.bam") into paired_bam

   script:
      // groupTuple() collects the BAMs in the order the upstream tasks finished,
      // so position 0 is not guaranteed to be the forward mate. Sort by file name
      // to make the forward/reverse assignment deterministic.
      // Assumes the first mate's file name sorts before the second's
      // (e.g. *_R1* before *_R2*) — TODO confirm for other naming schemes.
      def sorted_bams = aligned_bam.collect { it.toString() }.sort()
      r1_bam = sorted_bams[0]
      r1_prefix = r1_bam.toString() - ~/_bwt2merged.bam$/
      r2_bam = sorted_bams[1]
      r2_prefix = r2_bam.toString() - ~/_bwt2merged.bam$/
      """
      mergeSAM.py -f ${r1_bam} -r ${r2_bam} -o ${sample}_bwt2pairs.bam
      """
}
/*
 * STEP2 - DETECT VALID PAIRS
*/

// Run mapped_2hic_fragments.py on the paired BAM of each sample together with
// the restriction fragment file, and collect the *.validPairs file(s) it writes.
process get_valid_interaction{
   tag "$sample"
   input:
      set val(sample), file(pe_bam) from paired_bam
      val frag_file from res_frag_file

   output:
      set val(sample), file("*.validPairs") into valid_pairs

   script:
      """
      mapped_2hic_fragments.py -f ${frag_file} -r ${pe_bam}
      """
}


/*
 * STEP3 - BUILD MATRIX
*/

// Run build_matrix on the valid pairs of each sample and collect the *.matrix
// file(s) it writes. The bin size is fixed at 1000000 in the command below.
process build_contact_maps{
   tag "$sample"
   input:
      set val(sample), file(vpairs) from valid_pairs
      val chrsize from chr_size

   output:
      set val(sample), file("*.matrix") into matrix_file

   script:
   // The output prefix carries the same bin size as --binsize
   """
   build_matrix --matrix-format upper  --binsize 1000000 --chrsizes ${chrsize} --ifile ${vpairs} --oprefix ${sample}_1000000
   """

}


/*
 // STEP 2 - MultiQC

process multiqc {
    publishDir "${params.outdir}/MultiQC", mode: 'copy'

    input:
    file multiqc_config from ch_multiqc_config
    // TODO nf-core: Add in log files from your new processes for MultiQC to find!
    file ('fastqc/*') from fastqc_results.collect().ifEmpty([])
    file ('software_versions/*') from software_versions_yaml
    file workflow_summary from create_workflow_summary(summary)

    output:
    file "*multiqc_report.html" into multiqc_report
    file "*_data"

    script:
    rtitle = custom_runName ? "--title \"$custom_runName\"" : ''
    rfilename = custom_runName ? "--filename " + custom_runName.replaceAll('\\W','_').replaceAll('_+','_') + "_multiqc_report" : ''
    // TODO nf-core: Specify which MultiQC modules to use with -m for a faster run time
    """
    multiqc -f $rtitle $rfilename --config $multiqc_config .
    """
}


// STEP 3 - Output Description HTML

process output_documentation {
    publishDir "${params.outdir}/Documentation", mode: 'copy'

    input:
    file output_docs from ch_output_docs

    output:
    file "results_description.html"

    script:
    """
    markdown_to_html.r $output_docs results_description.html
    """
}


*/

/*
 * Completion e-mail notification
 */
// Completion handler: renders the e-mail/report templates from the run metadata,
// sends the summary e-mail when params.email is set, and always writes the
// HTML and plain-text reports to <outdir>/Documentation/.
workflow.onComplete {

    // Set up the e-mail variables
    def subject = "[nf-core/hic] Successful: $workflow.runName"
    if(!workflow.success){
      subject = "[nf-core/hic] FAILED: $workflow.runName"
    }
    def email_fields = [:]
    email_fields['version'] = workflow.manifest.version
    email_fields['runName'] = custom_runName ?: workflow.runName
    email_fields['success'] = workflow.success
    email_fields['dateComplete'] = workflow.complete
    email_fields['duration'] = workflow.duration
    email_fields['exitStatus'] = workflow.exitStatus
    email_fields['errorMessage'] = (workflow.errorMessage ?: 'None')
    email_fields['errorReport'] = (workflow.errorReport ?: 'None')
    email_fields['commandLine'] = workflow.commandLine
    email_fields['projectDir'] = workflow.projectDir
    // Same map object as the start-up summary, extended below with completion details
    email_fields['summary'] = summary
    email_fields['summary']['Date Started'] = workflow.start
    email_fields['summary']['Date Completed'] = workflow.complete
    email_fields['summary']['Pipeline script file path'] = workflow.scriptFile
    email_fields['summary']['Pipeline script hash ID'] = workflow.scriptId
    if(workflow.repository) email_fields['summary']['Pipeline repository Git URL'] = workflow.repository
    if(workflow.commitId) email_fields['summary']['Pipeline repository Git Commit'] = workflow.commitId
    if(workflow.revision) email_fields['summary']['Pipeline Git branch/tag'] = workflow.revision
    email_fields['summary']['Nextflow Version'] = workflow.nextflow.version
    email_fields['summary']['Nextflow Build'] = workflow.nextflow.build
    email_fields['summary']['Nextflow Compile Timestamp'] = workflow.nextflow.timestamp

    // Render the TXT template
    def engine = new groovy.text.GStringTemplateEngine()
    def tf = new File("$baseDir/assets/email_template.txt")
    def txt_template = engine.createTemplate(tf).make(email_fields)
    def email_txt = txt_template.toString()

    // Render the HTML template
    def hf = new File("$baseDir/assets/email_template.html")
    def html_template = engine.createTemplate(hf).make(email_fields)
    def email_html = html_template.toString()

    // Render the sendmail template
    def smail_fields = [ email: params.email, subject: subject, email_txt: email_txt, email_html: email_html, baseDir: "$baseDir" ]
    def sf = new File("$baseDir/assets/sendmail_template.txt")
    def sendmail_template = engine.createTemplate(sf).make(smail_fields)
    def sendmail_html = sendmail_template.toString()

    // Send the HTML e-mail
    if (params.email) {
        try {
          // Deliberately jump to the plaintext fallback in the catch block.
          // The exception has to be instantiated with `new`; without it the jump
          // only happened by accident, through a MissingMethodException.
          if( params.plaintext_email ){ throw new GroovyException('Send plaintext e-mail, not HTML') }
          // Try to send HTML e-mail using sendmail
          [ 'sendmail', '-t' ].execute() << sendmail_html
          log.info "[nf-core/hic] Sent summary e-mail to $params.email (sendmail)"
        } catch (all) {
          // Catch failures and try with plaintext
          [ 'mail', '-s', subject, params.email ].execute() << email_txt
          log.info "[nf-core/hic] Sent summary e-mail to $params.email (mail)"
        }
    }

    // Write summary e-mail HTML to a file
    def output_d = new File( "${params.outdir}/Documentation/" )
    if( !output_d.exists() ) {
      output_d.mkdirs()
    }
    def output_hf = new File( output_d, "pipeline_report.html" )
    output_hf.withWriter { w -> w << email_html }
    def output_tf = new File( output_d, "pipeline_report.txt" )
    output_tf.withWriter { w -> w << email_txt }

    log.info "[nf-core/hic] Pipeline Complete"

}