Ray Infrastructure at Pinterest
Chia-Wei Chen; Sr. Software Engineer | Raymond Lee; Sr. Software Engineer | Alex Wang; Software Engineer I | Saurabh Vishwas Joshi; Sr. Staff Software Engineer | Karthik Anantha Padmanabhan; Sr. Manager, Engineering | Se Won Jang; Sr. Manager, Engineering
The Journey of our Ray Infrastructure
In Part 1 of our blog series, we discussed why we were motivated to invest in Ray to solve critical business problems. In this blog post, we go one step further and describe what it takes to integrate Ray into a web-scale company like Pinterest, where various unique constraints and challenges come with adopting new technologies. This is a more comprehensive version of the Ray Infrastructure part of our talk Last Mile Data Processing for ML Training using Ray at Ray Summit 2023.
In our use case, being able to provision a Ray Cluster the way KubeRay does is only part of having a mature Ray infrastructure. Companies also need to follow the other best practices suggested by Ray and meet their own specific requirements, including log and metrics persistence, network isolation, identifying optimal hardware instances, security, traffic settings, and miscellaneous internal service integrations.
The journey began in 2023 when one full-time engineer dedicated 50% of their time to this project:
- 2023 Q1: Prototyping stage was initiated with assistance from our partners at Anyscale
- 2023 Q2: Ray Infra MVP was completed, including essential tools such as logging, metrics, UI, and CLI for applications, which were iterated upon and enhanced
- 2023 Q3: The focus shifted to onboarding our first production use case, involving the integration of internal systems like workflow systems to enhance service stability
- 2023 Q4: Emphasis was placed on preparing for production, addressing security concerns, improving network stability, and evaluating the transition to a Ray-optimized Kubernetes environment
High level diagram of how Ray works at Pinterest
Challenges We Are Facing
When building the Ray infrastructure at Pinterest, several key challenges were encountered that needed to be addressed:
- Limited access to K8s API: Operating on PinCompute, a general-purpose federated Kubernetes cluster at Pinterest, restricted the installation of necessary operators like KubeRay and its Custom Resource Definitions.
- Ephemeral logging and metrics: While logging and metrics were available through the Ray Dashboard when the Ray Cluster was active, it was not practical to maintain a resource-intensive Ray Cluster solely for debugging purposes. A solution was sought to persist and replay the lifecycle of Ray workloads.
- Metrics Integration: Our company had its own version of a time series database and visualization tool that differed from popular open-source solutions like Prometheus and Grafana.
- Authentication, Authorization, Audit (AAA) guidelines: Company standards require AAA guarantees. For services running on K8s, using Envoy as the service mesh is the recommended approach to build AAA at Pinterest.
- Multiple development experiences: Diverse development experiences were sought, including interactive options with Jupyter and CLI access with Dev servers, to cater to various developer needs.
- Cost optimization and resource wastage: Ray clusters left idle could result in significant expenses. A garbage collection policy and cost attribution were needed to increase team awareness and mitigate resource wastage.
- Offline data analysis: Exporting all Ray cluster-related metrics to a big data format (e.g., Hive, Parquet) for offline analysis was a priority. This data would include metrics such as GPU utilization to identify areas for improvement and track application and infrastructure stability over time.
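The garbage collection policy mentioned in the list above could be sketched roughly as follows. This is a minimal illustration, not the platform's actual logic: the record fields (`owner_team`, `last_active_at`, `ttl_seconds`), the default TTL, and the sample cluster names are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class ClusterRecord:
    # Hypothetical fields; the real platform schema is not described in this post.
    name: str
    owner_team: str
    last_active_at: datetime
    ttl_seconds: int = 3600  # assumed default idle budget


def clusters_to_reap(records: List[ClusterRecord], now: datetime) -> List[ClusterRecord]:
    """Return clusters that have been idle for longer than their TTL."""
    return [
        r for r in records
        if now - r.last_active_at > timedelta(seconds=r.ttl_seconds)
    ]


now = datetime(2023, 12, 1, 12, 0, 0)
records = [
    ClusterRecord("train-a", "team-a", now - timedelta(minutes=10)),
    ClusterRecord("notebook-b", "team-b", now - timedelta(hours=5)),
]
idle = clusters_to_reap(records, now)
print([(r.name, r.owner_team) for r in idle])
```

Keeping the owning team on each record means the same data can drive both cleanup and cost attribution.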
Kubernetes Based
Due to the limited K8s API access, we cannot easily install KubeRay in our environment to operate Ray Cluster in K8s. Additionally, specific sidecars managed by different teams are required for tasks such as secret management, traffic handling, and log rotation within the Pinterest K8s cluster. To ensure centralized control over necessary sidecar updates like bug fixes or security patches, we must adhere to certain restrictions.
To prototype the essential components needed for the Ray cluster (as outlined in the Launching an On-Premise Cluster guide), while incorporating the required sidecars, we opted to use the Pinterest-specific CRD, which is a wrapper that builds on top of an open-source Kubeflow PyTorchJob.
For the initial iteration, we aimed to keep things simple by constructing the Ray head and Ray worker on the client side. This entailed using different commands for each component and crafting a customized script for the client side to execute.
from concurrent.futures import ThreadPoolExecutor

def launch_ray_cluster(configs: RayClusterConfig) -> str:
    # configs defines resources, instance_type, command, env vars, etc.
    with ThreadPoolExecutor() as executor:
        # Submit the callables themselves so head and workers launch concurrently
        head_future = executor.submit(launch_ray_head, configs)
        workers_future = executor.submit(launch_ray_workers, configs)
        ray_head = head_future.result()
        ray_workers = workers_future.result()
    return check_up_and_running(ray_head, ray_workers)
This step has a lot of room for improvement. The main drawback is that the approach is difficult to manage: client-side execution can be interrupted for various reasons (such as network errors or expired credentials), leaving behind a zombie Ray cluster that wastes resources on K8s. While this approach is sufficient to unblock our Engineers to play around with Ray, it is far from ideal for a platform designed to manage Ray Clusters efficiently.
API Gateway & Controller
In the second iteration, a transition was made from managing the Ray cluster on the client-side to a server-side approach by developing a controller similar to KubeRay. Our solution entailed the creation of an intermediate layer between the user and K8s, consisting of several components including an API Server, Ray Cluster / Job Controller, and MySQL database for external state management.
Life cycle of a Ray Cluster within Ray Infrastructure
- API Server: This component handles request validation, authentication, and authorization. It abstracts the complexities of K8s away from the client side, so users interact only with the platform API interface. This is particularly valuable for security, especially for the TLS-related implementation described in a later section.
- MySQL database: The database stores state information related to the Ray Cluster, allowing for the replay of necessary ephemeral statuses from the K8s side. It also decouples the data flow between the API Server and Ray Cluster Controller, with the added benefit of facilitating data dumping to Hive for offline analysis.
- Ray Cluster Controller: This component continuously queries K8s to manage the life cycle of the Ray Cluster, including provisioning Ray head and worker nodes, monitoring the status of the Ray Cluster, and performing cleanup operations as needed.
- Ray Job Controller: Similar to the Ray Cluster Controller, the Ray Job Controller focuses on the management of Ray Job life cycles. Serving as the primary entity for submitting RayJobs, it ensures proper authentication and authorization protocols within the system. Additionally, the controller supports the submission of multiple Ray Jobs to the same Ray Cluster, enabling users to iterate more efficiently without the need to wait for new Ray Cluster provisioning for each job submission.
This approach provides a valuable abstraction layer between users and Kubernetes, eliminating the need for users to comprehend intricate Kubernetes artifacts. Instead, they can utilize the user-facing library provided by the platform. By shifting the heavy lifting of provisioning steps from the client-side, the process is streamlined, simplifying steps and enhancing the overall user experience.
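The decoupling between the API Server and the controller through the database can be illustrated with a small sketch. It assumes a single status column and uses Python's built-in `sqlite3` as a stand-in for MySQL; the table name, column names, status values, and function names are hypothetical.

```python
import json
import sqlite3

# sqlite3 stands in for MySQL here; table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ray_cluster ("
    " id INTEGER PRIMARY KEY, owner TEXT, configs TEXT, status TEXT)"
)


def api_create_cluster(owner: str, configs: dict) -> int:
    """API server side: validate the request, then record the desired state."""
    if configs.get("num_workers", 0) < 1:
        raise ValueError("num_workers must be >= 1")
    cur = conn.execute(
        "INSERT INTO ray_cluster (owner, configs, status) VALUES (?, ?, 'PENDING')",
        (owner, json.dumps(configs)),
    )
    conn.commit()
    return cur.lastrowid


def controller_tick() -> int:
    """Controller side: pick up pending rows and advance their status."""
    rows = conn.execute(
        "SELECT id FROM ray_cluster WHERE status = 'PENDING'"
    ).fetchall()
    for (cluster_id,) in rows:
        # A real controller would create the K8s resources at this point.
        conn.execute(
            "UPDATE ray_cluster SET status = 'PROVISIONING' WHERE id = ?",
            (cluster_id,),
        )
    conn.commit()
    return len(rows)


cluster_id = api_create_cluster("some-user", {"num_workers": 2})
dispatched = controller_tick()
status = conn.execute(
    "SELECT status FROM ray_cluster WHERE id = ?", (cluster_id,)
).fetchone()[0]
print(cluster_id, dispatched, status)
```

Because the API Server only writes rows and the controller only reads and updates them, either side can be restarted or replaced without the client noticing, and the rows remain available after the K8s resources are gone.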
FastAPI Swagger UI of our managed Ray RESTful endpoint
During the implementation of our own controller, we ensured modularity, enabling a seamless transition to KubeRay in the future. This approach allows for the effortless substitution of the method used to launch a Ray cluster, transitioning from an in-house Kubernetes primitive to KubeRay with ease.
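from time import sleep
from typing import Dict, Tuple

class Controller:
    def reconcile(self, ray_cluster: RayClusterRecord):
        # Sync the observed K8s state of one cluster back into the database
        status, k8s_meta = self.launch_and_monitor_ray_cluster(ray_cluster.configs)
        db.update(ray_cluster, status=status, k8s_meta=k8s_meta)

    def run(self):
        # Poll the database and reconcile every cluster waiting to be dispatched
        while True:
            ray_clusters = db.get_ray_cluster_to_dispatch()
            for ray_cluster in ray_clusters:
                self.reconcile(ray_cluster)
            sleep(1)

    def launch_and_monitor_ray_cluster(self, configs) -> Tuple[str, Dict]:
        # The only method to replace when switching the launch mechanism to KubeRay
        return get_actual_k8s_related_status(ray_identifier=configs.ray_identifier)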
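One way to express the modularity described above is to hide the launch mechanism behind a small interface. The following is a sketch under that assumption; `ClusterProvisioner`, `InHouseProvisioner`, `KubeRayProvisioner`, and `PluggableController` are hypothetical names, and the method bodies are placeholders rather than real Kubernetes or KubeRay calls.

```python
from abc import ABC, abstractmethod
from typing import Dict, Tuple


class ClusterProvisioner(ABC):
    """Hypothetical interface; the controller depends only on this."""

    @abstractmethod
    def launch_and_monitor(self, ray_identifier: str) -> Tuple[str, Dict]:
        """Return (status, k8s_meta) for the given cluster."""


class InHouseProvisioner(ClusterProvisioner):
    def launch_and_monitor(self, ray_identifier: str) -> Tuple[str, Dict]:
        # Placeholder for the in-house CRD path.
        return "RUNNING", {"backend": "in-house-crd", "id": ray_identifier}


class KubeRayProvisioner(ClusterProvisioner):
    def launch_and_monitor(self, ray_identifier: str) -> Tuple[str, Dict]:
        # Placeholder for a future KubeRay path.
        return "RUNNING", {"backend": "kuberay", "id": ray_identifier}


class PluggableController:
    def __init__(self, provisioner: ClusterProvisioner):
        self.provisioner = provisioner

    def reconcile(self, ray_identifier: str) -> Tuple[str, Dict]:
        # The reconcile logic stays the same regardless of the backend.
        return self.provisioner.launch_and_monitor(ray_identifier)


status, meta = PluggableController(KubeRayProvisioner()).reconcile("cluster-123")
print(status, meta["backend"])
```

Swapping backends then becomes a one-line change where the controller is constructed.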
Observability
Because the Ray Cluster’s built-in Ray dashboard is accessible only while the cluster is active, with no provision for log or metric replay, we chose to develop a dedicated user interface that integrates persistent logging and metrics. Supported by the API Gateway constructed previously, this user interface offers real-time insights into both Ray Cluster and Ray Job status. Since all the metadata, events, and logs are stored in either the database or S3, this strategy allows for log analysis without the need to maintain an active Ray Cluster, mitigating costs associated with idle resources such as GPUs.
Dedicated UI for Ray Cluster
Many companies have their own time series metrics solutions. At Pinterest, we use our in-house time series database, Goku, which has APIs compliant with OpenTSDB. We run an additional sidecar that scrapes Prometheus metrics and reformats them to be compatible with our in-house system. Regarding logging, we follow Ray’s recommendation of persisting logs to AWS S3. These logs are then consumed by the API server and displayed on our Ray Cluster UI.
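To make the sidecar's reformatting step concrete, here is a minimal sketch that parses Prometheus text exposition lines and emits data points shaped like the JSON body of OpenTSDB's `/api/put` (`metric`, `timestamp`, `value`, `tags`). It is not the actual sidecar: the function name and sample metrics are illustrative, and the parser ignores edge cases such as per-sample timestamps.

```python
import re
import time
from typing import Dict, List, Optional

# Matches lines like: name{k="v",k2="v2"} 1.5
_SAMPLE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
_LABEL = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def prometheus_to_opentsdb(text: str, timestamp: Optional[int] = None) -> List[Dict]:
    """Convert Prometheus exposition text into OpenTSDB-style data points."""
    ts = int(time.time()) if timestamp is None else timestamp
    points = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blanks, HELP and TYPE lines
            continue
        match = _SAMPLE.match(line)
        if not match:
            continue
        name, labels, value = match.groups()
        points.append({
            "metric": name,
            "timestamp": ts,
            "value": float(value),
            "tags": dict(_LABEL.findall(labels or "")),
        })
    return points


sample = '''# HELP ray_node_cpu_utilization CPU utilization
# TYPE ray_node_cpu_utilization gauge
ray_node_cpu_utilization{ip="10.0.0.1",SessionName="s1"} 42.5
ray_cluster_active_nodes 3
'''
points = prometheus_to_opentsdb(sample, timestamp=1700000000)
for point in points:
    print(point)
```

OpenTSDB expects at least one tag per data point, so a real sidecar would likely attach default tags (for example a cluster identifier) to samples that carry no labels.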
Observability related components on Ray Cluster
Ray Application Stats
We translate the same Grafana charts to an in-house visualization tool called Statsboard. In addition, we add more application-specific features such as DCGM GPU metrics and dataloader metrics, which help ML Engineers at Pinterest identify bottlenecks and issues in their Ray applications.
Ray application metrics dashboard
Ray Infrastructure Stats
Collecting infrastructure-level metrics is essential for effective monitoring, alerting, and establishing SLO/SLA benchmarks based on historical data. For example, tracking the end-to-end Ray Cluster wait time and the rolling success rate of Ray Jobs is critical for evaluating and sustaining system performance. Identifying platform-side errors that lead to Ray Cluster provisioning failures is equally important for maintaining operational efficiency.
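As an illustration of a rolling success rate, the sketch below keeps the most recent job outcomes in a fixed-size window and compares the rate against a target. The window size, the `SLO_TARGET` value, and the class name are assumptions chosen for the example, not Pinterest's actual thresholds.

```python
from collections import deque
from typing import Deque


class RollingSuccessRate:
    """Success rate over the most recent `window` job outcomes."""

    def __init__(self, window: int):
        self.outcomes: Deque[bool] = deque(maxlen=window)

    def record(self, succeeded: bool) -> None:
        self.outcomes.append(succeeded)

    def rate(self) -> float:
        if not self.outcomes:
            return 1.0  # no data yet: treated as healthy (an assumption)
        return sum(self.outcomes) / len(self.outcomes)


SLO_TARGET = 0.95  # hypothetical target

tracker = RollingSuccessRate(window=4)
for ok in [True, True, False, True, True]:
    tracker.record(ok)

# The window now holds the last four outcomes: True, False, True, True.
current = tracker.rate()
should_alert = current < SLO_TARGET
print(current, should_alert)
```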
Ray infrastructure metrics dashboard
Development & Production Interface
We provide three options for developing Ray applications at Pinterest: Dev server, Jupyter, and Spinner workflow. All of them are powered by the RESTful APIs in our ML Platform.
Launch and connect Ray Cluster from a Jupyterhub Pod
Launch and connect Ray Cluster from Dev server using CLI
For workflows, we rely on PythonOperator in Airflow to compose a customized operator: users provide their job_configuration, and we translate it into RayJob requests to our MLP Server.
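The translation step could look roughly like the sketch below. Every field name in both the input and the request body is hypothetical, since the post does not describe the actual schema; the function is written so that it could be passed as the `python_callable` of an Airflow `PythonOperator`, with the HTTP call to the platform server left out.

```python
from typing import Any, Dict


def build_ray_job_request(job_configuration: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a user-facing job_configuration into a RayJob request body.

    All field names here are illustrative assumptions.
    """
    missing = [k for k in ("entrypoint", "num_workers") if k not in job_configuration]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return {
        "cluster": {
            "num_workers": int(job_configuration["num_workers"]),
            "instance_type": job_configuration.get("instance_type", "cpu-default"),
        },
        "job": {
            "entrypoint": job_configuration["entrypoint"],
            "env_vars": dict(job_configuration.get("env_vars", {})),
        },
    }


request = build_ray_job_request(
    {"entrypoint": "python train.py", "num_workers": "4", "env_vars": {"STAGE": "dev"}}
)
print(request)
```

Validating on the workflow side gives users an early, readable error before any cluster is requested.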
Testing
Unittest & Integration Test
We offer two types of testing for users to leverage when developing Ray applications:
- Unit testing is recommended for platform library owners who use the lower-level Ray Core or Ray Data libraries. We follow the Tips for testing Ray programs and use pytest fixtures to reuse a Ray cluster as much as possible within the same test suite.
- Integration testing is suitable for users looking to run a complete Ray job to identify and address any regressions that may arise from code changes or library updates. We also run the integration tests periodically to monitor the health of business-critical Ray applications.
Network and Security
While Ray as a compute platform is extremely flexible for developers to run workloads easily through APIs, this also leads to a security vulnerability (CVE-2023-48022), emphasized by this ShadowRay article. The challenge is that Ray itself doesn’t provide a good way to perform authentication and authorization, so anyone who has access to the Ray Dashboard APIs can execute code remotely without any validation or controls.
At Pinterest, we took this security issue seriously and addressed it. We go one step further to ensure proper authentication and authorization are applied to each Ray Cluster, so a given Ray Cluster cannot be used by a user who doesn’t have the right permissions.
However, the complexity of this issue was further compounded by Pinterest’s federated Kubernetes cluster architecture, which makes it hard to apply intra-cluster features to inter-cluster environments. For example, we cannot use NetworkPolicy to control the ingress and egress flow across K8s clusters, so we need an alternative way to achieve network isolation, especially since Pods can be scattered across K8s clusters to maximize hardware availability in different zones.
1. HTTP: At Pinterest, we use Envoy as our service mesh in the Kubernetes environment. We deploy the Ray Dashboard on localhost behind Envoy and follow the standard way of authentication and authorization at Pinterest. This lets us restrict Ray Dashboard access to OAuth for users coming from the UI or mTLS for services.
2. gRPC: To prevent an arbitrary Pod in the K8s environment from connecting to an active Ray Cluster, we leverage Ray TLS with some customization at Ray cluster bootstrap time. In detail, for each Ray Cluster we create a unique Certificate Authority (CA) with its own (private key, certificate) pair, which gives us a 1:1 mapping between a CA and a specific Ray Cluster. The first step of mutual authentication restricts client (Ray Pod) access to a given CA through proper AuthN / AuthZ on the server side, so that only a subset of Pods can receive a certificate signed by the CA that represents that particular Ray Cluster. The second step occurs when the Pods communicate using those issued certificates and check that they were signed by the CA corresponding to the expected Ray Cluster. Moreover, all cryptographic operations to sign and issue leaf certificates for Ray Pods should be performed on the server side (MLP Server) to ensure that clients, including the Ray head and worker Pods, do not have access to the CA private key.
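The two pieces of the gRPC flow can be sketched as follows. The first function models the server-side authorization check that decides whether a Pod may obtain a leaf certificate from a given cluster's CA; the registry, Pod names, and function names are hypothetical. The second builds the environment variables that Ray reads to enable TLS (`RAY_USE_TLS`, `RAY_TLS_SERVER_CERT`, `RAY_TLS_SERVER_KEY`, `RAY_TLS_CA_CERT`); the file names under `cert_dir` are assumptions. Actual certificate signing is omitted.

```python
from typing import Dict, Set

# Hypothetical server-side registry: which Pod identities may obtain a
# certificate from which cluster's CA (one CA per Ray Cluster).
ALLOWED_PODS: Dict[str, Set[str]] = {
    "ray-cluster-a": {"ray-cluster-a-head", "ray-cluster-a-worker-0"},
    "ray-cluster-b": {"ray-cluster-b-head"},
}


def may_issue_cert(cluster_id: str, pod_identity: str) -> bool:
    """Step 1: only Pods that belong to the cluster get a leaf cert from its CA."""
    return pod_identity in ALLOWED_PODS.get(cluster_id, set())


def ray_tls_env(cert_dir: str) -> Dict[str, str]:
    """Environment variables Ray reads to enable TLS on its gRPC channels.

    The Pod receives its leaf certificate, its own key, and the CA
    certificate, but never the CA private key.
    """
    return {
        "RAY_USE_TLS": "1",
        "RAY_TLS_SERVER_CERT": f"{cert_dir}/tls.crt",
        "RAY_TLS_SERVER_KEY": f"{cert_dir}/tls.key",
        "RAY_TLS_CA_CERT": f"{cert_dir}/ca.crt",
    }


print(may_issue_cert("ray-cluster-a", "ray-cluster-a-head"))
print(may_issue_cert("ray-cluster-a", "ray-cluster-b-head"))
print(ray_tls_env("/etc/ray/tls"))
```

Because each cluster has its own CA, a certificate issued for one cluster fails verification against another cluster's CA certificate, which is what enforces the second step.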
Lessons Learned
Incremental improvement:
- Begin by deploying a Ray Cluster in a straightforward manner, then focus on automating and scaling the process in a production or cloud environment.
- Utilize existing infrastructure within the company to avoid reinventing the wheel when developing the MVP. In our case, leveraging the Kubeflow operator and existing ML-specific infrastructure logic streamlined the development process.
- Refine the infrastructure, such as addressing security pitfalls and any other compliance issues, according to company-wide best practices once the prototype is completed.
- Conduct regular meetings with customers to gather early feedback on challenges and areas for improvement.
- With the current success of the Ray initiative at Pinterest, we are looking for more improvements, such as integrating KubeRay when moving to an ML-dedicated K8s cluster.
Intermediate Layer between Client and Kubernetes Cluster:
- The API server serves as a bridge between the client and Kubernetes, offering an abstraction layer.
- Ensure that life cycle events of a Ray cluster are persistently recorded even after the custom resource is removed from Kubernetes.
- The platform has the opportunity to implement business logic, such as additional validation and customization, including authentication, authorization, and restricting access to the Ray Dashboard API for end users.
- By decoupling the actual method of provisioning the Ray Cluster, it becomes easier to switch to a different node provider as needed, especially as we plan to move forward to KubeRay and a dedicated K8s cluster in the future.
Visibility:
- Providing insufficient infrastructure-related information to users may lead to confusion regarding application failures or delays in Ray cluster provisioning.
- Platform-side monitoring and alerting are critical for operating tens or hundreds of Ray Clusters at the same time. We are still in the early stages of Ray infrastructure, and rapid changes can break the application side, so we need to be diligent in setting up alerts and testing thoroughly in staging environments before deploying to production.
Usage
We started collecting Ray infrastructure usage in Q2 2023 and observed a surge in Q4 2023, as our last mile data processing application reached GA and more users onboarded to the Ray framework to explore applications such as batch inference and ad hoc Ray Serve development. We are now actively helping users migrate their native PyTorch-based applications to Ray-based applications. We are still in the early stages of moving from native PyTorch to Ray-based PyTorch training, but we are eagerly collaborating with customers to onboard more advanced use cases.
RayCluster Usage
RayJob Usage
Ray Job vs. regular non-Ray Job count
Use Cases
Ray Infrastructure has been deployed for production ML use-cases and for rapid experimentation of new applications.
Ray Train
- Training for multiple recommender system models has migrated to Ray, and we are actively onboarding the remaining use cases
- We are currently running 5000+ Training Jobs / month using Ray
- These training runs utilize a heterogeneous CPU / GPU cluster
Key wins:
Scalability:
- Ray enables our training runs to scale data loading & preprocessing transparently beyond a trainer instance.
- A single GPU node such as a p4d.24xlarge instance has a fixed 12:1 CPU:GPU ratio, which prevents data loaders from scaling out and saturating the GPU.
- With Ray, we can scale out the data loaders outside the p4d instance using cheaper CPU-only instances
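The 12:1 ratio follows from the p4d.24xlarge shape (96 vCPUs, 8 GPUs). The short calculation below shows how a data-loading CPU deficit translates into extra CPU-only nodes. The per-GPU CPU demand and the CPU-only node size are hypothetical inputs, and the sketch ignores CPUs reserved for the trainer itself.

```python
import math

P4D_VCPUS = 96  # p4d.24xlarge
P4D_GPUS = 8


def extra_cpu_nodes(cpus_needed_per_gpu: float, cpu_node_vcpus: int) -> int:
    """CPU-only nodes needed when data loading outgrows the local CPU:GPU ratio."""
    deficit = max(0.0, cpus_needed_per_gpu * P4D_GPUS - P4D_VCPUS)
    return math.ceil(deficit / cpu_node_vcpus)


ratio = P4D_VCPUS // P4D_GPUS
# Hypothetical workload: 30 vCPUs of data loading per GPU, 64-vCPU CPU-only nodes.
# 30 * 8 = 240 needed, 96 available locally, deficit 144, and 144 / 64 rounds up to 3.
nodes = extra_cpu_nodes(cpus_needed_per_gpu=30, cpu_node_vcpus=64)
print(ratio, nodes)
```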
Dev-velocity:
- Aside from scalability, Ray greatly contributes to the acceleration of development velocity.
- A large part of ML engineers’ day to day work is implementing modeling changes and submitting dev training runs using local code
- Ray enables users to interactively use the Ray compute cluster to submit jobs via Jupyter notebooks as a terminal / interface
Batch Inference
- In the past, Pinterest utilized a PySpark based batch inference solution.
- Using Ray, we have re-implemented a new BatchInference solution, designed as a map_batches implementation on the ray.data.Dataset.
- We are using this solution for three production use cases
- We are currently running 300+ Batch Inference Jobs / month using Ray
Key wins:
Efficiency:
- Unlike the old implementation, Ray allows pipelining of pre-processing, GPU inference, and output file writes.
- Furthermore, it can decouple these three steps automatically to run on heterogeneous CPU & GPU nodes.
- Combined, this has resulted in a 4x reduction in job runtime (1hr → 15 mins) on our production GPU inference jobs.
Unlocked Opportunity:
- The ease of programming with Ray, and the efficiency derived from pipelining, has enabled us to adopt feature ablation tooling for GPU based models.
Experimental Workloads
- Ray offers a robust ecosystem of tools, which also includes Ray Serve
- Ray Serve provides built-in routing and auto-scaling functionality for model serving, which makes it easy to quickly set up a model for evaluation.
- Without Ray Serve, clients would have to manually set up an RPC Server, deployment pipelines, service discovery, and autoscaling.
Key wins:
- During an internal hackathon, teams could set up and use an open source large model in a few hours
- Without Ray, setting up such an infrastructure would have taken days if not weeks
What’s Next
- Deep dive into Ray Batch Inference at Pinterest
- Ray Tune at Pinterest
- Unique challenges for Ray applications at Pinterest
Acknowledgements
Cloud Runtime Team: Jiajun Wang, Harry Zhang
Traffic Team: James Fish, Bruno Palermo, Kuo-Chung Hsu
Security Team: Jeremy Krach, Cedric Staub
ML Platform: Qingxian Lai, Lei Pan
Anyscale: Zhe Zhang, Kai-Hsun Chen, SangBin Cho