Building a Data Processing Pipeline for NGS: Part 2

Introduction

In my last blog post, I described our original Celery Canvas-based NGS data pipeline and its initial use case: processing hundreds of gigabytes of raw Next Generation Sequencing (NGS) data. As NGS usage at Ginkgo increased, we outgrew that pipeline and had to build a new one capable of handling the terabytes of data we generate daily. Let’s explore how we built it.

Choosing Technologies for Ginkgo's Unique Capabilities

We considered a number of technologies we could leverage to build our new NGS Pipeline, and we also had quite a few life science companies pitch solutions to us (some even went head-to-head against our existing pipelines). While these are all great products and solutions, they did not quite meet our use case.

One of our key learnings was that much of the commercial life science world is optimized for the needs of human genomic analysis. Ginkgo is unique in that we run thousands of tiny samples, whereas most infrastructure works on dozens of human-genome-sized samples at a time. This has important implications for workflow orchestration: at human-genome scale, spending seconds orchestrating tasks is a small time investment compared to the hours each analysis may take.

In our world, an analysis may take 5 seconds per sample. Thus, if we spend 6 seconds orchestrating each sample, a run of 5,000 samples would spend over 8 hours just deciding where to do the work! For a human genome run of 30 samples, the same overhead would amount to only 3 minutes.
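The back-of-the-envelope math behind those numbers, assuming a fixed 6 seconds of orchestration overhead per sample:

```python
# Orchestration overhead at an assumed 6 seconds per sample.
overhead_s = 6
print(f"{5000 * overhead_s / 3600:.1f} hours for 5000 samples")  # ~8.3 hours
print(f"{30 * overhead_s / 60:.0f} minutes for 30 samples")      # 3 minutes
```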

This scale also has implications for how the workflow is monitored and built. It is easy to view the work of 30 samples in a simple UI. The complexity across 5,000 samples, each with its own analysis workflow, can be daunting and requires a different approach.

Version 2.0 with Airflow and AWS Batch

Because of a combination of support, scalability, familiarity, and ease of use, we chose to build our new NGS Pipeline on top of Airflow. Given that we were already using it for ETL, we had experience writing DAGs and a good sense of what it was capable of. Additionally:
  • It has a UI that checked most of our boxes (workflow monitoring, task introspection, and retries).

  • It has an army of people contributing to it (including some with significant self-interest to continue improvements like SaaS companies providing hosted Airflow solutions).

  • It has support for running jobs in AWS Batch (thus offering resource constraints to tasks and nearly limitless scale).

  • It can scale out with Celery, which we have an immense amount of experience with.

Our implementation of Airflow uses the CeleryExecutor to distribute tasks between two task queues. One queue routes work to AWS Batch for jobs that need resource constraints (essentially, anything NGS-related); the other handles simple tasks, such as moving files around, that run directly within Celery.
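As a rough illustration of this routing (the DAG name, task names, and queue names below are made up, not our production code), Airflow operators running under the CeleryExecutor can be pinned to a queue via the `queue` argument:

```python
# Hedged sketch of two-queue routing: heavy NGS work goes to a queue whose
# workers hand off to AWS Batch, light housekeeping runs directly on Celery.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10-era import


def run_demux():
    pass  # placeholder: submit and monitor an AWS Batch job here


def move_files():
    pass  # placeholder: simple file shuffling on the Celery worker itself


with DAG(
    dag_id="queue_routing_sketch",  # hypothetical name
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
) as dag:
    demux = PythonOperator(
        task_id="demux",
        python_callable=run_demux,
        queue="batch",      # resource-constrained, NGS-related work
    )
    tidy = PythonOperator(
        task_id="move_files",
        python_callable=move_files,
        queue="default",    # lightweight, Celery-only work
    )
    demux >> tidy
```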

This design lets us use AWS Batch to handle our capacity constraints and scaling, and remove additional points of failure, such as Redis and local network storage. Additionally, the state of the workflow and its tasks is now held in a database instead of a largely inaccessible broker message, enabling far better introspection into what is actually happening and drastically improving the debugging and observability of the pipeline.

Challenges

As you may expect, there were challenges around implementing our workflows in Airflow.
  • Dynamic scheduling of Airflow DAGs: One of the challenges of our analysis pipeline is that DAGs are dynamically generated based on the output of previous tasks. This is a requirement because our NGS data is actually a combination of thousands of samples with unique barcodes. During sequencing, these barcodes are read, which allows us to assign the data to individual samples. This is known as demultiplexing (or demuxing). Thus, after demultiplexing, we need to spawn a sub-DAG for each sample's analysis, and the number of samples isn't known until runtime (see the sketch after this list). Airflow, like many other DAG engines, requires a fully defined workflow ahead of time, which isn't possible in this scenario. Fortunately, since Airflow is open source, a quick search showed someone had faced this challenge already and provided a plugin we could extend.

  • Testing: Airflow is very stateful, which can make testing difficult.

  • Scaling up: Airflow scales with a process-based approach, which means that when running thousands of workers, its metadata database would be overwhelmed by the number of connections. To prevent this, we put pgbouncer in front of the database and changed Airflow's Celery workers to use gevent instead of the default process-based pool, so that the workers can use a database connection pool.

  • Tidying up the AWS Batch executor: While a Batch executor existed, it was in need of some love. We updated its submission, querying, and logging components to handle the rate limits we were hitting (we have about 30,000 workers running). We also improved it so that when a worker restarts, it reconnects to the remote Batch job instead of starting a new one. This was particularly useful because it allowed us to decouple our execution infrastructure from our provisioning. We plan to contribute these changes back to Airflow after it moves to exclusively support Python 3.
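To make the per-sample fan-out concrete, here is a heavily simplified sketch of generating one analysis task per demultiplexed sample. This is not the plugin we extended: the DAG id, task names, and hard-coded sample list are illustrative, and in the real pipeline the sample list is only known after the demux task runs, which is exactly why a runtime-dynamic approach was needed.

```python
# Simplified illustration only -- not the dynamic-DAG plugin we extended.
# It fans out one analysis task per sample, but from a list known at parse
# time; the real pipeline only learns the list from demux output at runtime.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10-era import


def analyze_sample(sample_id):
    """Placeholder for a single sample's analysis."""
    print(f"analyzing {sample_id}")


with DAG(
    dag_id="per_sample_analysis_sketch",  # hypothetical name
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
) as dag:
    # Hypothetical sample list; in practice this comes from demultiplexing.
    for sample_id in ("sample_0001", "sample_0002", "sample_0003"):
        PythonOperator(
            task_id=f"analyze_{sample_id}",
            python_callable=analyze_sample,
            op_kwargs={"sample_id": sample_id},
        )
```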

Likewise, we had to overcome similar challenges running AWS Batch at scale.
  • We found it is easy to DDoS yourself when running anything at scale on AWS, and the managed solutions will in fact DDoS themselves at times. For instance, we often hit cases where AWS Spot requests become blacklisted because Batch issued too many requests for compute resources that can't be provisioned, or Batch hits a rate limit against ECR and containers fail to spin up (see the polling sketch after this list).

  • Debugging is challenging. Logs and crucial information are scattered across CloudTrail, EC2, Spot requests, and more, without any coherent link between these events. Additionally, there is no clear delineation between what is an AWS-related error and what is a user error within our control.

  • The overhead of AWS Batch can be pretty significant. Batch moves jobs between states in batches on scheduled intervals. For instance, every 30 seconds it may move submitted jobs into a "runnable" state, then 30 seconds later into a "starting" state, and so on. We must also wait on other AWS resources, such as Spot Instances, to scale up during this time. However, the time Batch takes to move jobs between states continues to get faster, so this area is constantly improving.

  • We had the added complexity of requiring a custom AMI to run Batch jobs on.
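As an example of the kind of defensive client code these issues pushed us toward, here is a hedged sketch (not our executor's actual code) of polling a Batch job's state with exponential backoff so that thousands of concurrent pollers do not trip API throttling. The job ID, retry policy, and set of throttling error codes are assumptions.

```python
# Illustrative sketch only: poll an AWS Batch job's status, backing off
# when the API throttles us.
import time

import boto3
from botocore.exceptions import ClientError

# Error codes commonly returned for throttled AWS API calls (assumption:
# the exact codes you see may vary by service and account).
THROTTLE_CODES = {"Throttling", "ThrottlingException", "TooManyRequestsException"}

batch = boto3.client("batch")


def get_job_status(job_id, max_retries=5):
    delay = 1.0
    for _ in range(max_retries):
        try:
            resp = batch.describe_jobs(jobs=[job_id])
            # One of SUBMITTED, PENDING, RUNNABLE, STARTING, RUNNING,
            # SUCCEEDED, or FAILED.
            return resp["jobs"][0]["status"]
        except ClientError as exc:
            if exc.response["Error"]["Code"] not in THROTTLE_CODES:
                raise
            time.sleep(delay)  # back off before retrying a throttled call
            delay *= 2
    raise RuntimeError(f"gave up polling Batch job {job_id} after {max_retries} throttled calls")
```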

While we invested a significant amount of time working through these issues, we learned enough to build an incredibly scalable solution based on Airflow and Batch.

Advantages of Airflow + Batch

While these were significant obstacles to overcome, we are pleased with the outcome and could not imagine going back to our previous solution. The ability to drop new code into a task and have it rerun automatically, or at the click of a button, is magic. While the Airflow + Batch combination can be slower for some use cases, it ultimately offers more of the guarantees and robustness required for building scalable systems.

Pivoting from a push-based task orchestration system in Celery to a pull-based one in Airflow has been crucial. Celery works by pushing the next event to the broker, whereas Airflow works by polling the current state of the world and then deciding how to proceed. While pushing is blindingly fast and carries little state, it is brittle when each event is important and there is no way to backfill missed events. The pull-based model also allows easier coordination across events and tasks, leading to better reports for users. This type of coordination is implicit within a database-driven system, whereas adding it to a push-based system would require a new state-tracking system, which would add another source of failure.

After migrating from Celery to Airflow + Batch, we have not thought about the NGS Pipeline execution for months. It just churns and does its work. We no longer worry about physical hardware, and we no longer worry about when the next sequencer will come online and bring our disks to a grinding halt. Our capacity is effectively unlimited, and AWS Batch continues to improve with faster scheduling times and better resource provisioning. Work that was traditionally performed by always-on compute is now done on an on-demand, elastic architecture, affording us flexibility and scalability. Finally, maintenance of the new pipeline is greatly simplified and it is a lot easier for the entire team to continue to build upon it.

Conclusion

Thanks to the NGS pipeline groundwork described above, we are now building a ton of new functionality around AWS Batch, including frameworks that let general users run their scripts on it without needing to know anything other than Python and how to invoke REST APIs. We are already at the point where the pipeline has enabled the Bioinformatics group and other teams to integrate their custom analyses without having to know Airflow or the nuances of AWS Batch. The future is bright, and we look forward to continuing to build on this pipeline and pushing NGS analysis even further!

(Feature photo by Christophe Dion on Unsplash)



Posted by Chris Mitchell