How LyftLearn Democratizes Distributed Compute through Kubernetes Spark and Fugue
In a previous blog post, we discussed LyftLearn’s infrastructure built on top of Kubernetes. In this post, we will focus on the compute layer of LyftLearn, and will discuss how LyftLearn solves some of the major pain points faced by Lyft’s machine learning practitioners.
Efficiently utilizing computing resources
Pain points
There is always a tradeoff between user convenience and efficient resource utilization. When LyftLearn users need more compute power, it’s a lot easier to scale vertically (more powerful machines) as opposed to horizontally (more machines). This is less work for users, but the downside is that large machines are often only needed for a portion of the workload. The rest of the time, the utilization is low and huge costs are incurred.
For example, using 90-core EC2 instances to run Jupyter notebooks that only use 60 cores would waste 30 cores and result in a utilization rate of 67%. Since our Kubernetes cluster runs on EC2 instances, our platform team is primarily concerned with maximizing its cost-efficiency.
Solution: Kubernetes Spark
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on clusters. It offers superb scalability, performance, and reliability. Using Spark with Kubernetes further reduces the overhead of starting Spark jobs through image caching. Kubernetes enables individual users to use their own images. For each cluster node, the pod architecture allows for heterogeneous Spark workers that don’t interfere with each other.
The following screen recording shows an example map operation delay
(sleeping 5 seconds before returning the original item) that could take 5120 seconds sequentially but only takes 22.9 seconds in our LyftLearn environment. Notice that the 22.9 seconds also includes requesting and releasing the 1024 cores (64 instances, 16 cores each) on Kubernetes. The Spark cluster is ephemeral, only existing in the with
statement.
Animation of Kubernetes Spark execution
The low start-up time allows us to exclusively host on-demand ephemeral Spark clusters. Users can request as many cores as they need with a minimal programming interface. With ephemeral Spark, they can use smaller notebook instances and then leverage large on-demand compute resources when necessary.
Surprisingly, even with multiple heavy Spark jobs running at the same time, the number of EC2 instances in the ASGs (Auto-Scaling Groups) is rarely affected. This means that Spark effectively utilizes the fragmented resources in our cluster, so we get Spark for free in LyftLearn!
Another major benefit of Kubernetes Spark is that there is no bootstrapping needed for Spark workers. In LyftLearn, we made it very easy for users to create their own images as seen in the previous blog post. We also provide custom Spark base images for users so that they do not run into any dependency issues when creating their own.
Distributing existing logic with minimal effort
Pain points
In the past, we tried to persuade users to rewrite their existing logic to optimize for speed and resource utilization because this can save both time and money. In practice, we observed:
- The existing logic was often too complicated or too critical to rewrite confidently.
- Scaling often meant running the same logic on a large number of groups in parallel.
Although Spark is a perfect fit for this scenario, the learning curve is steep. Given the business goals that our customer teams need to focus on, it is suboptimal for everyone to spend time learning Spark. Additionally, even Spark experts may still spend a non-trivial amount of effort implementing a big data workflow.
So how do we minimize the code to bring workloads to Spark?
Solution: Fugue transform
The open-source project Fugue makes this very simple. It is an abstraction layer unifying the computing logic over Spark, Dask, Pandas, and CUDF through Blazing SQL.
One of the most popular features of Fugue is the ability to use one function call to distribute logic. Users can provide functions with type-annotated inputs and outputs, and Fugue will then convert the data based on the type annotations. This makes the custom logic independent from Fugue and Spark, removing the need for any pre-existing knowledge.
Here’s a simple example of bringing custom Python logic to Spark:
Screenshot of distributing custom Python logic on Spark using one function call (out_transform)
In the above example, assume each task is to sleep some seconds determined by the parameter a
. In the first cell, compute_tasks
describes how to batch process the tasks locally. We also defined 100 tasks, each of which takes 58-62 seconds. Running this sequentially would take about 100 minutes, excluding any overhead.
In the second cell, we use Fugue’s out_transform
to run compute_task
with LyftLearn’s Kubernetes Spark. The job takes only 1.5 minutes to finish, including the overhead of getting compute resources and releasing them.
In this next example, we look at a distributed machine learning inference:
Screenshot of distributed inference using one function call (transform)
In this case, the custom function predict()
needs to operate on each region separately. For each region, it will load a region-specific model and make predictions. With the Fugue transform()
function, we use partition={"by": region"}
to split up the all_region
data into separate partitions before running the predict()
function on each one. The beauty of transform()
and out_transform()
is that the logic is defined in a scale-agnostic manner.
Because the input and output types of the predict()
function are Pandas DataFrame
s, we may consider using Spark’s Pandas User-Defined Functions (UDFs) to improve performance. Although PySpark has many different interfaces for this, Fugue will take care of using Pandas UDFs under the hood without any code changes from the user, provided we enable a configuration. Even if the function uses native Python input and output types like List
and Dict
, Fugue can still execute it as a Pandas UDF.
Utilizing SQL-based tools
Pain points
SQL is the major interface for different data services and computing frameworks. However, even for an interface that is widely adopted, there are many variations such that doing a simple operation like SELECT * FROM <table> LIMIT 10
requires a lot of varying boilerplate code.
Presto works great for small output scenarios, and users tend to create small tasks in which they call the Presto service and bring in the data locally for further processing. In order to scale, they run hundreds of small tasks in parallel, overloading the Presto service. They could use a single Hive query to replace the bulk Presto calls, but they don’t know how to handle the large output from Hive and process the data in a distributed fashion.
Another challenge is developing complex SQL queries. They are error-prone and expensive to debug and iterate. How can we use fewer iterations to write 100% correct complex SQL?
Solution
In LyftLearn, we make Kubernetes Spark the only option for running Hive. Many users got huge performance boosts on their queries as a result. This is because:
- During the execution of a query, there are fixed and dedicated computing resources for the query.
- Map-reduce is the natural solution to the large output of the Hive queries.
- Spark 3’s adaptive execution is a huge boost for SQL performance.
- For each execution, the configurations such as CPU, memory, and broadcast threshold can be fine-tuned.
With ephemeral Spark clusters, we rarely see a competition of resources to run Hive queries. Most executions can utilize their configured CPUs immediately.
In addition to improving the performance of execution, LyftLearn is committed to delivering the best development experience in order to increase developer productivity.
Presto and Hive are the two most commonly used SQL services. As such, we created %%presto
and %%hive
magics to enable writing SQL directly in cells with no boilerplate code. Additionally, Fugue SQL enables SQL to operate on mixed data sources such as Presto, Hive, Pandas, and flat files. To support this, we also brought in the fsql
magic from Fugue SQL.
As an example, assume we write a Presto query to get the top 2 subscenarios with the most records in a table. The output is stored as a Pandas DataFrame
named top
. This is a typical aggregation with small output, so Presto is great:
Screenshot of Presto query execution
Now we use a Hive query to dump all the data for these two scenarios into a temporary file. This works well regardless of output size:
Screenshot of Hive query execution
With the cached file, we can write native PySpark code to do big data analysis without re-running the previous queries. Here we get the distinct snapshot_ts
:
Screenshot of Kubernetes Spark execution
Optionally, we can write a Fugue SQL query that is equivalent to the Hive query and the native Spark code combined. It directly joins the Pandas DataFrame
top
with the Hive table, and prints the result:
Screenshot of a Fugue SQL joining a Hive table and a Pandas DataFrame
Below are a few best practices that we share with LyftLearn users:
- Presto is good for aggregation and small output scenarios — it shouldn’t take more than 10 minutes. If Presto is slow, try Hive.
- Hive is slower but generally more scalable. Always try to save the output to files instead of dumping it into Pandas.
- If a Hive query is too complex, too slow, or if requires mixed data sources, consider Fugue SQL. It can simplify the logic and accelerate both development and execution.
Conclusion
In this article, we discussed three major pain points with using distributed batch systems. To address them, we created an intuitive, scalable, and agile system based on Kubernetes Spark and Fugue and provided users with best practices.
We saw great results for LyftLearn in the second half of 2021 with these efforts. Compared with the first half of the year, the average per-Spark job wall time dropped from 3 hours to 0.3 hours, and although the total usage of Spark increased by over 60%, the total cost of Spark execution dropped by over 50%.
Our objective is to minimize the total effort of development, execution, and maintenance in the machine learning lifecycle. Guided by Lyft’s practical needs, we continue to build a balanced ecosystem with an emphasis on intuitive user experience, minimal learning curve, and a system design that influences best practices.
Acknowledgements
Huge shout out to Anindya Saha and Catalin Toda_,_ working tirelessly to create the Spark infrastructure for LyftLearn.
Special thanks to Shiraz Zaman, Vinay Kakade and Michael Rebello for helping to put this together.