Oxford Nanopore is the only sequencing platform to offer the possibility of real-time analysis during a sequencing experiment. This functionality is complemented nicely by unique features of the Nextflow workflow manager. Nextflow’s combination of data streaming and recursive operations allows us to write continuous pipelines that output progressive analysis as data becomes available. This makes it possible to gain insights in real time using many bioinformatics tools.
There are many possible applications of progressive workflows. In this post we will explain how we implemented the real-time ability in a forthcoming release of our wf-metagenomics workflow. We chose this workflow as there are many use cases where the ability to accumulate knowledge as data become available allows decisions to be made in real time, before sequencing has completed. Clearly this is not possible with sequencing technologies that are inherently batch processes.
If you have worked with Nextflow you are probably aware of its dataflow paradigm: data files move around in Channels. These channels can be manipulated as streams of data by Operators and Processes. In terms of the functionality of Nextflow, this paradigm is really just a method of programming workflows; in many cases it doesn’t grant users of Nextflow any new abilities to create workflows that couldn’t be created with other tools. That is, until we leverage the power of watchPath().
The channel factory method watchPath() allows us to extend the data streaming paradigm from an abstraction used in Nextflow programming to a literal input stream whose length is not known in advance. We can start our workflow without any data present, wait for inputs, and process them as they become available.
The watchPath() method allows Nextflow to watch an input directory or set of directories for files being created, modified or deleted and process them. In wf-metagenomics we watch the input directories for new .fastq files. This creates a Nextflow Channel without a terminator: our workflow will continue to execute indefinitely, processing inputs into outputs. To stop a workflow we need to inject a signal into the input Channel. This can be achieved with watchPath().until(), which allows specification of a stopping condition. A useful way to stop a workflow from outside the Nextflow process is to place a specially named file in the input directory; for example, we can write:
Channel.watchPath('/some/input/path/*.fastq').until { file->file.name == 'STOP.fastq' }
to signal Nextflow to stop waiting for data. Additionally, if desired, it’s possible to write the above stop signal from the workflow itself based on workflow results. This is simple but effective; however, in the future we hope that Nextflow will incorporate nicer methods to signal that an input channel should close.
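To make the sentinel-file idea concrete outside of Nextflow, here is a minimal Python sketch of the same pattern. It is not how Nextflow implements watchPath(); the function name watch_until, the polling approach and the poll_interval parameter are our own assumptions for illustration. One deliberate choice in this sketch: files that arrived alongside the stop file are still emitted before the stream closes.

```python
import time
from pathlib import Path


def watch_until(directory, pattern="*.fastq", stop_name="STOP.fastq", poll_interval=1.0):
    """Yield new files matching `pattern` until a file called `stop_name` appears.

    Hypothetical helper: a simple polling loop, not Nextflow's implementation.
    """
    directory = Path(directory)
    seen = set()
    while True:
        current = {p.name for p in directory.glob(pattern)}
        stop_requested = stop_name in current
        # Emit files we have not seen before, excluding the sentinel itself.
        for name in sorted(current - seen - {stop_name}):
            seen.add(name)
            yield directory / name
        if stop_requested:
            return  # close the stream: no more inputs will be processed
        time.sleep(poll_interval)
```

A consumer can simply iterate, for example `for fastq in watch_until("/some/input/path"): ...`, and the loop ends once STOP.fastq is written to the directory.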
With watchPath() and Nextflow processes we can create an output stream with items in a one-to-one correspondence with the input stream. In the context of wf-metagenomics this means creating batches of per-read classifications for each input .fastq file produced in real time by an Oxford Nanopore Technologies’ sequencing device. More usefully, however, we’d like to accumulate workflow outputs into an updating stream summarising all historic data. In the language of reactive programming, such an operation is called a scan.
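For readers unfamiliar with the term, a scan can be sketched in a few lines of Python. This is only an illustration of the concept: the scan helper, the taxon names and the counts below are made up for the example and are not taken from wf-metagenomics.

```python
from collections import Counter


def scan(merge, stream, initial):
    """Yield the running state after merging each item of `stream` into it."""
    state = initial
    for item in stream:
        state = merge(state, item)
        yield state


# Hypothetical per-file classification results: taxon name -> read count.
per_file_counts = [
    Counter({"E. coli": 3, "S. aureus": 1}),
    Counter({"E. coli": 2}),
    Counter({"S. aureus": 4, "K. pneumoniae": 1}),
]

# One output per input, each summarising everything seen so far.
running_totals = list(scan(lambda state, new: state + new, per_file_counts, Counter()))
for i, totals in enumerate(running_totals, start=1):
    print(f"after file {i}: {dict(totals)}")
```

The key property is that the output stream has the same length as the input stream, but each output item depends on all inputs up to that point rather than on a single input.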
We tried in vain to implement such a pattern in Nextflow before realising it’s not possible to encode recursive operations such as a scan. So we reached out to Paolo at Seqera Labs and asked if this pattern could be implemented. After understanding why we would want to implement such a beast, it didn’t take Paolo long to provide us with the scan() and recurse() process methods.
The recurse() operator isn’t terribly interesting for our use cases. Applying the operator to a process allows us to pass a single value to a process and iteratively run the process on its own output until a condition is met. The condition could be simply to apply the recursion a set number of times, or a more complex function of the data.
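The behaviour described for recurse() can be modelled in plain Python as follows. This is a conceptual sketch only, not the Nextflow API: the recurse function, its stop predicate and the max_iterations cap are names we have chosen for illustration.

```python
def recurse(step, value, stop, max_iterations=100):
    """Feed `step` its own output, yielding each result, until `stop(result)` is true.

    `max_iterations` caps the number of rounds, covering the
    "apply the recursion a set number of times" case.
    """
    for _ in range(max_iterations):
        value = step(value)
        yield value
        if stop(value):
            return


# Data-dependent stopping condition: halve until the value drops to 10 or below.
halvings = list(recurse(lambda x: x // 2, 100, stop=lambda x: x <= 10))

# Fixed number of iterations: the predicate never fires, the cap does.
three_steps = list(recurse(lambda x: x + 1, 0, stop=lambda x: False, max_iterations=3))
```

Note that there is only ever one starting value here; nothing new enters the loop after the first iteration, which is what distinguishes this from a scan.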
As discussed above, the scan() operator is what we want: it allows us to process not a single input in a recursive fashion but an input stream, merging new input items with previous results. For the case of wf-metagenomics this means that at each iteration we can take a new .fastq file containing reads, classify those reads, and merge the results with the summary accumulated so far.
As it turns out, in order to allow additional parallelism at the classification step, we break apart the above into a distinct classification process and a second aggregating scan process.
As a model of the above, the code below processes an input data stream to create a cumulative output stream (full code with additional commentary). In this toy example the metagenomic classification is replaced with a simple file summary, and our cumulative output is a growing list of these summaries formatted as JSON.
nextflow.enable.dsl=2
nextflow.preview.recursion=true

import nextflow.util.BlankSeparatedList

/* Classify the input data stream. */
process classify {
    input:
        path "input.txt"
    output:
        path "summary.json"
    script:
    """
    #!/usr/bin/env python3
    # just count the characters in the file in this example
    import json
    data = {'data': [len(open("input.txt", "r").read())]}
    json.dump(data, open("summary.json", "w"))
    """
}

/* Accumulate results from `classify` process */
process accumulate {
    input:
        path data
    output:
        path "output_${task.index}.json"
    script:
    output = "output_${task.index}.json"
    // on the first iteration there is no previous state, only a single file
    if (data instanceof BlankSeparatedList) {
        new_input = data.getAt(0); state = data.getAt(-1)
    } else {
        new_input = "null"; state = data
    }
    """
    #!/usr/bin/env python3
    import json
    state_data = json.load(open("${state}"))
    if "${new_input}" != "null":
        new_data = json.load(open("${new_input}"))
        state_data["data"].extend(new_data["data"])
    json.dump(state_data, open("${output}", "w"))
    """
}

workflow {
    // watch a directory, classify inputs, aggregate results progressively
    data = channel.watchPath("inputs/*.txt")
    classifications = classify(data)
    cumulative = accumulate.scan(classifications)
    cumulative.view(it -> it.text)  // show contents of files
}
The classify process here is responsible for taking the input data stream, performing calculations on each item, and creating output that can be readily aggregated. The accumulate process takes the output of classify and recursively aggregates the data; on each iteration it outputs a new summary of the data observed thus far.
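The division of labour between the two processes can also be modelled in plain Python, which may help when reasoning about what each stage is allowed to do. The sketch below is an analogy under our own assumptions, not part of the workflow: classification is an independent per-item function that can run in parallel, while accumulation is a strictly ordered fold over its results. For simplicity it collects all summaries before accumulating, whereas the real workflow processes them as they arrive.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import accumulate


def classify(text):
    """Per-item step: independent of all other items, so safe to parallelise."""
    return {"data": [len(text)]}


def merge(state, new):
    """Aggregation step: combines the previous state with one new summary."""
    return {"data": state["data"] + new["data"]}


inputs = ["abc", "de", "f"]  # stand-ins for the contents of three input files

# Classification can fan out across workers; map() keeps results in input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    summaries = list(pool.map(classify, inputs))

# With no initial value, accumulate() emits the first summary unchanged,
# much as the first iteration of a scan has no previous state to merge.
cumulative = list(accumulate(summaries, merge))
for snapshot in cumulative:
    print(snapshot)
```

Because each summary has the same shape as the accumulated state, the first summary can serve directly as the initial state, which is why no separate "empty" state is needed.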
In implementing streaming workflows with watchPath() and scan() there’s at least one important “gotcha” of which to be aware. We must ensure that there are sufficient compute resources available to service all steps of the workflow. It’s no good performing the data processing in real time as data arrives if our reporting and output processes become queued behind the heavy lifting. We want our output to be updating regularly, and so our reporting processes need to be executing regularly.
To achieve this, some optimization of input batching is required, along with the use of one little Nextflow feature buried in the documentation.
Within wf-metagenomics we have implemented the --batch_size parameter. By setting this to 1, each input file is processed independently, giving the maximum possible granularity in how frequently (in principle) output reports can be updated. However, this can create very many small compute jobs, and so larger batch sizes, coalescing some compute jobs, can be beneficial.
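The trade-off is easy to see with a small batching helper. The Python sketch below only illustrates the idea of coalescing a stream into fixed-size groups; it is not the wf-metagenomics implementation, and the name batched and the file names are assumptions for the example.

```python
from itertools import islice


def batched(stream, batch_size):
    """Group items from `stream` into lists of up to `batch_size` items."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch  # the final batch may be smaller than batch_size


files = ["a.fastq", "b.fastq", "c.fastq", "d.fastq", "e.fastq"]
per_file_jobs = list(batched(files, 1))   # five small jobs, five report updates
coalesced_jobs = list(batched(files, 2))  # three larger jobs, three report updates
```

Larger batches mean fewer jobs to schedule, at the cost of waiting for a batch to fill before anything downstream can update.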
In order to avoid the problem highlighted above, where our report processes can become starved of resources by very many classification tasks, we also control the flow by making use of the maxForks process directive. In a non-real-time workflow we would set the total executor CPU resource to, say, 16 and the process cpus directive to 2, with the effect that up to 8 instances of a process could run simultaneously. In a real-time workflow we want to further ensure some resources are kept available for the summary processes in order to produce updating outputs. In wf-metagenomics we constrain maxForks such that the total CPU resource consumed by compute steps is less than that available to the executor.
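The arithmetic behind that constraint can be written down as a small helper. This is a sketch of the reasoning only: the function max_forks and the reserved_cpus parameter are our own illustrative names, and the number of CPUs to hold back is an assumption rather than the value used in wf-metagenomics.

```python
def max_forks(executor_cpus, process_cpus, reserved_cpus):
    """Largest number of concurrent compute tasks that leaves `reserved_cpus` free."""
    usable = executor_cpus - reserved_cpus
    if usable < process_cpus:
        raise ValueError("not enough CPUs to run even one compute task")
    return usable // process_cpus


# Batch workflow: nothing held back, 16 CPUs / 2 per task = 8 concurrent tasks.
batch_limit = max_forks(executor_cpus=16, process_cpus=2, reserved_cpus=0)

# Real-time workflow: hold back 2 CPUs (an assumed figure) for reporting steps.
realtime_limit = max_forks(executor_cpus=16, process_cpus=2, reserved_cpus=2)
```

With these example numbers the compute step is capped at 7 concurrent tasks (14 CPUs), so a reporting task always has somewhere to run.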
With everything in place for real-time streaming workflows in Nextflow we had one other problem to solve: metagenomic classification.
Of course there are excellent community tools out there already for performing this task. Our chosen tool is Kraken2. There is however one issue with using the Kraken2 command-line tool in a real-time setting. We’d like to be able to perform classifications against large databases. Kraken2 databases can be tens of gigabytes in size and therefore take a significant amount of time to load into computer memory.
For real-time analysis we wish to avoid loading the database for every classification task streamed through the workflow. We therefore implemented a server-client architecture around the algorithms of Kraken2. The Kraken2-server package is available on conda and very easy to use. It allows one-time loading of databases into persistent memory to be queried at a later time, and can also expose the databases for use in low-resource settings as the databases can be queried remotely.
Within wf-metagenomics a Kraken2 server is initiated within a Nextflow process and left to run indefinitely. Our classification process uses the Kraken2 client to send incoming sequencing data to the server without having to load the Kraken2 database for each input. The output of this process is the classification results, which are fed to a downstream process that utilises Nextflow’s scan to provide a continually updating classification summary for all data received thus far.
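The benefit of the server-client split is simply that the expensive load happens once rather than once per task. The toy Python sketch below shows that pattern in-process; it is not the Kraken2-server API, and the class ClassificationServer, the tiny dictionary "database" and the read sequences are all invented for illustration.

```python
class ClassificationServer:
    """Loads a (pretend) database once, then answers any number of queries."""

    def __init__(self, load_database):
        self.database = load_database()  # the expensive step, performed once

    def classify(self, reads):
        return [self.database.get(read, "unclassified") for read in reads]


load_log = []  # records every time the database is loaded


def load_database():
    """Stand-in for reading a multi-gigabyte database into memory."""
    load_log.append("loaded")
    return {"ACGT": "taxon_A", "TTGA": "taxon_B"}


server = ClassificationServer(load_database)

# Three classification tasks arrive over time; none of them reloads the database.
batches = [["ACGT", "GGGG"], ["TTGA"], ["ACGT"]]
results = [server.classify(batch) for batch in batches]
print(len(load_log), results)
```

In the real workflow the server and clients are separate programs, so the same long-lived server can serve every classification task for the duration of the run.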
Most sequencing platforms operate in a batch fashion: a sequencing run is started, the user waits until the run is complete and their sequencing data is output. Oxford Nanopore Technologies’ sequencing devices are unique in their ability to output sequencing data continuously throughout an experiment as soon as reads are complete (and with adaptive sampling, even before reads are complete!). This opens a world of data analysis possibilities, enhanced review of data by users, and opportunities for automated and manual feedback with the sequencer to adapt the experiment on-the-fly for maximum scientific value.
In this post we have shown how the reactive dataflow paradigm, used in Nextflow to write explicit data processing pipelines, naturally leads to the possibility of processing data in real time. With Nextflow we can perform on-line analyses within a standard bioinformatics workflow manager, using standard bioinformatics tooling, without having to resort to writing bespoke data streaming pipelines. Any suitably written Nextflow workflow can be transformed from a batch processing pipeline to a real-time workflow with minimal changes.
We are particularly excited by the prospect of using sequencing device APIs to amend sequencing parameters dynamically as a result of ongoing data analysis orchestrated from within Nextflow. At a simple level this could be changing the targets of an adaptive sampling experiment to suppress sequencing of common taxa, or more elaborately feeding back the results of an ongoing progressive genome assembly to search for reads overlapping contig ends or poorly resolved graph structures.
Of course we are also intrigued to see what the community can create with the combination of real-time data analysis provided through Nanopore sequencing and Nextflow.