Gotchas of Stream Processing: Data Skewness

Rakesh Kumar
Lyft Engineering
Feb 16, 2021

Stream processing, and the frameworks designed to support it, have become ubiquitous in recent years because they unlock huge potential in building business-critical systems. Within the Marketplace team at Lyft, we use the Apache Beam (Flink runner) stream processing framework to power our feature engineering and model execution orchestration jobs. Over the past couple of years, we have built and scaled several pipelines that process realtime events at very low latencies. To share some stats, we generate ~100 features for ~3 million geohashes per minute (~400 billion features per day).

To power some of our critical products, such as primetime (dynamic pricing that balances demand and supply in a given region) and hotspot identification, we redesigned our architecture to rely on event-driven systems that provide realtime insights. Through that journey, we ran into our fair share of obstacles while trying to scale the platform. The process of identifying root causes and solving those issues gave us insight into how to design a pipeline that scales seamlessly. We are starting this series to cover the gotchas everyone should be aware of when designing a data-intensive pipeline. Each post will cover a unique type of issue and its potential solutions.

Data Skewness

For our first post, we are going to focus on the issue of data skewness and how it impacts the performance of a pipeline. No matter how performant the distributed framework is, skewed data will always challenge a pipeline's efficiency and throughput.

Some signs of this issue are higher CPU and memory utilization on a few of the nodes, even though the average resource utilization is much lower (figure 1).

Figure 1: Memory utilization per node vs time

Such an imbalance can cause lower throughput and higher end-to-end latency. Pipelines can fail and enter into crash loops. In rare cases, one may notice an intermittent end-to-end latency fluctuation without any noticeable change in incoming event traffic (figure 2). Identifying the root cause in such cases is tricky.

Figure 2: End-to-end latency vs time

Finding the Root Cause

Knowledge of the shape and spread of the data distribution helps initially, but as the data changes over time, keeping up requires a rich set of tools. When data is at rest, as in a SQL system, it's easier to slice and dice it to understand the causes of skewness. In stream processing, however, data is in flux, so one has to fall back on effective observability tools like metrics and logs.

Most of our pipelines are based on Flink, which is why the section below is Flink-heavy. However, similar monitoring tools exist for other streaming engines and frameworks.

Flink metrics

There are a ton of metrics available at the task and operator levels to help one dig in. Of these, the input and output watermarks and the number of records ingested by each task/operator provide the most precise signal for detecting data skewness. The graph below (figure 3) is built from Flink metrics. As one can see, a couple of tasks on certain nodes are processing a much higher number of records than the others, which indicates skewness in the data. The currentInputWatermark and currentOutputWatermark metrics produce a similar graph, confirming that certain nodes are falling behind and causing backpressure.
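For example, per-subtask record counts can be pulled from Flink's REST API and compared directly. Below is a minimal sketch (not our production tooling), assuming the JobManager REST endpoint is reachable at http://localhost:8081 and that the job and vertex ids are already known; exact endpoint paths and metric names can vary between Flink versions.

```python
# Sketch: flag skewed operators by comparing per-subtask record counts
# pulled from the Flink REST API. The address, job id, and vertex id
# below are assumptions for illustration.
import requests

FLINK = "http://localhost:8081"  # assumed JobManager REST address


def subtask_record_counts(job_id, vertex_id):
    """Return numRecordsIn for every subtask of one operator (vertex)."""
    vertex = requests.get(f"{FLINK}/jobs/{job_id}/vertices/{vertex_id}").json()
    counts = []
    for subtask in range(len(vertex["subtasks"])):
        resp = requests.get(
            f"{FLINK}/jobs/{job_id}/vertices/{vertex_id}"
            f"/subtasks/{subtask}/metrics",
            params={"get": "numRecordsIn"},
        ).json()
        counts.append(int(float(resp[0]["value"])) if resp else 0)
    return counts


def skew_ratio(counts):
    """Max/mean ratio; values well above 1.0 hint at data skewness."""
    mean = sum(counts) / max(len(counts), 1)
    return max(counts) / mean if mean else 0.0
```

A ratio close to 1.0 means records are spread evenly across subtasks; a ratio of, say, 5.0 means the busiest subtask is handling five times the average load.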

Figure 3: Number of events processed on each node

Flink dashboard

The Flink dashboard provides rich information on the velocity and volume of data at the subtask level for each operator, which is really helpful for quickly identifying the issue. Below is a screenshot for reference (figure 4). The top two subtasks are processing a significantly higher number of records than the others and are clearly the source of the skewness. The same pattern is evident in the number of bytes received by those subtasks.

Figure 4: Data Skewness at Flink Subtask level

Custom tools

The above toolsets are general purpose and may not work in every situation. In some special cases, one has to come up with custom tools for root cause analysis (RCA). In one instance, our Flink metric dashboard alerted us to higher latencies on a few tasks, yet nothing in the dashboard metrics looked suspicious, which was intriguing. As they say, "Once is chance, twice is coincidence, and thrice is a pattern." After observing multiple such alerts, we chose the route of extensive data logging for the RCA, to avoid a cardinality explosion of Flink metrics. We came up with an ad-hoc logger that recorded the event name, region, and relevant ids in JSON format. We then pulled the log file from the specific task manager that caused the backpressure, cleaned it up, and kept only the entries from the time window in question. Next, we loaded it into a Python notebook for interactive data exploration, where DataFrames helped us slice the data along various dimensions (event name, region, etc.). We realized that a specific event was noisy: in some scenarios, a large number of these events were routed to the same task, causing data skewness and thus backpressure. We had to go back to the drawing board and redesign the pipeline to distribute the data evenly.
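The exploration step might look something like the sketch below, assuming pandas as the DataFrame library and a cleaned-up, newline-delimited JSON log; the field names and time window are illustrative.

```python
# Sketch of the ad-hoc log exploration described above. Assumes the
# cleaned log is newline-delimited JSON with fields such as
# "event_name", "region", and "timestamp" (names are illustrative).
import pandas as pd

df = pd.read_json("taskmanager_events.log", lines=True)

# Restrict to the time window in question.
window = df[df["timestamp"].between("2021-02-16 10:00", "2021-02-16 10:15")]

# Events per (event_name, region): a single dominant bucket points at
# the noisy event that was being routed to one task.
counts = (
    window.groupby(["event_name", "region"])
    .size()
    .sort_values(ascending=False)
)
print(counts.head(10))
```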

As a ridesharing platform, we need to process data by city/region for some of our products, and densely populated regions generate more data. We key the data by city name in order to group events that belong to the same region, so a couple of unfortunate nodes always end up with the biggest regions, causing data skewness in the system. We check the data distribution across shards to identify such issues early, in the data exploration phase, and we include this information in our tech specs so that we can design the pipeline to handle it properly.
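A rough way to sanity-check this during data exploration is to simulate how the keys would map onto a fixed number of parallel tasks. The sketch below uses an illustrative hash, parallelism, and sample counts; it does not reproduce Flink's actual key-group assignment.

```python
# Sketch: estimate how city keys would spread across parallel tasks,
# given a sample of event counts per city. Hash function, parallelism,
# and counts are illustrative assumptions only.
from collections import Counter
import zlib

events_per_city = {
    "los_angeles": 9_500_000,
    "san_francisco": 7_200_000,
    "boise": 120_000,
    "reno": 90_000,
}
parallelism = 4

load = Counter()
for city, count in events_per_city.items():
    partition = zlib.crc32(city.encode()) % parallelism
    load[partition] += count

for partition, count in sorted(load.items()):
    print(f"task {partition}: {count:,} events")
```

If one or two partitions carry most of the volume, the keying scheme needs rethinking before the pipeline ever goes live.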

Implementing Solutions

If you have found the root cause, you have already won half the battle. To win the other half, you have to redesign the pipeline so that the load is redistributed evenly across all tasks/operators. There is no silver bullet, since every application and its data characteristics are different, but a few general good practices can have a huge impact.

Reduce Noise to Signal Ratio

Streaming engines orchestrate operator execution and data transfers from one node to another. Data transfer often takes a significant amount of time, especially when the pipeline processes billions of events per minute, so reducing the volume of data being transferred increases the pipeline's efficiency. Ideally, data validation, filtering, and data trimming are the first steps of the pipeline; this reduces the noise-to-signal ratio, and we do it aggressively to reduce the overall data transfer in the system.

In some cases, pipelines ingest a huge amount of data that is not relevant for processing. For example, input events might have many attribute fields, but only a few of them are used in the pipeline. In such cases, it's ideal to aggressively drop the unused attributes to reduce the total data size and, therefore, the overall data transfer within and across nodes. Coupling this with efficient serialization over the wire using Protocol Buffers significantly reduces data volume (by 20% in our pipeline!).
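A minimal sketch of such an early validate-and-trim step, using the Apache Beam Python SDK; the field names, validation rule, and in-memory source are illustrative stand-ins for the real pipeline.

```python
# Sketch: validation and attribute trimming as the first pipeline steps,
# so only the fields the pipeline actually uses get shuffled downstream.
import apache_beam as beam

KEPT_FIELDS = ("event_name", "region", "geohash", "timestamp", "value")


def is_valid(event):
    # Drop malformed or irrelevant events before any shuffle happens.
    return event.get("event_name") is not None and event.get("region")


def trim(event):
    # Keep only the attributes used downstream.
    return {k: event[k] for k in KEPT_FIELDS if k in event}


sample = [{
    "event_name": "ride_requested", "region": "sfo", "geohash": "9q8yy",
    "timestamp": 1613450000, "value": 1.0, "debug_blob": "x" * 1000,
}]

with beam.Pipeline() as p:
    cleaned = (
        p
        | "ReadEvents" >> beam.Create(sample)   # stands in for the real source
        | "Validate" >> beam.Filter(is_valid)
        | "TrimAttributes" >> beam.Map(trim)
    )
```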

Distributed logic

Since pipelines are generally distributed, one can devise a way to distribute the processing logic as well. In some of our pipelines, data is aggregated per region, and we distribute the logic in two stages. In the first stage, we partition the data by geohash and aggregate it; in the second stage, we repartition the data by region and combine the results of all the geohashes in that region (figure 5). This spreads the heavy lifting across more nodes before combining the intermediate results, which distributes the computation and increases the overall throughput. The disadvantage of this approach is that shuffling the data twice introduces more data transfer within the network, which can reduce the pipeline's performance. It is therefore most beneficial when the per-shard processing is heavy and the shuffling overhead is negligible by comparison.

Figure 5: Distributed Data & Logic
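A minimal sketch of this two-stage pattern using the Apache Beam Python SDK; the field names, sample events, and geohash-to-region lookup are illustrative assumptions.

```python
# Sketch: aggregate per geohash first (stage 1), then re-key by region
# and combine the intermediate results (stage 2).
import apache_beam as beam

# Hypothetical lookup from geohash prefix to region.
GEOHASH_TO_REGION = {"9q8y": "san_francisco", "9q5c": "los_angeles"}


def to_region(geohash):
    return GEOHASH_TO_REGION.get(geohash[:4], "unknown")


with beam.Pipeline() as p:
    events = p | beam.Create([
        {"geohash": "9q8yy", "rides": 3},
        {"geohash": "9q8yz", "rides": 5},
        {"geohash": "9q5cs", "rides": 2},
    ])

    per_region = (
        events
        # Stage 1: partition by geohash and do the heavy aggregation there.
        | "KeyByGeohash" >> beam.Map(lambda e: (e["geohash"], e["rides"]))
        | "SumPerGeohash" >> beam.CombinePerKey(sum)
        # Stage 2: re-key by region and combine the intermediate results.
        | "ReKeyByRegion" >> beam.Map(lambda kv: (to_region(kv[0]), kv[1]))
        | "SumPerRegion" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print)
    )
```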

Salting

Most of our feature generation and processing is region-based, which creates significant data skewness, especially for regions with heavy vehicle traffic. We first salt our keys with a geohash and process the data at that granularity. In some cases, though, certain geohashes are themselves hot and can cause data skewness, so we use additional key salting to redistribute that data further. Once most of the processing is done, we remove the salt (geohash) and re-key the stream by region so that the data can be joined downstream. For better results, one can also combine this technique with the distributed-logic solution above.

As shown below, we use random.choice to add a salt to the key before processing the data (figure 6), which distributes the data evenly. Once the processing is done, we remove the salt from the key so that the data can be joined downstream.

Figure 6: Salting key for better distribution of data
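A minimal sketch of such a salting step using the Apache Beam Python SDK; the salt range, key format, and aggregation are illustrative assumptions, not our exact production code.

```python
# Sketch: salt a hot key before the heavy processing stage, then strip
# the salt and re-key so the results can be joined downstream.
import random
import apache_beam as beam

SALT_BUCKETS = [str(i) for i in range(8)]  # spreads one hot key over 8 subkeys


def add_salt(kv):
    region, value = kv
    # Salted key, e.g. "san_francisco#3", so a hot region fans out
    # across several parallel tasks.
    return (f"{region}#{random.choice(SALT_BUCKETS)}", value)


def remove_salt(kv):
    salted_key, partial = kv
    return (salted_key.split("#", 1)[0], partial)


with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([("san_francisco", 1)] * 100 + [("reno", 1)] * 3)
        | "AddSalt" >> beam.Map(add_salt)
        | "PartialSum" >> beam.CombinePerKey(sum)   # heavy work on salted keys
        | "RemoveSalt" >> beam.Map(remove_salt)
        | "FinalSum" >> beam.CombinePerKey(sum)     # re-keyed for downstream joins
        | beam.Map(print)
    )
```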

Salting the key with a geohash works best for our problem domain. Choose the salting mechanism carefully, and make sure that it evenly distributes the data.

Reshuffle

Streaming frameworks provide GroupBy and CoGroupBy operations, which require accumulating data with the same key on one partition so that it can be joined. This requires a shuffle that sends data across different nodes just to make sure each record ends up on the right partition; it is network-heavy and increases latency. As a general optimization, streaming systems (especially Flink) therefore try to fuse different operators into one to avoid shuffles. While investigating a bottleneck, however, we noticed that some of the Kinesis shards were skewed, affecting throughput. Flink wasn't fully utilizing the available resources because the upstream data didn't have a good partitioning key and Flink had fused the operators together. So we added a reshuffle to explicitly break the fused operators and redistribute the data to underutilized nodes. This simple trick increased throughput and decreased end-to-end latency. Keep in mind that reshuffling is a double-edged sword that shouldn't be used everywhere; use it only when absolutely necessary.

Figure 7: Necessary reshuffle just after ingesting events
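In the Apache Beam Python SDK, an explicit beam.Reshuffle() achieves this. Below is a minimal sketch; the in-memory source and the processing step are illustrative placeholders for the real Kinesis ingestion and downstream operators.

```python
# Sketch: break operator fusion with an explicit reshuffle right after
# the source, so heavy processing is redistributed across all workers.
import apache_beam as beam


def expensive_processing(event):
    # Stand-in for the heavy per-event work downstream of ingestion.
    return event


with beam.Pipeline() as p:
    _ = (
        p
        | "ReadFromSource" >> beam.Create(range(1000))  # placeholder for a Kinesis source
        # Without this, the runner may fuse read + processing into one
        # task, inheriting the source's (possibly skewed) partitioning.
        | "BreakFusion" >> beam.Reshuffle()
        | "Process" >> beam.Map(expensive_processing)
    )
```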

Proactive Measures

A pipeline may handle most of its traffic fine in sunny-day cases, but that does not guarantee it won't run into data skewness issues in the future. Discovering such issues proactively can save you big headaches, particularly compared with discovering them at peak load once the pipeline is in production. To avoid this, design and run a load test against the pipeline before going live; it can surface issues and provide an opportunity to fix them before rolling out to production. Also, add observability for the key components of the pipeline so that it can be monitored in production.

Summary

We used the learnings from various issues caused by data skewness to build a better understanding of our data, which helped us design more robust pipelines. We discussed different techniques for identifying root causes as well as potential solutions; some of the general-purpose ones include trimming events, salting the key, and reshuffling the data.

Stay tuned for the next installment in this series, where we’ll discuss the deployment orchestration of streaming pipelines.

As always, Lyft is hiring! If you're passionate about developing state-of-the-art machine learning models and building the infrastructure that powers them, join our team.

