Elasticsearch Optimizations at Lyft

Stefan Zier
Published in Lyft Engineering
Feb 23, 2021



By Stefan Zier and Vinay Kakade

Introduction

At Lyft, we use an in-house Feature Service to store batch and streaming features used by ML models, making them accessible in both offline mode (for training) and online mode (for inference). The service replicates these features in Elasticsearch to enable advanced queries. For Growth products, we use Elasticsearch to power internal tools like our Customer Data Platform, which enables teams to define user cohorts based on myriad features. Under the hood, these cohorts are translated to Elasticsearch queries for use in targeting millions of customers in marketing campaigns. Elasticsearch also plays an integral role in allowing teams to assess the current state of a user and their membership in specific cohorts so that product experiences can be enhanced with real-time decision making.

In 2020, new Health & Safety and real-time initiatives made our previous Elasticsearch SLAs challenging to maintain. With an influx of different use cases every week, we quickly outgrew our existing cluster: latency increased and AWS costs soared. In this article, we share the various ways we measured and altered our interactions with Elasticsearch to ultimately decrease latency and costs.

Query Optimizations

Our Elasticsearch cluster utilizes a long-lived index, where a document’s ID is known before the time of indexing and any document updates are made directly to the document source. There are roughly 300 million documents in one index with close to 4000 fields in its mapping. For each Lyft user, we have one document where the document ID matches the user’s ID. A document’s field mapping will be made up of feature names and values (e.g., user’s ride count, current region, whether or not the user has incentive credit, etc.).
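Since document IDs are known ahead of indexing, writes can address a user's document directly. A sketch of this write path using the Elasticsearch Update API (the index name and field values here are illustrative, not our production values):

```
POST /my-index/_update/<USER_ID>
{
  "doc": {
    "rides_in_28d": 12,
    "current_region": "SFO"
  }
}
```

Because the document ID is the user's ID, feature updates never require a search to locate the target document first.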

To reach customers faster and more consistently, we first identified the types of Elasticsearch requests we were making and then began performance testing and tuning our cluster:

  1. Elasticsearch Scroll API: Powers our marketing campaigns. The Scroll API provides us with paginated lists of users belonging to a specific cohort.
  2. Elasticsearch Count API: Used in determining “membership” to a specific cohort (ex. is UserA a San Francisco-based rider with at least 5 rides?). Additionally, the Count API helps determine the total number of users in a cohort.

Elasticsearch Scroll Performance Tuning

A scroll request returns a paginated list of documents, sorted by relevance, where each hit contains the document ID (the user's ID) and all associated fields. In our case, we did not require sorting, and we only needed document IDs in the response, not the entire field mapping, to begin marketing campaign execution.

Using Apache JMeter for performance testing on a sandbox cluster that mirrored our production index setup, we learned:

  1. If document order does not matter, sort by _doc.
  2. When possible, omit _source entirely or return only required fields with stored_fields.

The following is a sample query used in performance testing. The query aims to find unactivated riders with an expiring incentive who don't have Lyft Pink:

POST /my-index/_search?scroll=1m
{
  "size": 300,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "rides_in_28d": { "from": "0", "to": null }
          }
        },
        {
          "range": {
            "credit_expires_at": { "from": null, "to": "now-1m" }
          }
        },
        {
          "match_phrase": {
            "is_user_active": { "query": "true" }
          }
        }
      ],
      "must_not": [
        {
          "match_phrase": {
            "has_lyft_pink": { "query": "true" }
          }
        }
      ]
    }
  }
}

We observed that the average latency for this query was ~2.7s. Then, we tested the query without returning all document fields by setting _source to false:

POST /my-index/_search?scroll=1m
{
  "_source": false,
  "size": 300,
  "query": { … }
}

Here, we observed latency dropped to ~1.2s. As expected, latency improved significantly with the reduction in response size; however, sorting by _doc in addition to disabling _source yielded even better results:

POST /my-index/_search?scroll=1m
{
  "_source": false,
  "size": 300,
  "sort": ["_doc"],
  "query": { … }
}

With these changes, latency for Elasticsearch Scroll requests dropped to 307ms.

If your documents in Elasticsearch are large, assembling many of them in memory can take a significant amount of time. After omitting _source and sorting by _doc in production, we found scroll latency immediately improved from >1s to 100–300ms on average.
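For completeness, each scroll response carries a scroll_id that is passed back to the _search/scroll endpoint to fetch the next page; iteration stops when a page comes back empty. A sketch (the scroll_id value comes from the previous response):

```
POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "<SCROLL_ID_FROM_PREVIOUS_RESPONSE>"
}
```

Clearing the scroll when the export finishes (DELETE /_search/scroll with the same scroll_id) frees the search context immediately rather than waiting for the 1m timeout to expire.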

Chart: Scroll API p95 (ms)

Elasticsearch Count Performance Tuning

Thousands of times every second, the Elasticsearch Count API helps us determine a user's membership in a specific cohort. In essence, we execute a query with a structure similar to the scroll request above; however, we append a match_phrase clause containing the user's ID. Given that our cluster is organized with one document per user ID, these count queries return either 0 (the user is not in the cohort) or 1 (the user is in the cohort).

Using the same query from the Scroll API testing in the previous section, we would add a match_phrase clause in the body for the Count API like so:

GET /my-index/_count
{
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "_id": "<USER_ID>"
          }
        },
        {
          "range": {
            "rides_in_28d": { "from": "0", "to": null }
          }
        },
        {
          "range": {
            "credit_expires_at": { "from": null, "to": "now-1m" }
          }
        },
        {
          "match_phrase": {
            "is_user_active": { "query": "true" }
          }
        }
      ],
      "must_not": [
        {
          "match_phrase": {
            "has_lyft_pink": { "query": "true" }
          }
        }
      ]
    }
  }
}

As described in Elasticsearch Scalability, each index is split into shards and, per the Elasticsearch Count API documentation, a single count query is broadcast across all shards. In our case, the cluster had 20 primary shards, meaning one count request would execute the query as 20 search operations. As previously mentioned, data in the cluster is stored in a long-lived index, so we know a document's ID before indexing time. Given that our query above is only interested in a single document, there's no reason to run it on all shards. Instead, we can simply route the query to the one shard where the document is located, costing us only 1 search operation.
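Conceptually, Elasticsearch picks the target shard from the routing value, which defaults to the document's _id; the _routing field documentation describes shard selection as:

```
shard_num = hash(_routing) % num_primary_shards
```

Since our document IDs are user IDs, passing the user's ID as the routing value resolves to exactly the shard that owns that user's document.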

To avoid extraneous search operations, when possible:

  1. Use the _routing field so a query resolves to a single shard.
  2. Use terminate_after so query execution stops early once enough matching documents have been collected.
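Putting both together, a cohort-membership check can be routed to a single shard and cut short after the first match. A sketch using the Count API's routing and terminate_after query parameters (the query body is the same as the count query above):

```
GET /my-index/_count?routing=<USER_ID>&terminate_after=1
{
  "query": { … }
}
```

Because a membership check can only ever match the user's single document, terminate_after=1 lets the shard stop collecting as soon as that document matches.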

When testing the _routing field in JMeter, we did not notice an immediate latency improvement in a single query's execution time; however, we did see a significant reduction in load when it rolled out to the production cluster. Since count queries now resolve to a single shard, the search rate dropped and CPU utilization noticeably improved.

Charts: Avg. Search Rate / Min, Avg. CPU Usage

Summary

Consider fetching only the fields you need and sorting by _doc (if possible) in Elasticsearch Scroll requests, while making use of _routing and terminate_after in Elasticsearch Count requests. These simple changes yielded performance improvements that ultimately helped us reduce cluster resources while maintaining SLAs. Since Elasticsearch performance depends on a variety of factors (document size, search operation rate, document structure, index size, etc.), we recommend testing with tools like JMeter to accurately measure performance and tune for your needs.

Acknowledgments

Huge thanks to the following team members for their work in making this possible: Janani Sundarrajan, Leon Verdin and CD Wad

Interested in working with us? Check out our open roles today!
