Scaling Uber’s Elasticsearch as a Geo-Temporal Database
1. Scaling Uber’s Elasticsearch as a Geo-Temporal Database
Danny Yuan @ Uber
2. Use Cases for a Geo-Temporal Database
3. Real-time Decisions on Global Scale
4. Dynamic Pricing: Every Hexagon, Every Minute
5. Dynamic Pricing: Every Hexagon, Every Minute
6. Metrics: How Many UberXs Were on a Trip in the Past 10 Minutes?
7. Metrics: How Many UberXs Were on a Trip in the Past 10 Minutes?
8. Market Analysis: Travel Times
9. Forecasting: Granular Forecasting of Rider Demand
10. How Can We Produce Geo-Temporal Data for Ever-Changing Business Needs?
11. Key Question: What Is the Right Abstraction?
12. Abstraction: Single-Table OLAP on Geo-Temporal Data
13. Abstraction: Single-Table OLAP on Geo-Temporal Data
SELECT <agg functions>, <dimensions>
FROM <data_source>
WHERE <boolean filter>
GROUP BY <dimensions>
HAVING <boolean filter>
ORDER BY <sorting criteria>
LIMIT <n>
14. Abstraction: Single-Table OLAP on Geo-Temporal Data
15. Why Elasticsearch?
- Arbitrary boolean query
- Sub-second response time
- Built-in distributed aggregation functions
- High-cardinality queries
- Idempotent insertion to deduplicate data
- Second-level data freshness
- Scales with data volume
- Operable by small team
16. Current Scale: An Important Context
- Ingestion: 850K to 1.3M messages/second
- Ingestion volume: 12TB / day
- Doc scans: 100M to 4B docs/second
- Data size: 1 PB
- Cluster size: 700 Elasticsearch machines
- Ingestion pipeline: 100+ Data Pipeline Jobs
17. Our Story of Scaling Elasticsearch
18. Three Dimensions of Scale
Ingestion
Query
Operation
19. Driving Principles
- Optimize for fast iteration
- Optimize for simple operations
- Optimize for automation and tools
- Optimize for being reasonably fast
20. The Past: We Started Small
21. Constraints for Being Small
- Three-person team
- Two data centers
- Small set of requirements: common analytics for machines
22. First Order of Business: Take Care of the Basics
23. Get Single-Node Right: Follow the 20-80 Rule
- One table <-> multiple indices by time range
- Disable _source field
- Disable _all field
- Use doc_values for storage
- Disable analyzed field
- Tune JVM parameters
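The single-node settings above can be sketched as an index body. This is a minimal illustration, not the configuration used in the talk: it assumes the Elasticsearch 2.x mapping style (where `_all`, the `string` type, and `not_analyzed` still exist; later versions removed them), a hypothetical mapping type named `event`, and weekly indices as the time-range granularity.

```python
from datetime import date


def index_name(table: str, day: date) -> str:
    """Map one logical table to one index per ISO week (assumed granularity)."""
    year, week, _ = day.isocalendar()
    return f"{table}-{year}.{week:02d}"


def build_index_body(fields: dict) -> dict:
    """Build an index body in the Elasticsearch 2.x mapping style.

    `fields` maps field names to Elasticsearch types, e.g. {"city_id": "integer"}.
    """
    properties = {}
    for name, es_type in fields.items():
        prop = {"type": es_type, "doc_values": True}  # columnar on-disk storage
        if es_type == "string":
            prop["index"] = "not_analyzed"  # exact-match only, no text analysis
        properties[name] = prop
    return {
        "mappings": {
            "event": {  # hypothetical mapping type name
                "_source": {"enabled": False},  # do not store the original JSON
                "_all": {"enabled": False},     # do not build the catch-all field
                "properties": properties,
            }
        }
    }
```

Disabling `_source` saves space but also gives up features that need the original document, such as partial updates and reindexing from the index itself, so it is a trade-off rather than a free win.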
24. Make Decisions with Numbers
- What’s the maximum number of recovery threads?
- What’s the maximum size of request queue?
- What should the refresh rate be?
- How many shards should an index have?
- What’s the throttling threshold?
- Solution: Set up end-to-end stress testing framework
25. Deployment in Two Data Centers
- Each data center has exclusive set of cities
- Should tolerate failure of a single data center
- Ingestion should continue to work
- Querying any city should return correct results
26. Deployment in Two Data Centers: Trade Space for Availability
27. Deployment in Two Data Centers: Trade Space for Availability
28. Deployment in Two Data Centers: Trade Space for Availability
29. Discretize Geo Locations: H3
30. Optimizations to Ingestion
31. Optimizations to Ingestion
32. Dealing with Large Volume of Data
- An event source produces more than 3TB every day
- Key insight: humans do not need overly granular data
- Key insight: stream data usually has lots of redundancy
33. Dealing with Large Volume of Data
- Prune unnecessary fields
- Devise algorithms to remove redundancy
- 3 TB -> 42 GB, a reduction of more than 70x!
- Bulk write
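The slides do not say which redundancy-removal algorithms were used. As one plausible illustration, the sketch below prunes fields and then drops any event whose tracked state is unchanged since the last event kept for the same key. The field names (`driver_id`, `status`, `hexagon`) are hypothetical.

```python
def prune_fields(event: dict, keep: set) -> dict:
    """Keep only the fields that downstream queries actually use."""
    return {k: v for k, v in event.items() if k in keep}


def drop_redundant(events, key_field: str, state_fields: list):
    """Yield an event only when its state differs from the last kept event
    with the same key. Repeated identical states are treated as redundant."""
    last_state = {}
    for event in events:
        key = event[key_field]
        state = tuple(event.get(f) for f in state_fields)
        if last_state.get(key) != state:
            last_state[key] = state
            yield event


raw_events = [
    {"driver_id": "d1", "status": "open", "hexagon": "h1", "debug": "x"},
    {"driver_id": "d1", "status": "open", "hexagon": "h1", "debug": "y"},
    {"driver_id": "d2", "status": "open", "hexagon": "h1", "debug": "z"},
    {"driver_id": "d1", "status": "on_trip", "hexagon": "h1", "debug": "w"},
]
keep = {"driver_id", "status", "hexagon"}
pruned = [prune_fields(e, keep) for e in raw_events]
reduced = list(drop_redundant(pruned, "driver_id", ["status", "hexagon"]))
```

The surviving events would then be sent in batches rather than one request per document, which is what the "bulk write" bullet refers to.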
34. Data Modeling Matters
35. Example: Efficient and Reliable Join
- Example: Calculate Completed/Requested ratio with two different event streams
36. Example: Efficient and Reliable Join: Use Elasticsearch
- Calculate Completed/Requested ratio from two Kafka topics
- Can we use streaming join?
- Can we join on the query side?
- Solution: rendezvous at Elasticsearch on trip ID
TripID  Pickup Time    Completed
1       2018-02-03T…   TRUE
2       2018-02-03T…   FALSE
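The rendezvous idea can be sketched with an in-memory store standing in for the index. Both streams write partial documents under the same trip ID, so arrival order and redelivery do not matter. The class and handler names are hypothetical; against a real cluster the same effect is usually achieved with a partial-document update that upserts, which this sketch only imitates.

```python
class TripStore:
    """In-memory stand-in for an index keyed by document ID."""

    def __init__(self):
        self.docs = {}

    def upsert(self, doc_id, partial: dict) -> None:
        # Merge the partial document into whatever already exists for this ID.
        self.docs.setdefault(doc_id, {}).update(partial)


def on_requested(store: TripStore, event: dict) -> None:
    """Handle an event from the (hypothetical) trip-requested stream."""
    store.upsert(event["trip_id"], {"requested": True,
                                    "pickup_time": event["pickup_time"]})


def on_completed(store: TripStore, event: dict) -> None:
    """Handle an event from the (hypothetical) trip-completed stream."""
    store.upsert(event["trip_id"], {"completed": True})


def completed_ratio(store: TripStore) -> float:
    """Completed/Requested over all trips that have a request on record."""
    requested = [d for d in store.docs.values() if d.get("requested")]
    if not requested:
        return 0.0
    completed = sum(1 for d in requested if d.get("completed"))
    return completed / len(requested)
```

Because each handler only sets its own fields, replaying either stream leaves the documents unchanged, and the ratio becomes a single-table aggregation instead of a join.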
37. Example: Aggregation on State Transitions
38. Optimize Querying Elasticsearch
39. Hide Query Optimization from Users
- Do we really expect every user to write Elasticsearch queries?
- What if someone issues a very expensive query?
- Solution: Isolation with a query layer
40. Query Layer with Multiple Clusters
41. Query Layer with Multiple Clusters
42. Query Layer with Multiple Clusters
- Generate efficient Elasticsearch queries
- Reject expensive queries
- Route queries (hardcoded at first)
43. Efficient Query Generation
- “GROUP BY a, b”
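The slide does not show which query form the layer generates. One common translation of a multi-dimension GROUP BY is a chain of nested `terms` aggregations, sketched below as a plain dictionary builder. The function name and the default `bucket_size` are assumptions for illustration.

```python
def group_by_to_aggs(dimensions: list, metrics: dict, bucket_size: int = 1000) -> dict:
    """Translate GROUP BY <dimensions> into nested terms aggregations.

    `metrics` holds the innermost aggregations, e.g.
    {"total": {"sum": {"field": "fare"}}}.
    """
    aggs = dict(metrics)
    # Build from the innermost dimension outward so the first dimension
    # ends up as the outermost bucket.
    for dim in reversed(dimensions):
        node = {"terms": {"field": dim, "size": bucket_size}}
        if aggs:
            node["aggs"] = aggs
        aggs = {dim: node}
    return aggs


query_aggs = group_by_to_aggs(["a", "b"], {"total": {"sum": {"field": "x"}}})
```

Generating this in a query layer means users write the SQL-like form while the layer controls bucket sizes and nesting order.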
44. Rejecting Expensive Queries
- 10,000 hexagons/city x 1,440 minutes/day x 800 cities
- Cardinality: about 11.5 billion (!) buckets -> out-of-memory error
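The arithmetic behind this rejection can be sketched as a pre-flight check: multiply the per-dimension cardinalities and refuse the query when the product exceeds a budget. The threshold and the names below are assumptions; the slides do not state the limit that was enforced.

```python
from math import prod

MAX_BUCKETS = 1_000_000  # assumed budget, not a figure from the talk


class QueryTooExpensive(Exception):
    """Raised when a query would create more buckets than the budget allows."""


def check_query(dimension_cardinalities: dict, max_buckets: int = MAX_BUCKETS) -> int:
    """Return the worst-case bucket count, or raise if it exceeds the budget."""
    estimated = prod(dimension_cardinalities.values())
    if estimated > max_buckets:
        raise QueryTooExpensive(
            f"estimated {estimated:,} buckets exceeds limit of {max_buckets:,}"
        )
    return estimated


worst_case = {"hexagon": 10_000, "minute_of_day": 1_440, "city": 800}
try:
    check_query(worst_case)
    rejected = False
except QueryTooExpensive:
    rejected = True
```

The product is an upper bound, since not every combination of values occurs in practice, but it is cheap to compute before any shard does work.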
45. Routing Queries
"DEMAND": {
  "CLUSTERS": {
    "TIER0": {
      "CLUSTERS": ["ES_CLUSTER_TIER0"]
    },
    "TIER2": {
      "CLUSTERS": ["ES_CLUSTER_TIER2"]
    }
  },
  "INDEX": "MARKETPLACE_DEMAND-",
  "SUFFIXFORMAT": "YYYYMM.WW",
  "ROUTING": "PRODUCT_ID"
}
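A lookup against this kind of routing table might look like the sketch below. The slides do not explain how a tier is chosen for a query or how `SUFFIXFORMAT` is expanded into concrete index names, so the function takes the tier as an argument and returns a wildcard index pattern instead of guessing the suffix. `resolve_route` is a hypothetical name.

```python
ROUTES = {
    "DEMAND": {
        "CLUSTERS": {
            "TIER0": {"CLUSTERS": ["ES_CLUSTER_TIER0"]},
            "TIER2": {"CLUSTERS": ["ES_CLUSTER_TIER2"]},
        },
        "INDEX": "MARKETPLACE_DEMAND-",
        "SUFFIXFORMAT": "YYYYMM.WW",
        "ROUTING": "PRODUCT_ID",
    }
}


def resolve_route(routes: dict, data_source: str, tier: str) -> dict:
    """Look up target clusters, index pattern, and routing field for a query."""
    entry = routes[data_source]
    tiers = entry["CLUSTERS"]
    if tier not in tiers:
        raise KeyError(f"no clusters configured for tier {tier!r}")
    return {
        "clusters": tiers[tier]["CLUSTERS"],
        # Wildcard over all time-suffixed indices; suffix expansion is left out
        # because the exact meaning of SUFFIXFORMAT is not given in the slides.
        "index_pattern": entry["INDEX"] + "*",
        "routing_field": entry["ROUTING"],
    }


route = resolve_route(ROUTES, "DEMAND", "TIER0")
```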
46. Routing Queries
47. Routing Queries
48. Routing Queries
49. Summary of First Iteration
50. Evolution: Success Breeds Failures
51. Unexpected Surges
52. Applications Went Haywire
53. Solution: Distributed Rate Limiting
54. Solution: Distributed Rate Limiting
Per-Cluster Rate Limit
55. Solution: Distributed Rate Limiting
Per-Instance Rate Limit
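The slides name per-cluster and per-instance limits without describing the mechanism. A token bucket is one standard way to enforce such a limit; the sketch below assumes the cluster budget is split evenly across query-layer instances, which is an illustrative simplification rather than the design from the talk.

```python
import time


class TokenBucket:
    """Allow up to `rate` requests per second with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


def per_instance_rate(cluster_rate: float, instance_count: int) -> float:
    """Split a per-cluster budget evenly across instances (assumed policy)."""
    return cluster_rate / instance_count
```

An even split is simple but wastes budget when traffic is skewed toward a few instances; a shared counter avoids that at the cost of an extra network hop per request.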
56. Workload Evolved
- Users query months of data for modeling and complex analytics
- Key insight: Data can be a little stale for long-range queries
- Solution: Caching layer and delayed execution
57. Time Series Cache
58. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
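A cache key built from normalized query content and a time range could be sketched as follows. The slides give only that description; splitting the range into fixed-size buckets so that overlapping queries share entries is an assumption, and a plain dictionary stands in for Redis.

```python
import hashlib
import json


def cache_key(query: dict, bucket_start: int, bucket_seconds: int) -> str:
    """Derive a key from the normalized query and one time bucket."""
    # Sorting keys makes logically equal queries serialize identically.
    normalized = json.dumps(query, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return f"tsc:{digest}:{bucket_seconds}:{bucket_start}"


def fetch_range(cache: dict, run_query, query: dict,
                start: int, end: int, bucket_seconds: int = 3600) -> list:
    """Answer [start, end) bucket by bucket, querying only on cache misses.

    `run_query(query, bucket_start, bucket_end)` is a hypothetical callable
    that executes the query against the backing store for one bucket.
    """
    results = []
    first = start - start % bucket_seconds  # align to a bucket boundary
    for bucket_start in range(first, end, bucket_seconds):
        key = cache_key(query, bucket_start, bucket_seconds)
        if key not in cache:
            cache[key] = run_query(query, bucket_start,
                                   bucket_start + bucket_seconds)
        results.append(cache[key])
    return results
```

With this layout, a query over a sliding window only pays for the newest bucket on each refresh, since the older buckets are already cached.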
59. Time Series Cache
60. Time Series Cache
61. Time Series Cache
62. Time Series Cache
63. Time Series Cache
64. Delayed Execution
- Allow registering long-running queries
- Provide cached but stale data for such queries
- Dedicated cluster and queued executions
- Rationale: three months of data vs a few hours of staleness
- Example: [-30d, 0d] -> [-30d, -1d]
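The range rewrite in the example can be sketched as a small function that pulls the end of a long-range query back by an allowed staleness. The one-day default matches the example above; the function name and the error handling are assumptions.

```python
from datetime import timedelta


def clamp_range(start_offset: timedelta, end_offset: timedelta,
                staleness: timedelta = timedelta(days=1)):
    """Shift the end of a relative time range back by the allowed staleness.

    Offsets are relative to "now" and are zero or negative,
    e.g. timedelta(days=-30) means 30 days ago.
    """
    new_end = min(end_offset, -staleness)
    if start_offset >= new_end:
        raise ValueError("range is too short to serve from stale data")
    return start_offset, new_end


rewritten = clamp_range(timedelta(days=-30), timedelta(0))
```

Once the range no longer touches the most recent data, its result stays valid for the whole staleness window and can be served from cache or computed on the dedicated cluster.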
65. Scale Operations
66. Driving Principles
- Make the system transparent
- Optimize for MTTR (mean time to recovery)
- Strive for consistency
- Automation is the most effective way to get consistency
67. Challenge: Diagnosis
- Cluster slowed down with all metrics being normal
- Requires additional instrumentation
- ES Plugin as a solution
68. Challenge: Cluster Size Becomes an Enemy
- Elasticsearch cluster becomes harder to operate as its size increases
- MTTR increases as cluster size increases
- Multi-tenancy becomes a huge issue
- Can’t have too many shards
69. Federation
- 3 clusters -> many smaller clusters
- Dynamic routing
- Meta-data driven
70. Federation
71. Federation
72. Federation
73. Federation
74. Federation
75. Federation
76. How Can We Trust the Data?
77. Self-Serving Trust System
78. Self-Serving Trust System
79. Self-Serving Trust System
80. Self-Serving Trust System
81. Too Much Manual Maintenance Work
82. Too Much Manual Maintenance Work
- Adjusting queue size
- Restarting machines
- Relocating shards
83. Auto Ops
84. Auto Ops
85. Ongoing Work for the Future
86. Future Work
- Strong reliability
- Strong consistency among replicas
- Multi-tenancy
87. Summary
- Three dimensions of scaling: ingestion, query, and operations
- Be simple and practical: successful systems emerge from simple ones
- Abstraction and data modeling matter
- Invest in thorough instrumentation
- Invest in automation and tools