Part 7: Reorganize modules in subworkflow
In this part, we will create a local subworkflow called PREPROC_DIFF, which will include the first two modules used in your pipeline: DENOISING_MPPCA and RECONST_DTIMETRICS.
1. Create a local subworkflow structure
Create the following local directory structure, including a main.nf file inside the subworkflow folder.
mkdir -p /workspaces/nf-neuro-tutorial/subworkflows/local/preproc_diff
touch /workspaces/nf-neuro-tutorial/subworkflows/local/preproc_diff/main.nf
You should get this structure:

nf-neuro-tutorial/
├── ..
├── subworkflows/
│   └── local/
│       └── preproc_diff/
│           └── main.nf
└── …
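The commands in this step assume that the repository lives in /workspaces/nf-neuro-tutorial. As a minimal sketch for other locations, the snippet below reads the repository root from a PROJECT_ROOT variable, creates the same folder and file, and checks the result. PROJECT_ROOT and SUBWF_DIR are names introduced here for illustration and are not part of the tutorial; PROJECT_ROOT defaults to the current directory.

```shell
# Assumption: run from the repository root, or set PROJECT_ROOT explicitly
# (in the tutorial container it would be /workspaces/nf-neuro-tutorial).
PROJECT_ROOT="${PROJECT_ROOT:-$PWD}"
SUBWF_DIR="$PROJECT_ROOT/subworkflows/local/preproc_diff"

# Create the folder, then an empty main.nf inside it.
mkdir -p "$SUBWF_DIR"
touch "$SUBWF_DIR/main.nf"

# Confirm that the file sits where the include statement will look for it.
if [ -f "$SUBWF_DIR/main.nf" ]; then
    echo "OK: $SUBWF_DIR/main.nf"
else
    echo "Missing: $SUBWF_DIR/main.nf" >&2
fi
```

Both mkdir -p and touch are safe to run again: they leave an existing folder and file in place.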
2. Write the subworkflow module
A Nextflow subworkflow is a file containing at least two modules. The structure of a subworkflow should look like this:
include { MODULE1 } from '../../../path/module1/main'
include { MODULE2 } from '../../../path/module2/main'

workflow SUBWORKFLOW_NAME {

    take:
        input_channel           // channel: [ val(meta), input1, input2, input3 ]

    main:

        reorganize_input_channel = ()

        // ** description MODULE 1 ** //
        if (params.run_module1) {

            input_channel_module1 = ()

            MODULE1 ( input_channel_module1 )

            // Output channel
            output_option = MODULE1.out.output_name
                .join(reorganize_input_channel.outname_2)
        }

        // ** description MODULE 2 ** //
        input_channel_module2 = ()

        MODULE2( input_channel_module2 )

    emit:
        output_module1_1 = MODULE1.out.output1      // channel: [ val(meta), file ]
        output_module1_2 = output_option            // channel: [ val(meta), file1, file2 ]
        output_module2_1 = MODULE2.out.output1      // channel: [ val(meta), file ]
        output_module2_2 = MODULE2.out.output2      // channel: [ val(meta), file ]
}
This workflow demonstrates the use of Nextflow DSL2 features such as module inclusion, workflow definition with input and output channels, conditional process execution, and channel manipulation.
The purpose of this local subworkflow is to preprocess DWI data, optionally apply denoising, and compute DTI-derived metrics.
Based on the “Example of module”, create a local subworkflow that integrates the following two modules:
- Import required modules

  Include denoising_mppca and reconst_dtimetrics in your subworkflow.

  include { RECONST_DTIMETRICS } from '../../../modules/nf-neuro/reconst/dtimetrics/main'
  include { DENOISING_MPPCA } from '../../../modules/nf-neuro/denoising/mppca/main'

- Rename your workflow: PREPROC_DIFF

- Define input channels

  Specify the necessary input channels for the subworkflow. In this case, you need the dwi, bval and bvec files.

  take:
      ch_dwi          // channel: [ val(meta), dwi, bval, bvec ]

- Implement main:

  1- Copy and paste the relevant denoising and DTI metrics sections into the workflow.
  2- Modify the workflow structure so that denoising runs only if the option is enabled, using the parameter name preproc_dwi_run_denoising.

  // Split the input once, outside the condition, so that the raw DWI and
  // the bval/bvec files can be emitted even when denoising is disabled.
  ch_dwi_bvalbvec = ch_dwi
      .multiMap { meta, dwi, bval, bvec ->
          dwi:        [ meta, dwi ]
          bvs_files:  [ meta, bval, bvec ]
      }

  // Stays empty when denoising is disabled.
  ch_dwi_denoised = Channel.empty()

  // ** Denoise DWI ** //
  if (params.preproc_dwi_run_denoising) {

      ch_denoise_dwi = ch_dwi_bvalbvec.dwi
          .map{ it + [[]] }

      DENOISING_MPPCA ( ch_denoise_dwi )
      ch_dwi_denoised = DENOISING_MPPCA.out.image

      // Fetch specific output
      ch_dwi = ch_dwi_denoised
          .join(ch_dwi_bvalbvec.bvs_files)
  }

  // Input DTI update with DWI denoised output
  input_dti = ch_dwi.map{ it + [[]] }

  // DTI-derived metrics
  RECONST_DTIMETRICS( input_dti )

- Define output channels

  Emit relevant output files, including original and processed dMRI data and DTI metrics.

  emit:
      dwi          = ch_dwi_bvalbvec.dwi          // channel: [ val(meta), dwi-raw ]
      dwi_denoised = ch_dwi_denoised              // channel: [ val(meta), dwi-after-mppca ]
      bvs_files    = ch_dwi_bvalbvec.bvs_files    // channel: [ val(meta), bval, bvec ]
      fa           = RECONST_DTIMETRICS.out.fa    // channel: [ val(meta), fa ]
      md           = RECONST_DTIMETRICS.out.md    // channel: [ val(meta), md ]
include { RECONST_DTIMETRICS } from '../../../modules/nf-neuro/reconst/dtimetrics/main'
include { DENOISING_MPPCA } from '../../../modules/nf-neuro/denoising/mppca/main'

workflow PREPROC_DIFF {

    take:
        ch_dwi          // channel: [ val(meta), dwi, bval, bvec ]

    main:

        // Split the input once, outside the condition, so that the raw DWI and
        // the bval/bvec files can be emitted even when denoising is disabled.
        ch_dwi_bvalbvec = ch_dwi
            .multiMap { meta, dwi, bval, bvec ->
                dwi:        [ meta, dwi ]
                bvs_files:  [ meta, bval, bvec ]
            }

        // Stays empty when denoising is disabled.
        ch_dwi_denoised = Channel.empty()

        // ** Denoise DWI ** //
        if (params.preproc_dwi_run_denoising) {

            ch_denoise_dwi = ch_dwi_bvalbvec.dwi
                .map{ it + [[]] }

            DENOISING_MPPCA ( ch_denoise_dwi )
            ch_dwi_denoised = DENOISING_MPPCA.out.image

            // Fetch specific output
            ch_dwi = ch_dwi_denoised
                .join(ch_dwi_bvalbvec.bvs_files)
        }

        // Input DTI update with DWI denoised output
        input_dti = ch_dwi.map{ it + [[]] }

        // DTI-derived metrics
        RECONST_DTIMETRICS( input_dti )

    emit:
        dwi          = ch_dwi_bvalbvec.dwi          // channel: [ val(meta), dwi-raw ]
        dwi_denoised = ch_dwi_denoised              // channel: [ val(meta), dwi-after-mppca ]
        bvs_files    = ch_dwi_bvalbvec.bvs_files    // channel: [ val(meta), bval, bvec ]
        fa           = RECONST_DTIMETRICS.out.fa    // channel: [ val(meta), fa ]
        md           = RECONST_DTIMETRICS.out.md    // channel: [ val(meta), md ]
}
3. Prepare the input structure for the subworkflow and include it in your main.nf
You can now include and bind your local subworkflow to the workflow, as shown in the previous steps:
// Add this line at the top of main.nf
include { PREPROC_DIFF } from './subworkflows/local/preproc_diff/main'

// Replace the section corresponding to the subworkflow by calling
// PREPROC_DIFF() in main.nf
PREPROC_DIFF()
As we have defined the input of the subworkflow to take the DWI image and the bval and bvec files, you can directly provide the input data to the workflow.
PREPROC_DIFF( inputs.dwi )
Last but not least, don’t forget to adapt the input channels for the module computing statistics within specific regions-of-interest!
input_extract_metric = PREPROC_T1.out.image_bet
    .join(PREPROC_DIFF.out.fa)
    .map{ it }
4. Configure your local subworkflow
Finally, all that’s left is to add the parameters used in the subworkflow to the nextflow.config file.

// ** subworkflow PREPROC_DIFF **
params.preproc_dwi_run_denoising = true
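One way to check that no parameter was forgotten is to compare the params.<name> references in the subworkflow with the assignments in nextflow.config. The sketch below does this with grep. The function name missing_params is hypothetical, and the check assumes the flat "params.<name> = value" style used in this tutorial; it would not recognise parameters declared inside a params { } block.

```shell
# Hypothetical helper: print every params.<name> used in a Nextflow script
# that has no "params.<name> = ..." assignment in the given config file.
missing_params() {
    script_file="$1"
    config_file="$2"
    grep -o 'params\.[A-Za-z0-9_]*' "$script_file" | sort -u |
    while read -r param; do
        if ! grep -Eq "^[[:space:]]*${param}[[:space:]]*=" "$config_file"; then
            echo "$param"
        fi
    done
}

# Run from the root of the tutorial repository; prints nothing when every
# parameter used in the subworkflow is declared in nextflow.config.
if [ -f subworkflows/local/preproc_diff/main.nf ] && [ -f nextflow.config ]; then
    missing_params subworkflows/local/preproc_diff/main.nf nextflow.config
fi
```

With the configuration shown in this step, the call should print nothing, since preproc_dwi_run_denoising is the only parameter the subworkflow reads.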
5. Verify your files
You now have a workflow with one nf-neuro subworkflow and your own local subworkflow in your Nextflow pipeline!
main.nf

#!/usr/bin/env nextflow

include { PREPROC_T1 } from './subworkflows/nf-neuro/preproc_t1/main'
include { STATS_METRICSINROI } from './modules/local/stats/metricsinrois/main'
include { PREPROC_DIFF } from './subworkflows/local/preproc_diff/main'

workflow get_data {
    main:
        if ( !params.input ) {
            log.info "You must provide an input directory containing all images using:"
            log.info ""
            log.info "    --input=/path/to/[input]   Input directory containing your subjects"
            log.info "                        |"
            log.info "                        ├-- S1"
            log.info "                        |    ├-- *dwi.nii.gz"
            log.info "                        |    ├-- *dwi.bval"
            log.info "                        |    ├-- *dwi.bvec"
            log.info "                        |    └-- *t1.nii.gz"
            log.info "                        └-- S2"
            log.info "                             ├-- *dwi.nii.gz"
            log.info "                             ├-- *bval"
            log.info "                             ├-- *bvec"
            log.info "                             └-- *t1.nii.gz"
            log.info ""
            error "Please resubmit your command with the previous file structure."
        }

        input = file(params.input)

        // ** Loading DWI files. ** //
        dwi_channel = Channel
            .fromFilePairs("$input/**/**/dwi/*dwi.{nii.gz,bval,bvec}", size: 3, flat: true) {
                // Set the subject filename as subjectID + '_' + session.
                it.parent.parent.parent.name + "_" + it.parent.parent.name
            }
            // Reordering the inputs.
            .map{ sid, bvals, bvecs, dwi -> [ [id: sid], dwi, bvals, bvecs ] }

        // ** Loading T1 file. ** //
        t1_channel = Channel
            .fromFilePairs("$input/**/**/anat/*T1w.nii.gz", size: 1, flat: true) {
                // Set the subject filename as subjectID + '_' + session.
                it.parent.parent.parent.name + "_" + it.parent.parent.name
            }
            .map{ sid, t1 -> [ [id: sid], t1 ] }

    emit:
        dwi = dwi_channel
        anat = t1_channel
}

workflow {
    inputs = get_data()

    // Processing DWI
    PREPROC_DIFF( inputs.dwi )

    // Preprocessing T1 images
    //inputs.anat.view()

    PREPROC_T1(
        inputs.anat,
        Channel.empty(),
        Channel.empty(),
        Channel.empty(),
        Channel.empty(),
        Channel.empty(),
        Channel.empty()
    )

    // Extract FA value
    input_extract_metric = PREPROC_T1.out.image_bet
        .join(PREPROC_DIFF.out.fa)
        .map{ it }

    STATS_METRICSINROI( input_extract_metric )
}
nextflow.config

profiles {
    docker {
        docker.enabled = true
        conda.enabled = false
        singularity.enabled = false
        podman.enabled = false
        shifter.enabled = false
        charliecloud.enabled = false
        apptainer.enabled = false
        docker.runOptions = '-u $(id -u):$(id -g)'
    }
}

manifest {
    name = 'scilus/nf-neuro-tutorial'
    description = """nf-neuro-tutorial is a Nextflow pipeline for processing neuroimaging data."""
    version = '0.1dev'
}

params.input = false
params.output = 'result'

// ** subworkflow PREPROC_DIFF **
params.preproc_dwi_run_denoising = true

// ** Subworkflow PREPROC T1 **
params.preproc_t1_run_denoising = true
params.preproc_t1_run_N4 = false
params.preproc_t1_run_resampling = false
params.preproc_t1_run_ants_bet = false
params.preproc_t1_run_synthbet = true
params.preproc_t1_run_crop = false

process {

    publishDir = { "${params.output}/$meta.id/${task.process.replaceAll(':', '-')}" }

    withName: "DENOISING_MPPCA" {
        ext.extent = 3
    }

    withName: "BETCROP_SYNTHBET" {
        memory = "4G"
        ext.nocsf = false
    }

    withName: "RECONST_DTIMETRICS" {
        ext.ad = false
        ext.evecs = false
        ext.evals = false
        ext.fa = true
        ext.ga = false
        ext.rgb = false
        ext.md = true
        ext.mode = false
        ext.norm = false
        ext.rd = false
        ext.tensor = false
        ext.nonphysical = false
        ext.pulsation = false
        ext.residual = false
        ext.b0_thr_extract_b0 = 10
        ext.dwi_shell_tolerance = 50
        ext.max_dti_shell_value = 1200
        ext.run_qc = false
    }

    withName: "STATS_METRICSINROI" {
        ext.bin = true
        ext.normalize_weights = false
    }
}
6. Run nextflow
You can now run Nextflow:
nextflow run main.nf --input data -profile docker -resume
N E X T F L O W ~ version 24.10.4
Launching `main.nf` [mighty_bose] DSL2 - revision: 7d89ad250c
executor > local (5)
[6d/15e8f5] PREPROC_DIFF:DENOISING_MPPCA (sub-003_ses-01) [100%] 1 of 1 ✔
[18/41c27f] PREPROC_DIFF:RECONST_DTIMETRICS (sub-003_ses-01) [100%] 1 of 1 ✔
[de/b69a00] PREPROC_T1:DENOISING_NLMEANS (sub-003_ses-01) [100%] 1 of 1 ✔
[e0/adaf76] PREPROC_T1:BETCROP_SYNTHBET (sub-003_ses-01) [100%] 1 of 1 ✔
[8f/41d4e5] STATS_METRICSINROI (sub-003_ses-01) [100%] 1 of 1 ✔
Completed at: 16-Mar-2025 20:00:54
Duration : 2m 4s
CPU hours : (a few seconds)
Succeeded : 5
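The process names in this log, combined with the publishDir setting in nextflow.config, indicate where each result should be published: params.output, then meta.id, then the process name with ':' replaced by '-'. The sketch below only mirrors that string substitution in the shell. publish_path is a hypothetical helper name, and the values are copied from this tutorial (output folder result, sample sub-003_ses-01); it does not query Nextflow itself.

```shell
# Values taken from this tutorial: params.output and the sample in the log.
output_dir="result"
subject_id="sub-003_ses-01"

# Mirror "${params.output}/$meta.id/${task.process.replaceAll(':', '-')}".
publish_path() {
    process_name="$1"
    printf '%s/%s/%s\n' "$output_dir" "$subject_id" \
        "$(printf '%s' "$process_name" | tr ':' '-')"
}

# Process names exactly as they appear in the run log.
for process in \
    "PREPROC_DIFF:DENOISING_MPPCA" \
    "PREPROC_DIFF:RECONST_DTIMETRICS" \
    "PREPROC_T1:DENOISING_NLMEANS" \
    "PREPROC_T1:BETCROP_SYNTHBET" \
    "STATS_METRICSINROI"
do
    publish_path "$process"
done
```

The first line printed is result/sub-003_ses-01/PREPROC_DIFF-DENOISING_MPPCA, so the subworkflow name becomes a prefix of each output folder, which makes it easy to tell which subworkflow produced a given file.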