Tuning Flink Clusters for Stability and Efficiency

Jul 11, 2023


Divye, Teja, Chen, Sam, Lu, Heng, Kanchi, Rainie, Dinesh, Ashish, Nishant, Pooja | Stream Processing Platform Team | Big Data Systems SRE Team

At Pinterest, stream data processing powers a wide range of real-time use cases. Our 12 Flink clusters are multitenant and run jobs that concurrently process more than 20M msgs/sec. Over the course of 2022 and early 2023, we've spent a significant amount of time optimizing our Flink runtime environment and cluster configurations, and we'd like to share our learnings with you.

The quick takeaways?

  1. We’ve reduced our costs for the Stream Processing Platform by 40% while increasing the number of onboarded jobs on our platform by 40%
  2. Our typical jobs are 50–90% less expensive to run than before our optimizations
  3. Our total cost savings were the single largest platform efficiency improvement in Data Eng by % of AWS spending in 2022. These savings were significant to Pinterest.

While this was a major rollout for our team, we had zero incidents related to our performance tuning rollouts, and we've kept that number at zero through H1 2023. Read on to find out more, or if the post is too long for you, skip to the end to review what we learned along the way.

So, what is challenging about running and tuning Flink jobs on a multitenant cluster?

Pinterest runs 130+ Flink jobs across eight multitenant production YARN clusters. These jobs vary in size: our largest Flink job uses 2,000+ cores to compute engagement statistics, while our smallest jobs process long-tail data and take up just a handful of cores. These jobs also vary in importance: some of our Ads jobs are smaller Tier 1 systems without which Pinterest can't function, while others are larger and may be able to afford more relaxed guarantees.

In this mixture of importance and scale, we’re tasked with balancing:

  1. Efficiency of the entire system
  2. Ensuring higher tier jobs have the resources when they need them
  3. Ensuring that jobs do not interfere with each other when running in the multitenant YARN environment

Multitenancy problems faced on our clusters prior to 2022 and how we resolved them

Lack of CPU isolation (noisy neighbors)

Our Flink clusters are built on YARN. At the beginning of 2022, our YARN containers were configured to run completely unisolated. This caused two major problems:

  1. Our load tests were not stable. We had no way to qualify a job's ability to handle a certain amount of incoming traffic. When we ran load tests on our multitenant staging clusters, performance varied from run to run depending on what other jobs were running on the most constrained nodes.
  2. CPU bursts on one job affected others on the same host. While allowing Flink jobs to burst up and consume unused CPU is exactly what we want, we want that excess consumption to be regulated. It is common for a job starting up immediately after a deployment to surge its CPU utilization trying to catch up on the backlog of events. This negatively affects the other jobs on those hosts. In addition, if there is an incident or an unanticipated influx of events, unrelated jobs can become unstable through the fault of their neighbors.

Here we can see a hot node being created by a poorly tuned Flink application that overcommits resources on one host. The application uses far more than its reserved resources and the host eventually reaches 100% CPU at the weekend traffic peak, causing all the jobs on that host to backpressure — a classic noisy neighbor. The iowait graphs show that the backpressure is purely due to CPU starvation and not due to IO.

Maladjusted vcore reservations (hot nodes)

In addition to the lack of CPU isolation, several of our jobs had maladjusted vcore reservations on the cluster. Put simply, they were asking for fewer cores than they actually used, even under normal 1x load. Since our cluster configuration did not throttle a job if it exceeded its vcore limits, the job would continue to run on spare capacity without any apparent issues. However, it would take CPU headroom away from the other jobs on the host and overcommit the host's CPU, leading to hot hosts.

Burst capacity allocation

Our burst capacity policy on the cluster was similarly inconsistent. Our initial guideline was that each job reserved the full capacity it needed to serve 3x its normal traffic. While this ensured that jobs were likely to have the resources they needed even in the case of incidents and backlogs, it was wasteful.

By design, a cluster where every job reserves its entire 3x burst capacity quota would have a target CPU utilization of only 33% under normal conditions. This is not sustainable from a cost perspective. Not all of our jobs burst at the same time, so we needed to be more thoughtful about how we manage burst overhead.

Foundations of stability: CGroups & capacity reservations

CGroups soft CPU limits

The first order of business was to set up CGroups with soft CPU limits for each worker on our YARN clusters. In technical terms, we asked YARN to schedule containers based on the cores requested, and we configured CGroups soft CPU limits so that CPU shares proportional to the requested capacity are enforced only when the machine approaches full utilization, and not otherwise.

The reason we made the choice of CGroups soft limits vs hard limits is our understanding of our job behaviors. On deploys, our jobs burst on CPU to catch up. Throttling them at this stage is counterproductive. In addition, incidents and unexpected influx of events mean that any job might want to burst at any point of time. As long as CPU capacity is available on the host, we want to make all the burst capacity available for any one job to use. However, if multiple jobs need burst capacity, we want it to be proportionally allocated with respect to CPU reservations.
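As a rough sketch, soft limits of this kind map to a handful of standard yarn-site.xml node manager settings (the values below are illustrative, not our exact production configuration):

```xml
<!-- Run containers under the LinuxContainerExecutor so CGroups are applied -->
<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>

<!-- Delegate CPU accounting and enforcement to CGroups -->
<property>
  <name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value>
</property>

<!-- Soft limits: containers may burst above their vcore share while the host
     has spare CPU; proportional CPU shares only bite under contention -->
<property>
  <name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
  <value>false</value>
</property>
```

Setting strict-resource-usage to true instead would impose hard per-container CPU quotas, which is exactly the throttling behavior we wanted to avoid for catch-up bursts.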

In the absence of soft CPU limits, a single bursting job could degrade multiple jobs on the cluster and we needed to allocate extra headroom on the hosts to avoid this. Soft CPU limits helped us run the cluster hotter without compromising availability and we were immediately able to size down our clusters by 20%.

Fixing capacity reservations (hot node mitigation)

After launching soft CPU limits on the cluster, we saw better behavior on hot nodes. However, around daily traffic peaks and on weekends, hot nodes could still appear. The only remediation available at the time was to redeploy and place job containers on different nodes, avoiding the hot ones. This, of course, was a manual, on-call-intensive process. We tried two approaches to address this automatically: CPU aware container scheduling and burst capacity reservations.

Container placement: Optimizing for multi-tenant stability

This diagram covers the two container placement approaches on our YARN cluster. The second approach, with guaranteed burst capacity reservations, is what we use in production today. It allowed us to schedule containers on our hosts with up to 75% of CPU capacity reserved, while letting these jobs burst all the way to 97% CPU utilization on the host.

CPU aware container scheduling: our initial approach

Our first approach, CPU aware container scheduling, leveraged the idea that containers on the YARN cluster could be placed in a manner that was aware of the existing CPU utilization on the host machines. The key idea was to allocate containers only on lightly loaded hosts by picking hosts with a CPU utilization below a certain percentile threshold (e.g., P50 or P75). The hope was that new containers would utilize the lightly loaded hosts more and pack the cluster better. By doing so, we expected the cluster CPU utilization to rise in a roughly uniform manner as jobs were added to the cluster (much like filling water into a bucket).

The initial hope did work to a large extent. However, our learning from rolling out this strategy was that the average cluster node didn't matter; only the most heavily loaded nodes did. Typically, at traffic peak, everyone needs capacity at the same time. Hosts that had more containers scheduled onto them during off-peak hours could become overloaded when peak traffic materialized, due to poor initial container placement. In essence, containers had to be moved as traffic load changed on jobs to keep the overall cluster balanced. One option was to regularly redeploy jobs to adapt to changing traffic conditions; however, repeated job deployments were disruptive (each deploy causes a CPU surge). Instead, we opted for a different approach.

Guaranteed shared burst capacity + job retuning: our path forward

Since our goal was to avoid heavily loaded hosts (hot nodes cause backpressure!), we asked each job to reserve exactly the number of cores per host it needed for continuous processing. We then asked YARN to place these containers on our hosts assuming that only 24 of the 32 cores on each host were available for scheduling (75%) (see yarn.nodemanager.resource.cpu-vcores). The remaining 25% of cores were "reserved" for burst capacity. By forcing YARN to under-schedule our hosts, we intentionally left a deterministic amount of burst capacity even on the most constrained hosts. If the burst capacity were ever fully utilized, CGroups CPU shares would allocate it proportionally to each job's capacity reservation.
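Concretely, on a 32-core host this under-scheduling comes down to the node manager's advertised vcore count (an illustrative yarn-site.xml value, not necessarily our exact production setting):

```xml
<!-- Advertise only 24 of the host's 32 physical cores to the YARN scheduler,
     leaving ~25% of CPU on every node as guaranteed, shared burst headroom -->
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>24</value>
</property>
```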

In order to roll this out, we had to re-tune the CPU reservations for most of our jobs. Our existing jobs were reserving either too few or too many cores because we had not been enforcing their CPU reservations. We re-tuned the jobs manually and rolled them out with our clients over the course of four months, establishing new metrics for tracking capacity utilization along the way. Eventually we were successful: the hot hosts went away.

As a result of this change, jobs gained access to a shared burst capacity reservation that could handle job deployments even at traffic peaks without triggering overload. This was both cost effective and a big win for stability. The rollout was a success.

Job optimization: Removing CPU banding saves 50%+ in job costs

Once we had a stable cluster to work with and jobs with a consistent resource reservation philosophy, we started to look for ways to make our jobs faster and leaner. The first observation was that jobs often suffered from CPU banding.

Before: CPU utilization showing some Flink Taskmanagers with nothing to do and others with skewed load. This is wasteful because the lightly loaded Taskmanagers are asking for the same resources as the heavily loaded ones.
After: Single tight band for CPU utilization, tight provisioning. The CPU used is 70% lower.

The CPU banding came in two forms: (1) separate banding because of excess allocation of parallelism, and (2) wide CPU bands due to data skew. CPU banding wastes resources because the lightly loaded hosts are reserving the same capacity as the heavily loaded hosts; however, the excess capacity is not being used.

Improving task placements reduces cross-host network traffic by up to 60% and reduces CPU needs by up to 70%

There were two main root causes for CPU banding in the Flink jobs: (1) inherent data skew, and (2) poor choice of task placement in Flink 1.11. We could not do much about inherent data skew from the inputs, so we added rebalance() calls to input sources where appropriate, which reduced the width of the wide bands. The main win in mitigating CPU banding, however, came from addressing the poor choice of task placement in Flink 1.11.
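For reference, here is a minimal sketch of what a rebalance() call on a source looks like in Flink's DataStream API (the inline source and toy map function are placeholders for illustration, not our actual pipelines):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in practice this would be a skewed Kafka source.
        DataStream<String> events = env.fromElements("a", "b", "c");

        // rebalance() redistributes records round-robin across the downstream
        // subtasks, smoothing out load caused by skewed input partitions.
        events.rebalance()
              .map(new MapFunction<String, String>() {
                  @Override
                  public String map(String value) {
                      return value.toUpperCase();
                  }
              })
              .print();

        env.execute("rebalance-sketch");
    }
}
```

The trade-off is that rebalance() forces a network shuffle after the source, so it is only worth adding where the skew it removes costs more than the shuffle it introduces.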

In our workloads, different Flink operators can have different CPU needs. Some operators are CPU light, while others are CPU intensive. If the tasks are non-deterministically placed on Flink Taskmanagers (as we observed Flink 1.11 doing in our cluster), there would be some Taskmanagers that would have two or more copies of the CPU intensive tasks and some Taskmanagers would have two or more copies of light tasks. This would lead to CPU banding.

Once we realized that our tasks were placed non-deterministically and that this placement of light and heavy tasks was responsible for the CPU banding, it was important to find a way to streamline the workload. While digging through the Flink 1.11 codebase, we came across an undocumented feature: colocation constraints. After a review, we realized that we might be able to leverage colocation constraints to force a uniform placement of tasks such that the i-th subtask of a particular operator was colocated with the i-th subtask of the subsequent operator. Rolling out the change was a big success: we saw an immediate change in our CPU utilization, and the bands collapsed. We rolled this subtask-pinning change out to all our jobs.
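The exact hook is internal and may differ across Flink versions, but in the 1.11-era API the constraint can be expressed by giving two transformations the same co-location group key. The sketch below is illustrative (toy operators, default parallelism) rather than a drop-in recipe:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Two-operator toy pipeline standing in for a CPU-light stage feeding a
        // CPU-heavy stage. Chaining is disabled only so the two stages remain
        // separate tasks and the co-location constraint has something to pin.
        SingleOutputStreamOperator<String> light = env
                .fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) { return "light:" + value; }
                })
                .name("light-stage")
                .disableChaining();

        SingleOutputStreamOperator<String> heavy = light
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) { return "heavy:" + value; }
                })
                .name("heavy-stage")
                .disableChaining();

        // Using the same co-location group key on both transformations asks the
        // scheduler to place subtask i of "light-stage" and subtask i of
        // "heavy-stage" in the same slot. getTransformation() and
        // setCoLocationGroupKey() are internal, undocumented APIs, so this
        // leans on implementation details.
        light.getTransformation().setCoLocationGroupKey("pinned-pipeline");
        heavy.getTransformation().setCoLocationGroupKey("pinned-pipeline");

        heavy.print();
        env.execute("colocation-sketch");
    }
}
```

Note that co-located operators generally need matching parallelism for the i-th-to-i-th subtask pinning to make sense.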

One of the major side effects of this change was a 60% drop in cross-host network communication. The i-th subtask would largely pass data in-memory, in-process, to the next operator's i-th subtask on the same host, and the network hop would be avoided. This reduced our CPU needs by more than 50%. After accounting for the overhead reduction, we reduced job parallelism and dropped our per-job costs by 50–90% with no change in performance. This was a big win. In 2022, we absorbed 30% more jobs with a 40% reduction in total cost, and our savings were significant in relation to Pinterest's overall AWS spend.

Before & After: 60% reduction in cross host network traffic on one of our clusters.

Hardware upgrades: AWS i4i instances made our jobs 40% more efficient

After a round of testing, we realized that the new i4i AWS instance types performed really well with our Flink jobs: the jobs used 40% less CPU in exchange for a 10% increase in cost. The upgrade was a no-brainer, and after it rolled out we reduced our platform's AWS spend by an additional 10%.

Before & After: Median CPU utilization drops 40% after we move from i3 instances to i4i instances. The lines represent max CPU, median CPU, mean CPU and min CPU and the utilization is already lower on account of prior optimizations. The spare capacity was recouped immediately after the jobs stabilized on the new hardware.

Data Optimization: Creating efficient inputs for our jobs

The last major optimization for 2022 was optimizing the inputs to the Flink jobs. Very briefly, we identified that jobs were overfetching and overfiltering streams. Addressing the issue allowed these jobs to be scaled down significantly in size, saving us another 5% in total costs on the Stream Processing Platform. The work done in this area will be covered in a separate post, so please stay tuned.

Lookback, learnings, takeaways, and acknowledgements

Overall, in 2022, we made significant operational changes to our streaming stack and drove large efficiency improvements for the business. We ended the year with zero incidents associated with our efficiency rollouts. Our stability streak has continued into 2023. Along the way we learned that:

  1. Soft limits on shared CPU are the right choice for multitenant Flink clusters. CGroups is a must-have in order to drive compute density.
  2. Reserving a guaranteed burst capacity and sharing that capacity among streaming jobs correctly balances high availability with a pragmatic spending profile.
  3. Addressing CPU banding and balancing load across Flink Taskmanagers produces non-linear cost savings; we were able to reduce our per-job costs by more than 50% (bending our growth curves).
  4. Newer CPU generations can yield significant improvements for our Flink workloads (over 40% less CPU) and a net efficiency win (an additional 10% in savings).
  5. Ensuring that jobs are consuming only the right amount of data to produce their outputs can help reduce wasted data transport throughout the ecosystem.

It’s been a great 2022 and early 2023, and our wins would not have been possible without the support and cooperation of our 30+ client teams throughout the company. We are very grateful to our 200+ member client community who tried out our suggested changes and worked with us to stabilize and optimize their jobs. We’re also very thankful to our team members — Lu, Heng, Kanchi, Rainie, Dinesh, Nishant, Ashish, Chenqi, Anu, Pooja, and Hannah who all have at one point or another optimized jobs, reviewed our changes or helped with mass rollouts. A big thanks also to our leadership (Dave Burgess, Chunyan Wang, Ang Zhang, Abhishek Ghosh and David Chaiken) and partner teams: (Roger (小东) Wang, Jiacheng Hong, Shun-ping Chiu, Andrey Gusev, Chengcheng Hu, Mingsi Liu, Nancy Cheng, Yasser Adel Rashwan, Nilesh Gohel, Bin Yang, Dumitru Daniliuc and many others) who worked with us throughout this process.

We are also very grateful that Pinterest won the best company-wide data streaming implementation at the Data Streaming Awards hosted by Confluent. If all of this sounds interesting and you’d like to work with us, please check out Pinterest Careers!

Read more about Stream Processing at Pinterest

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore life at Pinterest, visit our Careers page.
