Ray Batch Inference at Pinterest (Part 3)

Alex Wang; Software Engineer I | Lei Pan; Software Engineer II | Raymond Lee; Senior Software Engineer | Saurabh Vishwas Joshi; Senior Staff Software Engineer | Chia-Wei Chen; Senior Software Engineer |

Introduction

In Part 1 of our blog series, we discussed why we chose to use Ray(™) as a last mile data processing framework and how it enabled us to solve critical business problems. In Part 2 of our blog series, we described how we were able to integrate Ray(™) into our existing ML infrastructure. In this blog post, we will discuss a second type of popular application of Ray(™) at Pinterest: offline batch inference of ML models. We will also share how our implementation was able to deliver 4.5x throughput increases and 30x cost savings.

Background

Offline batch inference involves operating over a large dataset and passing the data in batches to a ML model which will generate a result for each batch. Offline batch inference jobs generally consist of a series of steps: dataloading, preprocessing, inference, post processing, and result writing. These offline batch inference jobs can be both I/O and compute intensive.

At Pinterest, previous batch inference solutions were built using Apache Spark(™) or Torch Dataloader. These solutions had several drawbacks.

  • Lack of heterogeneous node instance types for preprocessing and inference
  • Difficulty to achieving pipelining and overlap steps such as dataloading, inference, and result writing
  • Spark required batch data processing due to lack of streaming execution support
  • Our Spark clusters had limited GPU support
  • Offline Spark batch processing jobs used online GPU clusters through RPC calls, which was inefficient

Why Ray(™) for Batch Inference

Ray was able to address the drawbacks of the previous batch inference solutions at Pinterest. Our Ray-based batch inference solution, which we refer to as Ray Batch Inference, has several key capabilities.

  • Streaming execution: Ray Data uses streaming execution to efficiently process very large datasets. Dataset execution is pipelined so that multiple execution stages can run in parallel. This is particularly useful for batch inference in which we want to overlap dataloading, inference, and result writing.
  • Heterogeneous clusters: Ray is capable of managing a cluster of heterogeneous resources. This allows us to independently scale CPU intensive data loading / write tasks and GPU inference tasks. Our batch inference jobs that were previously bottlenecked on the data loading stage saw significant improvements in throughput.
  • Training framework agnostic: We use Ray Data, which describes itself as a scalable data processing library for ML workloads. Ray Data is not bound to any specific ML library. Data transformation can be expressed using Pyarrow Table, which allows us to stay agnostic. Ray Batch Inference is able to perform inference using any ML framework such as PyTorch, TensorFlow, HuggingFace, etc.
  • Job composition: Batch inference jobs are often included as part of larger ML workloads. Examples include offline evaluation after training and distillation training. With Ray Data, batch inference can be combined with other dataset operations inside a single ray job.

Deep Dive

Figure 1: High-level diagram of our Ray Batch Inference SDK

Ray Batch Inference at Pinterest is an SDK that runs on top of our existing Ray Infrastructure. Under the hood, it uses Ray Data to compose the streaming execution pipeline that will perform the steps necessary for batch inference: dataloading, preprocessing, inference, post processing, and result writing. Below is a short example of how our SDK is used.

servable_info = TorchScriptServableInfo(

full_path="s3://path_to_trained_model.pt",

framework=Framework.TORCHSCRIPT device=DeviceType.CUDA,)batch_predictor = BatchPredictor.from_servable_info(servable_info)

dataset = read_parquet("s3://path_to_dataset")

inference_results: ray.data.Dataset = batch_predictor.predict(dataset)

Under the Hood

At its core, batch inference is a dataset transformation. Ray Data provides a function called map_batches that allows you to define a mapping operation on a single batch of data. The operation will be executed on the entire dataset by streaming through the dataset. In our case, the mapping operation is model inference.

Figure 2: High level diagram of how we use Ray Data. Four ray actors are scheduled on an AWS g5.24xlarge instance, each ray actor is assigned one GPU.

Under the hood, Ray Data schedules ray tasks or ray actors to execute the mapping operation defined in map_batches. Because model inference requires storing the model weights and metadata, we use ray actor-based map_batches for our implementation.

When our Ray Batch Inference SDK is executed on the ray head node, it will call map_batches. Ray Data will create ray actors to be scheduled on the ray worker node(s). In the case of GPU inference, these ray actors must be scheduled on a GPU node. During instantiation, the ray actors will download the model weights and load them onto device hardware. After instantiation is complete, the ray actors are ready to accept batches of data to perform inference.

Carryover Columns

One feature of batch inference is carryover columns. Carryover columns are columns of the input dataset that will not be processed during the batch inference pipeline, but are still necessary to include in the final output so they can be consumed by downstream jobs. One example of a common carryover column is an image signature (image unique ID). The image signature may not be used as an input feature for the model, but it is useful to include in the output for data analysis and indexing purposes.

Figure 3: How Carryover Columns Work in Ray Batch Inference

If you recall, we use Ray Data map_batches to implement batch inference. In Ray Data, a batch of data can be represented as a pyarrow table. One benefit of representing the batch as a pyarrow table is that we can perform memory-efficient zero-copy operations.

In our batch inference pipeline, the input batch contains feature columns, which will be passed into the model, and carryover columns, which will not be passed into the model. Carryover columns will also skip the preprocess and postprocess stages. Using pyarrow, separating the carryover columns into a separate pyarrow table is a zero-copy operation. The pyarrow table containing the carryover columns can also be appended to the output batch in another zero-copy operation.

Depending on the use case, our batch inference jobs may have a large number of carryover columns. Efficiently implementing carryover columns was crucial for minimizing data overhead.

Multi-Model Inference

In many cases, it is beneficial to run inference for multiple models inside a single job. Our Ray Batch Inference solution supports running inference for N models in a single ray job. Repeated data reads and transformation overhead incurred from running N parallel jobs can be reduced by a factor of N by combining them into a single job.

Figure 4: High level diagram of multi-model inference. Each ray actor can load multiple models onto the same device.

In Ray Batch Inference, we represent a ML model and its metadata together through a concept called a Servable. In our multi-model inference implementation, each ray actor can hold multiple Servables.

servable_info_1 = TorchScriptServableInfo(...)servable_info_2 = TorchScriptServableInfo(...)batch_predictor = BatchPredictor.from_servable_info(servable_info=[servable_info_1, servable_info_2],)

dataset = read_parquet("s3://path_to_dataset")

inference_results: ray.data.Dataset = batch_predictor.predict(dataset)

There are many applications of multi-model inference. Some examples are:

  • Generating different embeddings. Each model will generate a unique embedding from the same input.
  • Offline evaluation for multiple model experiments.

Accumulators

Offline batch inference is typically used for evaluation jobs which calculate metrics such as AUC-ROC, cross entropy loss, etc. These evaluation metrics can be efficiently calculated using accumulators, which merge multiple values across different inference runners into one, such as in Spark. We implemented accumulators in Ray on top of our existing distributed accumulators implementation. At the end of the ray job, these metrics are persisted to an experiment tracker like MLFlow or Weights & Biases.

Figure 5: Distributed accumulators using Ray Actors. Memory overhead is reduced by a factor of X.

Our naive implementation of distributed accumulators would execute a ray task to send each computed eval metric to a Ray Actor for accumulation. However, this created significant overhead because of the large number of ray tasks and resulted in node OOM or ray object store spilling.

Our optimized approach utilizes a two-step accumulation strategy in which partial values are aggregated before being combined later. This helps reduce the number of ray remote tasks and reduce memory overhead.

Figure 6: Example of two-step accumulation. The metric being calculated is the sum of all accumulated values.

LLM Inference

Given that our Ray Batch Inference solution is framework agnostic, it was the perfect foundation to build a system for batch inference of Large Language Models.

At Pinterest, we currently use vLLM™ as an inference optimization engine for our internal LLMs. vLLM enables us to process extremely large datasets with improved throughput by optimizing the attention mechanism of LLMs via PagedAttention and KV Cache.

Throughput of LLaMA-7B model using vLLM™ compared to Hugging Face (HF) and Text Generation Interface (TGI). The experiment was performed on an Nvidia A10G GPU. Results are from vLLM team’s blog.

For TorchScript models, we implement a Servable called TorchScriptServable. This idea can be extended to all kinds of ML models, including LLMs. For vLLM, we implement a Servable called VLLMServable. By using this design pattern, Ray Batch Inference is able to support any kind of ML model regardless of its internal implementation.

servable_info = VLLMServableInfo( framework=Framework.VLLM, device=DeviceType.CUDA,

input_features=["prompt"],

output_features=["generated_text"], vllm_config = VLLMConfig( engine_args=EngineArgs(

model="my_fine_tuned_llm"

), sampling_params=SamplingParams( ... ) ))batch_predictor = BatchPredictor.from_servable_info(servable_info)dataset = ray.data.from_items(

["What is the capital of Virginia?", "How is the weather today?", "Complete this sentence: Hello, ", "How do I sort a list in python?"]

)

generated_text: ray.data.Dataset = batch_predictor.predict(dataset)

Obstacles

When integrating vLLM™ into Ray Batch Inference, we encountered an issue using vLLM on a heterogeneous ray cluster. After upgrading to vLLM 0.4.0, the library was unable to be imported on non-GPU nodes. This raised an exception when trying to use vLLM on the ray head, which was a CPU-only node.

The solution was to decouple vLLM from our user-facing APIs, VLLMServableInfo and VLLMConfig, that are used on the ray head. We only couple vLLM with VLLMServable, the object that contains the model artifacts and is only used by ray actors scheduled on GPU nodes.

Results

Ray Batch Inference has allowed ML Engineers at Pinterest to achieve significant improvements in throughput, GPU utilization, and cost savings.

Torch to Ray(™)

The related pins (p2p) team at Pinterest saw a 4.5x improvement in throughput for one of their inference jobs after migrating from a TorchDataloader to Ray. GPU utilization also improved as they were able to decrease the number of GPUs from 8 (2 g5.24xlarge machines) to 4 (1 g5.24xlarge machine). The migration also resulted in an approximately 25% decrease in job costs.

This improvement was primarily driven by the ability to utilize a heterogeneous ray cluster. The related pins team’s inference job was bottlenecked on the data loading stage of the pipeline. After adding additional CPU resources to the ray cluster, the throughput skyrocketed because the data loading stage was able to schedule extra ray tasks on the CPUs.

Spark(™) to Ray(™)

The search quality team at Pinterest saw over a 30x decrease in annual cost for one of their inference jobs after migrating the job from Spark(™) to Ray(™). This improvement was driven by two reasons:

  • After migrating to Ray(™), the job was able to take advantage of GPU resources. This dramatically improved the throughput during the inference stage of the pipeline.
  • The job utilized our multi-model inference feature. Previously, search quality’s inference job was composed of N Spark jobs. Each Spark job loaded the same dataset from an offline store and wrote its results to a separate table. These N Spark jobs were combined into a single Ray job. The cost of reading the dataset was reduced by a factor of N, and the inference results from multiple models could be written to a single table.

Adoption

Since Ray Batch Inference became GA in Q4 2023, Pinterest ML teams have readily adopted it. We currently run Ray Batch Inference jobs across 13 different teams and average over 60 jobs launched daily.

What’s Next

We plan to integrate KubeRay into our Ray(™) Infrastructure. This will enable us to utilize autoscaling for our batch inference and training jobs, significantly simplifying performance tuning.

Other upcoming topics include:

  • Ray Tune at Pinterest
  • Improving the data loader performance in Ray(™)
  • Fault tolerant Ray Batch Inference and Training
  • Feature Importance on Ray(™)

Acknowledgment

Related Pins: Shivin Thukral, Liyao Lu, Travis Ebesu

ATG: Ammar Alqatari, Vishwakarma Singh

Search Quality: Bhawna Juneja, Gandharv Kapoor

M10n: Haoyu He, Kartik Kapur

ML Platform: Qingxian Lai, Vishnu Arun

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore and apply to open roles, visit our Careers page.

Home - Wiki
Copyright © 2011-2024 iteam. Current version is 2.137.1. UTC+08:00, 2024-11-05 16:33
浙ICP备14020137号-1 $Map of visitor$