Where’s My Data — A Unique Encounter with Flink Streaming’s Kinesis Connector
For years now, Lyft has not only been a proponent of but also a contributor to Apache Flink. Lyft’s pipelines have evolved drastically over the years, yet, time and time again, we run into unique cases that stretch Flink to its breaking points — this is one of those times.
Context
While Lyft runs many streaming applications, the one specifically in question is a persistence job. Simply put, it streams data from Kinesis, performs some serialization and transformation, and writes the results to S3 every few minutes.
This job persists the vast majority of events generated at Lyft, averaging roughly 80 gigabytes per minute, and runs at a parallelism of 1800, making it one of Lyft's largest streaming jobs.
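For readers unfamiliar with the general shape of such a job, below is a minimal sketch: a Kinesis source feeding an S3 sink. The stream name, bucket, and region are placeholders, and the real job performs far more transformation than shown here.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class PersistenceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1800); // roughly one subtask per Kinesis shard

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

        // Read raw events from the Kinesis stream.
        DataStream<String> events = env.addSource(
                new FlinkKinesisConsumer<>("events-stream", new SimpleStringSchema(), consumerConfig));

        // Persist to S3; part files are committed as checkpoints complete.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://placeholder-bucket/events/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        events.addSink(sink);
        env.execute("kinesis-persistence-job");
    }
}
```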
Chapter 1: The Outage
Let’s start at the end, shall we?
Data Engineer: “Alert! My reports aren’t being generated! The upstream data is not available to generate them on!”
Platform Engineer: “I’m on it! Looks like our streaming application to persist data is up and running, but I hardly see any data being written either!”
Like any good engineer would, we pulled out our runbooks and carefully performed the well-detailed steps:
Platform Engineer: “Let me roll back our seemingly innocuous change we just deployed.”
Platform Engineer: “No luck.”
Platform Engineer: “Ok, let me try turning it off and on again.”
Platform Engineer: “No luck.”
Platform Engineer: “Ok, let me try performing a hard reset and we’ll backfill later.”
Platform Engineer: “It worked!”
This pattern unfortunately repeated on a roughly biweekly cadence for over a month. It certainly warranted further investigation.
Chapter 2: The Investigation
Throughout the rest of this article, we'll dissect the root cause of the behavior mentioned above and its relation to the inner workings of the Flink Kinesis Consumer.
To start, we asked the following questions:
- Why did the job suddenly have such low throughput?
- Why did the issue persist after both rolling back changes and starting from an earlier checkpoint?
Our initial investigations yielded the following:
Kinesis Per Shard Iterator Age
The iterator age (the time between the current processing time and the time at which the event arrived on the Kinesis shard) during the outage is pictured below.
While difficult to see, the graph above indicates that for all shards but one, the iterator age holds steady at ~3 minutes. There is then a gap in metrics, followed by a single shard reporting an iterator age lag of ~5 days.
Digging deeper, we see that the single shard in question exhibited the following behavior days prior:
Lo and behold, it appears our issue did not begin at the time of detection, when downstream users noticed they were missing data, but rather days prior.
Chapter 3: Five Days Prior
At this point, we shifted our investigation to five days prior, corresponding to the time at which the iterator age metric for the shard above went absent. There appeared to be a handful of irregular metrics at exactly this time, including the following:
CPU Usage & CPU Throttling
As shown above, during this period several of the 1800 task managers reached a CPU usage of 100% with significant CPU throttling. Furthermore, for task managers experiencing prolonged periods of CPU throttling, we saw gaps of up to 10 minutes in the Kinesis consumer's logs:
Before we begin our next part of the story, we’ll need to develop a deeper understanding of the Flink Kinesis Consumer.
Chapter 4: The Flink Kinesis Consumer
For the sake of simplicity, let's assume there is one Kinesis shard per Flink subtask. A subtask, in this context, is one parallel slice of a task: a schedulable unit of execution. Now, from a high level, most assume consumption from a single shard occurs as follows:
Indeed, this may have been the case at inception, but further optimizations could and should be made.
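To make that mental model concrete, here is a rough sketch of the naive, single-threaded approach. None of these types are the connector's actual classes; they are simplified stand-ins for the Kinesis client, deserialization schema, and source context.

```java
import java.util.List;

/** A sketch of single-threaded shard consumption; every type here is a simplified stand-in. */
class NaiveShardConsumer<T> {
    interface ShardReader { List<byte[]> getNextBatch(); }       // stands in for the Kinesis getRecords call
    interface Deserializer<T> { T deserialize(byte[] bytes); }   // stands in for the deserialization schema
    interface Collector<T> { void emit(T element); }             // stands in for the source context

    private final ShardReader reader;
    private final Deserializer<T> deserializer;
    private final Collector<T> out;
    private volatile boolean running = true;

    NaiveShardConsumer(ShardReader reader, Deserializer<T> deserializer, Collector<T> out) {
        this.reader = reader;
        this.deserializer = deserializer;
        this.out = out;
    }

    void run() {
        // The same thread polls the shard, deserializes, and emits:
        // nothing can be emitted while a poll is in flight.
        while (running) {
            for (byte[] bytes : reader.getNextBatch()) {
                out.emit(deserializer.deserialize(bytes));
            }
        }
    }
}
```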
What if we want to read and write asynchronously?
The multithreaded approach above elegantly solves this issue; however, further issues arise.
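One way to picture the multithreaded model is a fetcher thread that polls the shard and hands records to a bounded queue, with a separate emitter thread draining that queue. Again, this is a simplification rather than the connector's actual implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** A sketch of the two-threaded model: one thread reads the shard, another emits downstream. */
class TwoThreadedShardConsumer<T> {
    interface ShardReader<T> { T readNext() throws InterruptedException; }
    interface Collector<T> { void emit(T element); }

    private final BlockingQueue<T> emitQueue = new ArrayBlockingQueue<>(100); // bounded hand-off queue

    void start(ShardReader<T> reader, Collector<T> out) {
        // Fetcher thread: polls the shard and enqueues records, blocking when the queue is full.
        Thread fetcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    emitQueue.put(reader.readNext());
                }
            } catch (InterruptedException ignored) { }
        }, "shard-fetcher");

        // Emitter thread: drains the queue and emits downstream, independent of polling latency.
        Thread emitter = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    out.emit(emitQueue.take());
                }
            } catch (InterruptedException ignored) { }
        }, "record-emitter");

        fetcher.start();
        emitter.start();
    }
}
```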
What if we want to maintain event time alignment in the source?
Both of the models above are inherently flawed with regard to a concept known as event time alignment, which prevents any single subtask from processing data with an event time far ahead of the event times of other subtasks' records.
As discussed in this Flink improvement proposal, unaligned event times from the source operator can lead to uncontrollable growth in state.
Let’s define some terminology that will help with the below examples:
- Local watermark: Corresponds to a single subtask and aligns closely with event time, indicating to downstream operators that no further elements with a timestamp older than or equal to the watermark timestamp should arrive at the operator.
- Global watermark: Corresponds to the minimum of all subtasks’ local watermarks.
- Watermark “lookahead”: The amount of time by which one subtask can process data ahead of the global watermark.
Example
The following illustrates the relationship between the different watermark-related timestamps defined above.
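As one concrete illustration, with entirely made-up timestamps, suppose three subtasks report local watermarks and the lookahead is 5 minutes:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

class WatermarkAlignmentExample {
    public static void main(String[] args) {
        // Hypothetical local watermarks for three subtasks.
        Instant subtaskA = Instant.parse("2022-08-01T12:10:00Z");
        Instant subtaskB = Instant.parse("2022-08-01T12:07:00Z");
        Instant subtaskC = Instant.parse("2022-08-01T12:03:00Z");

        // Global watermark = minimum of all local watermarks.
        Instant global = List.of(subtaskA, subtaskB, subtaskC).stream()
                .min(Instant::compareTo).orElseThrow();                      // 12:03

        // With a 5-minute lookahead, a subtask may only emit records whose
        // timestamps are within global + lookahead.
        Duration lookahead = Duration.ofMinutes(5);
        Instant limit = global.plus(lookahead);                              // 12:08

        System.out.println("Subtask A blocked? " + subtaskA.isAfter(limit)); // true: 12:10 > 12:08
        System.out.println("Subtask B blocked? " + subtaskB.isAfter(limit)); // false: 12:07 <= 12:08
    }
}
```

In this toy scenario, Subtask A must wait for the slowest subtask, and therefore the global watermark, to advance before it may continue emitting.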
Now, let's see how this concept is implemented in the context of our consumer (a simplified sketch follows the list below).
- RecordEmitter: The watermark of each record is compared to the global watermark to decide whether to emit the record downstream or to wait.
- PeriodicWatermarkEmitter: This thread periodically emits the watermark corresponding to the last emitted record.
- WatermarkSyncCallback: This thread periodically reports the subtask's local watermark to the job manager and receives back the global watermark across all subtasks.
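Below is a heavily simplified sketch of how these three pieces might fit together; the class and method names are ours, not the connector's, and `GlobalWatermarkService` stands in for the RPC channel through which the job manager aggregates watermarks.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified interplay of the record emitter, periodic watermark emitter, and watermark sync. */
class AlignedEmitterSketch {
    interface GlobalWatermarkService {               // stands in for the job manager RPC
        Instant reportLocalAndGetGlobal(int subtask, Instant localWatermark);
    }

    private final int subtaskIndex;
    private final Duration lookahead;
    private final GlobalWatermarkService jobManager;

    private volatile Instant localWatermark = Instant.EPOCH;   // watermark of the last emitted record
    private volatile Instant globalWatermark = Instant.EPOCH;  // minimum across all subtasks

    AlignedEmitterSketch(int subtaskIndex, Duration lookahead, GlobalWatermarkService jobManager) {
        this.subtaskIndex = subtaskIndex;
        this.lookahead = lookahead;
        this.jobManager = jobManager;
    }

    /** RecordEmitter: emit only if the record is within the lookahead of the global watermark. */
    boolean tryEmit(Instant recordTimestamp) {
        if (recordTimestamp.isAfter(globalWatermark.plus(lookahead))) {
            return false;                            // wait for the global watermark to catch up
        }
        localWatermark = recordTimestamp;            // advance the local watermark; the caller emits the record
        return true;
    }

    /** PeriodicWatermarkEmitter: called on a timer to emit the watermark of the last emitted record. */
    Instant periodicWatermark() {
        return localWatermark;
    }

    /** WatermarkSyncCallback: called on a timer to report the local watermark and fetch the global one. */
    void syncWithJobManager() {
        globalWatermark = jobManager.reportLocalAndGetGlobal(subtaskIndex, localWatermark);
    }
}
```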
While elegant, this model has yet another flaw that needs resolving.
What if a shard does not have more data?
Under the current model, if a shard stops being written to for any reason, the subtask's local watermark will not progress and thus holds back the global watermark. If this occurs, other subtasks will eventually be prevented from processing data due to event time alignment.
To handle the case of an idle shard, what do we do? Essentially, we ignore the subtask.
Example
To handle idle shards, we perform three actions (sketched in code after this list):
- Mark the subtask as idle so its watermarks, or lack thereof, are ignored by downstream operators.
- Prevent the subtask’s local watermark from impacting the global watermark.
- When the global watermark is calculated for other subtasks, it only considers local watermarks updated in the last 60 seconds.
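A minimal sketch of these idle-handling rules, in the same simplified terms as above (the 60-second values mirror the behavior described in this post, not the connector's actual code):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Queue;

/** Simplified idle-detection rules for a source subtask. */
class IdleDetectionSketch {
    static final Duration IDLE_TIMEOUT = Duration.ofSeconds(60);

    /** A subtask is marked idle when its emit queue is empty and nothing was enqueued in the last 60s. */
    static boolean isIdle(Queue<?> emitQueue, Instant lastEnqueueTime, Instant now) {
        return emitQueue.isEmpty()
                && Duration.between(lastEnqueueTime, now).compareTo(IDLE_TIMEOUT) >= 0;
    }

    /** The global watermark only considers local watermarks that were updated within the last 60s. */
    static Instant globalWatermark(Map<Integer, Instant> localWatermarks,
                                   Map<Integer, Instant> lastUpdateTimes,
                                   Instant now) {
        return localWatermarks.entrySet().stream()
                .filter(e -> Duration.between(lastUpdateTimes.get(e.getKey()), now)
                        .compareTo(IDLE_TIMEOUT) < 0)
                .map(Map.Entry::getValue)
                .min(Instant::compareTo)
                .orElse(Instant.EPOCH);   // no recently-updated subtasks
    }
}
```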
At this point, we have the proper understanding required to continue to the next chapter. It's worth noting that much of the complexity of multithreading and event time alignment is the motivation behind the Flink source refactor that implements these concepts as first-class citizens in Flink.
Chapter 5: Time Travel
There’s one final piece of the puzzle to uncover before explaining what happened — the logs! The logs periodically show the local watermark, global watermark, and idle status of each subtask.
In our case, we see the following events for the subtask and shard in question:
- The local and global watermarks are emitted normally.
- During CPU throttling, no logs are reported for the consumer for several minutes.
- The global watermark has moved backwards in time several minutes.
- The subtask is marked idle.
- The subtask does not emit a record due to the event time alignment mechanism.
- The global watermark is not updated for the subtask, as the source is idle.
- Logs with the same local watermark, global watermark, and idle status are reported indefinitely.
It’s often thought that global watermarks can only move forward in time, not backward, and yet here we are. In the next chapter we’ll put the pieces together to explain the series of strange phenomena we’ve seen so far.
Chapter 6: The Deadlock
At this point, we now possess a handful of curious observations.
Days prior to restart:
- High CPU throttling occurs for an extended period of time.
- After high CPU throttling, the global watermark is moved backward in time.
- After high CPU throttling, a single subtask is marked idle and it does not emit new records even when the shard contains more data.
- A shard’s iterator age does not appear in the metrics for ~5 days.
At the time of the application restart, 5 days later:
- The single shard is the only shard whose iterator age appears in the metrics.
- The application’s throughput is <1% of expected, regardless of starting with an earlier checkpoint or savepoint.
This brings us to answer the first of many questions that explain the behavior.
Why is the subtask not emitting data when the shard contains data?
The subtask does not emit data as it enters the following deadlock state:
- Subtask A's local watermark is too far ahead of the global watermark, which corresponds to Subtask B's watermark, to emit a new record per the event time alignment mechanism.
- Subtask A is marked idle and thus does not receive the updated global watermark from the job manager.
The next obvious question that needs answering is how each of these conditions is met.
#1: How does the subtask get too far ahead of the global watermark?
While a subtask's watermark can, under normal circumstances, get too far ahead (in our case, 10 minutes ahead) of the slowest subtask's watermark due to skew, that is not what happened here. Under normal processing, the subtask's local watermark sits only a few minutes ahead of the global watermark, which is to be expected with 1800 subtasks.
In this case, the subtask gets too far ahead of the global watermark as a result of the global watermark moving backwards in time several minutes.
What causes the global watermark to move backwards in time?
The global watermark moves backwards in time as a result of prolonged CPU throttling among a series of subtasks. As both the logs and CPU throttling metrics suggest, many subtasks hit this stop-the-world scenario where operations are no longer occurring. In this next example, we’ll show how this leads to a global watermark that seems to reverse in time.
Example
Thus, when the global watermark moves backwards, as opposed to the local watermark moving forward naturally, a subtask can instantaneously find itself at a point where it can no longer emit records, meeting condition #1 of the deadlock state.
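To make this concrete, here is one plausible sequence, with made-up timestamps, that is consistent with the 60-second rule described earlier: a throttled subtask's stale watermark is first dropped from the global calculation, then re-enters it once the subtask wakes up.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

/** Hypothetical timeline showing how the global watermark can move backwards. */
class BackwardsWatermarkExample {

    /** Global watermark: minimum over local watermarks updated within the last 60 seconds. */
    static Instant global(Map<String, Instant> local, Map<String, Instant> lastUpdate, Instant now) {
        return local.entrySet().stream()
                .filter(e -> Duration.between(lastUpdate.get(e.getKey()), now).getSeconds() < 60)
                .map(Map.Entry::getValue)
                .min(Instant::compareTo)
                .orElse(Instant.EPOCH);
    }

    public static void main(String[] args) {
        Instant aWm = Instant.parse("2022-08-01T12:05:00Z"); // subtask A keeps progressing
        Instant bWm = Instant.parse("2022-08-01T11:58:00Z"); // subtask B stalls here while throttled

        // While B is throttled it stops reporting, so its last update becomes stale and it is
        // dropped from the calculation: the global watermark jumps forward to A's 12:05.
        Instant duringThrottle = global(
                Map.of("A", aWm, "B", bWm),
                Map.of("A", Instant.parse("2022-08-01T12:05:00Z"),
                       "B", Instant.parse("2022-08-01T11:58:30Z")),   // stale: more than 60s old
                Instant.parse("2022-08-01T12:05:10Z"));
        System.out.println(duringThrottle); // 12:05, only A is counted

        // When B wakes up, it reports the same old watermark, which is now "recent" again,
        // so it re-enters the minimum and the global watermark moves backwards to 11:58.
        Instant afterWakeup = global(
                Map.of("A", aWm, "B", bWm),
                Map.of("A", Instant.parse("2022-08-01T12:06:00Z"),
                       "B", Instant.parse("2022-08-01T12:06:00Z")),
                Instant.parse("2022-08-01T12:06:05Z"));
        System.out.println(afterWakeup);    // 11:58, several minutes behind the previous value
    }
}
```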
#2: Why is the subtask marked idle?
A Kinesis Consumer subtask is marked idle when two conditions occur:
- The emit queue is empty.
- No records have been written to the emit queue in the last 60 seconds.
Extended CPU throttling is a unique case in which these conditions can hold true while records are still being written to the shard by a producer. The conditions hold as follows:
- According to the logs, the queue is empty when waking from CPU throttling, and thus was also empty when entering CPU throttling; a chance occurrence whose probability increases with a smaller queue size and longer Kinesis poll intervals, as we have configured.
- Due to CPU throttling, no records have been written to the emit queue for several minutes.
It is through this logic that we understand why a subtask is marked idle; however, an important question still remains.
Why does the subtask remain idle?
Shortly after CPU throttling stops, we’d expect the emit queue to fill and relieve us of our idle state. This would then allow the global watermark to be updated for the subtask and our subtask would resume emitting data as usual, no longer hindered by the event time alignment mechanism.
Note, however, that the above states the conditions for a subtask to be marked idle; it does not state the conditions for a subtask to be marked not idle. To be marked not idle, the emitted watermark must progress, which, as mentioned above, cannot happen due to the deadlock state.
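In the same simplified terms as before, the un-idle rule can be sketched as a single check that never passes while the deadlock holds:

```java
import java.time.Instant;

class IdleExitSketch {
    /** A subtask is only marked not idle when its emitted watermark actually advances. */
    static boolean canLeaveIdle(Instant lastEmittedWatermark, Instant candidateWatermark) {
        return candidateWatermark.isAfter(lastEmittedWatermark);
    }

    public static void main(String[] args) {
        Instant stuck = Instant.parse("2022-08-01T11:58:00Z");
        // In the deadlock, emission is blocked by alignment, so the candidate watermark
        // never moves past the last emitted one and the subtask stays idle indefinitely.
        System.out.println(canLeaveIdle(stuck, stuck)); // false
    }
}
```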
Chapter 7: The Deployment
At this point, we have a single subtask in a deadlock state for 5 days. No data is being consumed from its corresponding shard. All other behavior appears normal, including the consumption of other shards from corresponding subtasks.
What occurs during a deploy?
On deploy, Lyft’s FlinkK8sOperator performs the following:
- Takes a savepoint
- Stops the job
- Starts a new cluster
- Submits the job to the new cluster with the savepoint
From the Flink Kinesis Consumer’s perspective, the following occurs on submission of the new job:
- Shards are assigned to subtasks
- Previous shard offsets are distributed from the savepoint
- Subtasks begin consuming from shards
- Subtasks recalculate the global watermark
- Data processing continues normally
Example
On restore, the deadlocked subtask's local watermark, now 5 days old, is factored back into the global watermark calculation, pulling the global watermark days into the past. Every other subtask is suddenly far ahead of the global watermark and is blocked by the event time alignment mechanism, while the formerly deadlocked subtask is free to consume its shard. For this reason, we see metrics for only a single shard's iterator age, as these metrics are only produced when records are actually retrieved from the corresponding Kinesis shard.
Chapter 8: The Hard Reset
This issue was temporarily alleviated by the “hard reset”, i.e. starting the job without a savepoint. As no state is restored upon initialization, each subtask begins reading from the latest record of its shard. Thus, the deadlock is relieved and the event time alignment mechanism no longer prevents any subtask from processing its corresponding shard.
To mitigate data loss, we started a separate job where each subtask begins reading its corresponding shard from a specific point in time. In our case, this was set to the point in time prior to the original deployment.
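We won't reproduce our internal tooling here, but the Flink Kinesis consumer does support starting all shards from a timestamp; a minimal sketch of such a backfill configuration might look like the following, with the stream name, region, timestamp, and sink all placeholders.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class BackfillJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");                 // placeholder
        // Start every shard from a fixed point in time rather than from saved state.
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
                "2022-08-01T12:00:00.000Z");                                            // placeholder timestamp

        env.addSource(new FlinkKinesisConsumer<>("events-stream", new SimpleStringSchema(), config))
           .print(); // stand-in for the same S3 sink used by the main job

        env.execute("kinesis-backfill-job");
    }
}
```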
Chapter 9: The Fallout
We originally assumed we had mitigated all data loss; however, this was not the case. There was but 1 fickle shard we failed to save. Recall that a single subtask was in a deadlock state for 5 days, while the backfill to mitigate the incident only prevented data loss from the time of deployment onward. As a result, data loss occurred for 1 of the 1800 shards, corresponding to roughly 1/1800 of the data, over a 5-day period.
As the Kinesis stream had a 7-day retention period, the unprocessed data in the shard unfortunately fell out of retention 2 days after the “hard reset”, well before we had concluded the aforementioned root cause.
Chapter 10: Preventative Measures
To wrap up our investigation, we asked how we could detect such a case sooner as well as avoid this possibility in the future. We took a multi-pronged approach and performed the following:
- Root Cause Fix [FLINK-29099]: Alleviate the deadlock by retrieving the updated global watermark when the subtask is idle (a sketch of the idea follows this list).
- Additional Monitoring: On a per-shard level, monitor that shards being produced to are also being consumed by the corresponding Flink application.
- Mitigate CPU Throttling: Given that this application had occasional high levels of CPU usage and thus CPU throttling, we decomposed the application into smaller applications and performed additional analysis into the root cause of high CPU usage.
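The following is a sketch of the idea behind the root cause fix, in the same simplified terms as Chapter 4; it is not the actual FLINK-29099 patch. The key change is that an idle subtask keeps fetching the global watermark, so it can eventually pass the alignment check again.

```java
import java.time.Instant;

/** Sketch of the idea behind the fix: keep syncing the global watermark even while idle. */
class WatermarkSyncSketch {
    interface GlobalWatermarkService {
        Instant reportLocalAndGetGlobal(int subtask, Instant localWatermark);
        Instant getGlobal();
    }

    private final int subtaskIndex;
    private final GlobalWatermarkService jobManager;
    private volatile Instant localWatermark = Instant.EPOCH;
    private volatile Instant globalWatermark = Instant.EPOCH;
    private volatile boolean idle;

    WatermarkSyncSketch(int subtaskIndex, GlobalWatermarkService jobManager) {
        this.subtaskIndex = subtaskIndex;
        this.jobManager = jobManager;
    }

    /** Before the fix: an idle subtask never observed the global watermark catching up,
     *  so it could never pass the alignment check and never leave the idle state. */
    void syncBeforeFix() {
        if (!idle) {
            globalWatermark = jobManager.reportLocalAndGetGlobal(subtaskIndex, localWatermark);
        }
    }

    /** After the fix: an idle subtask still fetches the global watermark, while its own
     *  (ignored) local watermark does not hold the global one back, which breaks the deadlock. */
    void syncAfterFix() {
        if (idle) {
            globalWatermark = jobManager.getGlobal();
        } else {
            globalWatermark = jobManager.reportLocalAndGetGlobal(subtaskIndex, localWatermark);
        }
    }
}
```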
Closing Notes
Here at Lyft, we know this unique issue we've experienced will not be our last. In fact, new and exciting challenges arise every day. As such, we plan to continue to share these experiences with the engineering community.
Additionally, we are actively looking for innovative engineers to help us take our streaming platforms to the next level! I would love to connect with you to share more on what we do and discuss opportunities that excite you!
Relevant Posts
- Get insights on how we overcame data skewness in streaming applications at Lyft.
- Learn more about various use cases of streaming at Lyft in this blog post.
- Check out this blog post of the evolution of streaming pipelines in Lyft’s Marketplace.