#!/usr/bin/env nextflow
/*
========================================================================================
nf-core/hic
========================================================================================
nf-core/hic Analysis Pipeline.
#### Homepage / Documentation
https://github.com/nf-core/hic
----------------------------------------------------------------------------------------
*/
/* TODO
- outputs
- env
- multiqc
- update version tools
- install + compile
*/
def helpMessage() {
log.info"""
=======================================================
,--./,-.
___ __ __ __ ___ /,-._.--~\'
|\\ | |__ __ / ` / \\ |__) |__ } {
| \\| | \\__, \\__/ | \\ |___ \\`-._,-`-,
`._,._,\'
nf-core/hic v${workflow.manifest.version}
=======================================================
This pipeline is a Nextflow version of the HiC-Pro pipeline for Hi-C data processing.
See https://github.com/nservant/HiC-Pro for details.
Usage:
The typical command for running the pipeline is as follows:
nextflow run nf-core/hic --readPaths '*_R{1,2}.fastq.gz' -profile docker
Mandatory arguments:
--readPaths Path to input data (must be surrounded with quotes)
--genome Name of iGenomes reference
-profile Configuration profile to use. Can use multiple (comma separated)
Available: conda, docker, singularity, awsbatch, test and more.
Options:
--bwt2_index Path to bowtie2 indexes (including indexes prefix)
--bwt2_opts_end2end Option for bowtie2 end-to-end mapping (first mapping step)
--bwt2_opts_trimmed Option for bowtie2 mapping after ligation site trimming
--min_mapq Minimum mapping quality values to consider
--chromosome_size Path to chromosome size file
--restriction_fragment_bed Path to restriction fragment file (bed)
--ligation_site Ligation motifs to trim (comma separated)
--min_restriction_fragment_size Minimum size of restriction fragments to consider
--max_restriction_fragment_size Maximum size of restriction fragments to consider
--min_insert_size Minimum insert size of mapped reads to consider
--max_insert_size Maximum insert size of mapped reads to consider
--min_cis_dist Minimum intra-chromosomal distance to consider
--rm_singleton Remove singleton reads
--rm_multi Remove multi-mapped reads
--rm_dup Remove duplicates
--bin_size Bin size for contact maps (comma separated)
--ice_max_iter Maximum number of iterations for ICE normalization
--ice_filter_low_count_perc Percentage of low counts columns/rows to filter before ICE normalization
--ice_filter_high_count_perc Percentage of high counts columns/rows to filter before ICE normalization
--ice_eps Convergence criteria for ICE normalization
References If not specified in the configuration file or you wish to overwrite any of the references.
--fasta Path to Fasta reference
Other options:
--outdir The output directory where the results will be saved
--email Set this parameter to your e-mail address to get a summary e-mail with details of the run sent to you when the workflow exits
-name Name for the pipeline run. If not specified, Nextflow will automatically generate a random mnemonic.
AWSBatch options:
--awsqueue The AWSBatch JobQueue that needs to be set when running on AWSBatch
--awsregion The AWS Region for your AWS Batch job to run on
""".stripIndent()
}
/**********************************************************
* SET UP CONFIGURATION VARIABLES
*/
// Show help message
if (params.help){
helpMessage()
exit 0
}
// TODO nf-core: Add any reference files that are needed
// Configurable reference genomes
//fasta = params.genome ? params.genomes[ params.genome ].fasta ?: false : false
//if ( params.fasta ){
// fasta = file(params.fasta)
// if( !fasta.exists() ) exit 1, "Fasta file not found: ${params.fasta}"
//}
// Has the run name been specified by the user?
// this has the bonus effect of catching both -name and --name
custom_runName = params.name
if( !(workflow.runName ==~ /[a-z]+_[a-z]+/) ){
custom_runName = workflow.runName
}
if( workflow.profile == 'awsbatch') {
// AWSBatch sanity checking
if (!params.awsqueue || !params.awsregion) exit 1, "Specify correct --awsqueue and --awsregion parameters on AWSBatch!"
// Check workDir/outdir paths to be S3 buckets if running on AWSBatch
// related: https://github.com/nextflow-io/nextflow/issues/813
if (!workflow.workDir.startsWith('s3:') || !params.outdir.startsWith('s3:')) exit 1, "Workdir or Outdir not on S3 - specify S3 buckets for each to run on AWSBatch!"
}
// Stage config files
ch_multiqc_config = Channel.fromPath(params.multiqc_config)
ch_output_docs = Channel.fromPath("$baseDir/docs/output.md")
/**********************************************************
* SET UP CHANNELS
*/
/*
* input read files
*/
Channel
.fromFilePairs( params.readPaths )
.ifEmpty { exit 1, "params.readPaths was empty - no input files supplied" }
.set { raw_reads_pairs }
raw_reads = Channel.create()
raw_reads_2 = Channel.create()
Channel
.fromFilePairs( params.readPaths )
.separate( raw_reads, raw_reads_2 ) { a -> [tuple(a[0], a[1][0]), tuple(a[0], a[1][1])] }
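// The closure above splits each file pair into two separate (sample, fastq)
// tuples so that R1 and R2 are mapped independently in the two-step mapping below.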
// Split fastq files
// https://www.nextflow.io/docs/latest/operator.html#splitfastq
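// A possible sketch for the TODO above (left commented out): splitFastq can chunk
// each FASTQ so the mapping steps run on smaller pieces in parallel. The chunk
// size of 1000000 reads is an assumed example value, not a pipeline default.
//raw_reads = raw_reads.splitFastq( by: 1000000, file: true )
//raw_reads_2 = raw_reads_2.splitFastq( by: 1000000, file: true )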
// Bowtie2 Index
bwt2_file = file("${params.bwt2_index}.1.bt2")
if( !bwt2_file.exists() ) exit 1, "Bowtie2 index not found: ${params.bwt2_index}"
bwt2_index = Channel.value( "${params.bwt2_index}" )
res_frag_file = Channel.value( "${params.restriction_fragment_bed}" )
chr_size = Channel.value( "${params.chromosome_size}" )
map_res = Channel.from( params.bin_size.tokenize(',') )
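// One map resolution per comma-separated value, e.g. --bin_size '1000000,500000'
// builds contact maps at both 1 Mb and 500 kb.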
/**********************************************************
* SET UP LOGS
*/
// Header log info
log.info """=======================================================
,--./,-.
___ __ __ __ ___ /,-._.--~\'
|\\ | |__ __ / ` / \\ |__) |__ } {
| \\| | \\__, \\__/ | \\ |___ \\`-._,-`-,
`._,._,\'
nf-core/hic v${workflow.manifest.version}
======================================================="""
def summary = [:]
summary['Pipeline Name'] = 'nf-core/hic'
summary['Pipeline Version'] = workflow.manifest.version
summary['Run Name'] = custom_runName ?: workflow.runName
// TODO nf-core: Report custom parameters here
summary['Reads'] = params.readPaths
//summary['Fasta Ref'] = params.fasta
summary['Max Memory'] = params.max_memory
summary['Max CPUs'] = params.max_cpus
summary['Max Time'] = params.max_time
summary['Output dir'] = params.outdir
summary['Working dir'] = workflow.workDir
summary['Container Engine'] = workflow.containerEngine
if(workflow.containerEngine) summary['Container'] = workflow.container
summary['Current home'] = "$HOME"
summary['Current user'] = "$USER"
summary['Current path'] = "$PWD"
summary['Script dir'] = workflow.projectDir
summary['Config Profile'] = workflow.profile
if(workflow.profile == 'awsbatch'){
summary['AWS Region'] = params.awsregion
summary['AWS Queue'] = params.awsqueue
}
if(params.email) summary['E-mail Address'] = params.email
log.info summary.collect { k,v -> "${k.padRight(15)}: $v" }.join("\n")
log.info "========================================="
def create_workflow_summary(summary) {
def yaml_file = workDir.resolve('workflow_summary_mqc.yaml')
yaml_file.text = """
id: 'nf-core-hic-summary'
description: " - this information is collected when the pipeline is started."
section_name: 'nf-core/hic Workflow Summary'
section_href: 'https://github.com/nf-core/hic'
plot_type: 'html'
data: |
<dl class=\"dl-horizontal\">
${summary.collect { k,v -> " <dt>$k</dt><dd><samp>${v ?: '<span style=\"color:#999999;\">N/A</span>'}</samp></dd>" }.join("\n")}
</dl>
""".stripIndent()
return yaml_file
}
/*
* Parse software version numbers
*/
process get_software_versions {
output:
file 'software_versions_mqc.yaml' into software_versions_yaml
script:
"""
echo $workflow.manifest.version > v_pipeline.txt
echo $workflow.nextflow.version > v_nextflow.txt
bowtie2 --version > v_bowtie2.txt
python --version > v_python.txt
samtools --version > v_samtools.txt
scrape_software_versions.py > software_versions_mqc.yaml
"""
}
/****************************************************
* MAIN WORKFLOW
*/
/*
 * STEP 1 - Two-step reads mapping
 */
raw_reads = raw_reads.concat( raw_reads_2 )
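// Pool R1 and R2 into a single channel: each mate goes through the same
// end-to-end mapping, trimming and re-mapping steps before mates are re-paired.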
process bowtie2_end_to_end {
tag "$prefix"
input:
set val(sample), file(reads) from raw_reads
val bt2_index from bwt2_index
output:
set val(prefix), file("${prefix}_unmap.fastq") into unmapped_end_to_end
set val(prefix), file("${prefix}.bam") into end_to_end_bam
script:
prefix = reads.toString() - ~/(\.fq)?(\.fastq)?(\.gz)?$/
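// e.g. "sample_R1.fastq.gz" gives prefix "sample_R1"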
def bwt2_opts = params.bwt2_opts_end2end
"""
bowtie2 --rg-id BMG --rg SM:${prefix} \\
${bwt2_opts} \\
-p ${task.cpus} \\
-x ${bt2_index} \\
--un ${prefix}_unmap.fastq \\
-U ${reads} | samtools view -F 4 -bS - > ${prefix}.bam
"""
}
process trim_reads {
tag "$prefix"
input:
set val(prefix), file(reads) from unmapped_end_to_end
output:
set val(prefix), file("${prefix}_trimmed.fastq") into trimmed_reads
script:
"""
cutsite_trimming --fastq $reads \\
--cutsite ${params.ligation_site} \\
--out ${prefix}_trimmed.fastq
"""
}
process bowtie2_on_trimmed_reads {
tag "$prefix"
input:
set val(prefix), file(reads) from trimmed_reads
val bt2_index from bwt2_index
output:
set val(prefix), file("${prefix}_trimmed.bam") into trimmed_bam
script:
prefix = reads.toString() - ~/(_trimmed)?(\.fq)?(\.fastq)?(\.gz)?$/
"""
bowtie2 --rg-id BMG --rg SM:${prefix} \\
${params.bwt2_opts_trimmed} \\
-p ${task.cpus} \\
-x ${bt2_index} \\
-U ${reads} | samtools view -bS - > ${prefix}_trimmed.bam
"""
}
process merge_mapping_steps{
tag "$bam1 + $bam2"
input:
set val(prefix), file(bam1), file(bam2) from end_to_end_bam.join( trimmed_bam )
output:
set val(sample), file("${prefix}_bwt2merged.bam") into bwt2_merged_bam
script:
sample = prefix.toString() - ~/(_R1)?(_R2)?(_val_1)?(_val_2)?$/
"""
samtools merge -@ ${task.cpus} \\
-f ${prefix}_bwt2merged.bam \\
${bam1} ${bam2}
samtools sort -@ ${task.cpus} -m 800M \\
-n -T /tmp/ \\
-o ${prefix}_bwt2merged.sorted.bam \\
${prefix}_bwt2merged.bam
mv ${prefix}_bwt2merged.sorted.bam ${prefix}_bwt2merged.bam
"""
process combine_mapped_files{
tag "$sample = $r1_prefix + $r2_prefix"
input:
set val(sample), file(aligned_bam) from bwt2_merged_bam.groupTuple()
output:
set val(sample), file("${sample}_bwt2pairs.bam") into paired_bam
script:
r1_bam = aligned_bam[0]
r1_prefix = r1_bam.toString() - ~/_bwt2merged.bam$/
r2_bam = aligned_bam[1]
r2_prefix = r2_bam.toString() - ~/_bwt2merged.bam$/
def opts = ""
opts = params.rm_singleton ? "${opts}" : "--single ${opts}"
opts = params.rm_multi ? "${opts}" : "--multi ${opts}"
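// mergeSAM.py's --single/--multi flags ask it to keep singletons/multi-hits;
// each flag is added only when the corresponding rm_* option is switched off.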
"""
mergeSAM.py -f ${r1_bam} -r ${r2_bam} -o ${sample}_bwt2pairs.bam -q ${params.min_mapq} ${opts}
"""
}
/*
 * STEP 2 - DETECT VALID PAIRS
 */
process get_valid_interaction{
tag "$sample"
input:
set val(sample), file(pe_bam) from paired_bam
val frag_file from res_frag_file
output:
set val(sample), file("*.validPairs") into valid_pairs
set val(sample), file("*.validPairs") into valid_pairs_4cool
def opts = ""
if (params.min_cis_dist != "") opts="${opts} -d ${params.min_cis_dist}"
if (params.min_insert_size != "") opts="${opts} -s ${params.min_insert_size}"
if (params.max_insert_size != "") opts="${opts} -l ${params.max_insert_size}"
if (params.min_restriction_fragment_size != "") opts="${opts} -t ${params.min_restriction_fragment_size}"
if (params.max_restriction_fragment_size != "") opts="${opts} -m ${params.max_restriction_fragment_size}"
"""
mapped_2hic_fragments.py -f ${frag_file} -r ${pe_bam} ${opts}
"""
}
/*
* STEP3 - BUILD MATRIX
*/
process build_contact_maps{
input:
set val(sample), file(vpairs), val(mres) from valid_pairs.combine(map_res)
val chrsize from chr_size
output:
file("*.matrix") into raw_maps
script:
"""
build_matrix --matrix-format upper --binsize ${mres} --chrsizes ${chrsize} --ifile ${vpairs} --oprefix ${sample}_${mres}
"""
}
/*
* STEP 4 - NORMALIZE MATRIX
*/
process run_iced{
tag "$rmaps"
input:
file(rmaps) from raw_maps
output:
file("*iced.matrix") into iced_maps
script:
prefix = rmaps.toString() - ~/(\.matrix)?$/
"""
ice --filter_low_counts_perc ${params.ice_filter_low_count_perc} \
--results_filename ${prefix}_iced.matrix \
--filter_high_counts_perc ${params.ice_filter_high_count_perc} \
--max_iter ${params.ice_max_iter} --eps ${params.ice_eps} --remove-all-zeros-loci --output-bias 1 --verbose 1 ${rmaps}
"""
}
/*
 * STEP 5 - COOLER FILE
 */
/*
process generate_cool{
tag "$sample"
input:
set val(sample), file(vpairs) from valid_pairs_4cool
val chrsize from chr_size
output:
file("*mcool") into cool_maps
script:
"""
hicpro2higlass.sh -i $vpairs -r 5000 -c ${chrsize} -n
"""
}
*/
process multiqc {
publishDir "${params.outdir}/MultiQC", mode: 'copy'
input:
file multiqc_config from ch_multiqc_config
// TODO nf-core: Add in log files from your new processes for MultiQC to find!
//file ('fastqc/*') from fastqc_results.collect().ifEmpty([]) // no fastqc process is defined yet
file ('software_versions/*') from software_versions_yaml
file workflow_summary from create_workflow_summary(summary)
output:
file "*multiqc_report.html" into multiqc_report
file "*_data"
script:
rtitle = custom_runName ? "--title \"$custom_runName\"" : ''
rfilename = custom_runName ? "--filename " + custom_runName.replaceAll('\\W','_').replaceAll('_+','_') + "_multiqc_report" : ''
// TODO nf-core: Specify which MultiQC modules to use with -m for a faster run time
"""
multiqc -f $rtitle $rfilename --config $multiqc_config .
"""
}
process output_documentation {
publishDir "${params.outdir}/Documentation", mode: 'copy'
input:
file output_docs from ch_output_docs
output:
file "results_description.html"
script:
"""
markdown_to_html.r $output_docs results_description.html
"""
}
/*
* Completion e-mail notification
*/
workflow.onComplete {
// Set up the e-mail variables
def subject = "[nf-core/hic] Successful: $workflow.runName"
if(!workflow.success){
subject = "[nf-core/hic] FAILED: $workflow.runName"
}
def email_fields = [:]
email_fields['version'] = workflow.manifest.version
email_fields['runName'] = custom_runName ?: workflow.runName
email_fields['success'] = workflow.success
email_fields['dateComplete'] = workflow.complete
email_fields['duration'] = workflow.duration
email_fields['exitStatus'] = workflow.exitStatus
email_fields['errorMessage'] = (workflow.errorMessage ?: 'None')
email_fields['errorReport'] = (workflow.errorReport ?: 'None')
email_fields['commandLine'] = workflow.commandLine
email_fields['projectDir'] = workflow.projectDir
email_fields['summary'] = summary
email_fields['summary']['Date Started'] = workflow.start
email_fields['summary']['Date Completed'] = workflow.complete
email_fields['summary']['Pipeline script file path'] = workflow.scriptFile
email_fields['summary']['Pipeline script hash ID'] = workflow.scriptId
if(workflow.repository) email_fields['summary']['Pipeline repository Git URL'] = workflow.repository
if(workflow.commitId) email_fields['summary']['Pipeline repository Git Commit'] = workflow.commitId
if(workflow.revision) email_fields['summary']['Pipeline Git branch/tag'] = workflow.revision
email_fields['summary']['Nextflow Version'] = workflow.nextflow.version
email_fields['summary']['Nextflow Build'] = workflow.nextflow.build
email_fields['summary']['Nextflow Compile Timestamp'] = workflow.nextflow.timestamp
// Render the TXT template
def engine = new groovy.text.GStringTemplateEngine()
def tf = new File("$baseDir/assets/email_template.txt")
def txt_template = engine.createTemplate(tf).make(email_fields)
def email_txt = txt_template.toString()
// Render the HTML template
def hf = new File("$baseDir/assets/email_template.html")
def html_template = engine.createTemplate(hf).make(email_fields)
def email_html = html_template.toString()
// Render the sendmail template
def smail_fields = [ email: params.email, subject: subject, email_txt: email_txt, email_html: email_html, baseDir: "$baseDir" ]
def sf = new File("$baseDir/assets/sendmail_template.txt")
def sendmail_template = engine.createTemplate(sf).make(smail_fields)
def sendmail_html = sendmail_template.toString()
// Send the HTML e-mail
if (params.email) {
try {
if( params.plaintext_email ){ throw new GroovyException('Send plaintext e-mail, not HTML') }
// Try to send HTML e-mail using sendmail
[ 'sendmail', '-t' ].execute() << sendmail_html
log.info "[nf-core/hic] Sent summary e-mail to $params.email (sendmail)"
} catch (all) {
// Catch failures and try with plaintext
[ 'mail', '-s', subject, params.email ].execute() << email_txt
log.info "[nf-core/hic] Sent summary e-mail to $params.email (mail)"
}
}
// Write summary e-mail HTML to a file
def output_d = new File( "${params.outdir}/Documentation/" )
if( !output_d.exists() ) {
output_d.mkdirs()
}
def output_hf = new File( output_d, "pipeline_report.html" )
output_hf.withWriter { w -> w << email_html }
def output_tf = new File( output_d, "pipeline_report.txt" )
output_tf.withWriter { w -> w << email_txt }
log.info "[nf-core/hic] Pipeline Complete"
}