Scaling Uber’s Elasticsearch as a Geo-Temporal Database
                1. Scaling Uber’s Elasticsearch as a Geo-Temporal Database
Danny Yuan @ Uber
            
                        
                2. Use Cases for a Geo-Temporal Database
            
                        
                3. Real-time Decisions on Global Scale
            
                        
                4. Dynamic Pricing: Every Hexagon, Every Minute
            
                        
                5. Dynamic Pricing: Every Hexagon, Every Minute
            
                        
                6. Metrics: how many UberXs were in a trip in the past 10 minutes
            
                        
                7. Metrics: how many UberXs were in a trip in the past 10 minutes
            
                        
                8. Market Analysis: Travel Times
            
                        
                9. Forecasting: Granular Forecasting of Rider Demand
            
                        
                10. How Can We Produce Geo-Temporal Data for Ever Changing Business Needs?
            
                        
                11. Key Question: What Is the Right Abstraction?
            
                        
                12. Abstraction: Single-Table OLAP on Geo-Temporal Data
            
                        
                13. Abstraction: Single-Table OLAP on Geo-Temporal Data
SELECT   <agg functions>, <dimensions>
FROM     <data_source>
WHERE    <boolean filter>
GROUP BY <dimensions>
HAVING   <boolean filter>
ORDER BY <sorting criteria>
LIMIT    <n>
            
                        
                14. Abstraction: Single-Table OLAP on Geo-Temporal Data
SELECT   <agg functions>, <dimensions>
FROM     <data_source>
WHERE    <boolean filter>
GROUP BY <dimensions>
HAVING   <boolean filter>
ORDER BY <sorting criteria>
LIMIT    <n>
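As a concrete, hypothetical instance of this abstraction, the metrics question from slide 6 ("how many UberXs were in a trip in the past 10 minutes", per hexagon) fits the template directly. The Python sketch below shows one way such a query could be rendered as an Elasticsearch search body; the field names (vehicle_type, status, event_time, hexagon_id) are illustrative assumptions, not Uber's actual schema.

# Hypothetical translation of:
#   SELECT COUNT(*), hexagon_id FROM driver_states
#   WHERE vehicle_type = 'UberX' AND status = 'on_trip'
#     AND event_time >= now() - 10 minutes
#   GROUP BY hexagon_id
query_body = {
    "size": 0,  # aggregations only, no raw hits
    "query": {
        "bool": {
            "filter": [
                {"term": {"vehicle_type": "UberX"}},
                {"term": {"status": "on_trip"}},
                {"range": {"event_time": {"gte": "now-10m"}}},
            ]
        }
    },
    "aggs": {
        "per_hexagon": {"terms": {"field": "hexagon_id", "size": 10000}}
    },
}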
            
                        
                15. Why Elasticsearch?
- Arbitrary boolean query
- Sub-second response time
- Built-in distributed aggregation functions
- High-cardinality queries
- Idempotent insertion to deduplicate data
- Second-level data freshness
- Scales with data volume
- Operable by small team
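The idempotent-insertion bullet deserves a sketch: if the document ID is derived deterministically from the event (say, trip ID plus a minute bucket), replaying the same Kafka message overwrites the same document instead of creating a duplicate. A minimal example in elasticsearch-py 7.x style; the endpoint, index name, and fields are assumptions:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # placeholder endpoint

def index_event(event):
    # Deterministic _id: redelivering the same message rewrites the same
    # document, so ingestion stays idempotent and data is deduplicated.
    doc_id = f"{event['trip_id']}:{event['minute_bucket']}"
    es.index(index="marketplace_demand-201802", id=doc_id, body=event)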
            
                        
                16. Current Scale: An Important Context
- Ingestion: 850K to 1.3M messages/second
- Ingestion volume: 12 TB/day
- Doc scans: 100M to 4B docs/second
- Data size: 1 PB
- Cluster size: 700 Elasticsearch machines
- Ingestion pipeline: 100+ data pipeline jobs
            
                        
                17. Our Story of Scaling Elasticsearch
            
                        
                18. Three Dimensions of Scale
Ingestion
Query
Operation
            
                        
                19. Driving Principles
- Optimize for fast iteration
- Optimize for simple operations
- Optimize for automation and tools
- Optimize for being reasonably fast
            
                        
                20. The Past: We Started Small
            
                        
                21. Constraints for Being Small
- Three-person team
- Two data centers
- Small set of requirements: common analytics for machines
            
                        
                22. First Order of Business: Take Care of the Basics
            
                        
                23. Get Single-Node Right: Follow the 20-80 Rule
- One table <—> multiple indices by time range
- Disable _source field
- Disable _all field
- Use doc_values for storage
- Disable analyzed field
- Tune JVM parameters
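A sketch of what these per-index settings might look like in the Elasticsearch versions of that era (2.x/5.x, when the _all field still existed); exact option names vary by version, so treat this as illustrative rather than Uber's actual template:

# Illustrative index template, expressed as a Python dict (5.x-style syntax).
index_template = {
    "template": "marketplace_demand-*",  # one logical table -> many time-ranged indices
    "settings": {"number_of_shards": 8, "refresh_interval": "30s"},
    "mappings": {
        "event": {
            "_source": {"enabled": False},  # disable _source to cut storage
            "_all": {"enabled": False},     # disable _all; queries name fields explicitly
            "properties": {
                # keyword fields are not analyzed; doc_values enable columnar aggregation
                "hexagon_id": {"type": "keyword", "doc_values": True},
                "event_time": {"type": "date"},
                "demand": {"type": "integer"},
            },
        }
    },
}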
            
                        
                24. Make Decisions with Numbers
- What’s the maximum number of recovery threads?
- What’s the maximum size of request queue?
- What should the refresh rate be?
- How many shards should an index have?
- What’s the throttling threshold?
- Solution: Set up an end-to-end stress-testing framework
            
                        
                25. Deployment in Two Data Centers
- Each data center has an exclusive set of cities
- Should tolerate failure of a single data center
- Ingestion should continue to work
- Querying any city should return correct results
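One plausible reading of the next slides' "trade space for availability" is that each city's data is written to both data centers, so either one alone can answer queries for any city when the other fails. A toy sketch of that dual-write idea; the cluster names, index name, and retry queue are all assumptions:

from collections import deque

DATA_CENTERS = ["dc1-es", "dc2-es"]  # hypothetical cluster names
retry_queue = deque()                # events to re-send after a failed write

def ingest(event, clients):
    # Write every event to both data centers; a failure in one DC is queued
    # for retry instead of blocking ingestion in the healthy DC.
    for dc in DATA_CENTERS:
        try:
            clients[dc].index(index="marketplace_demand-201802",
                              id=event["id"], body=event)
        except Exception:
            retry_queue.append((dc, event))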
            
                        
                26. Deployment in Two Data Centers: trade space for availability
            
                        
                27. Deployment in Two Data Centers: trade space for availability
            
                        
                28. Deployment in Two Data Centers: trade space for availability
            
                        
                29. Discretize Geo Locations: H3
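H3 is Uber's open-source hexagonal hierarchical geospatial index. A minimal sketch with the h3-py binding (v3 API shown; v4 renames geo_to_h3 to latlng_to_cell and k_ring to grid_disk); the resolution choice here is arbitrary:

import h3

# Bucket a raw GPS point into a fixed hexagon so events can be counted
# per hexagon per minute.
lat, lng = 37.7749, -122.4194            # sample coordinates (San Francisco)
hexagon_id = h3.geo_to_h3(lat, lng, 7)   # returns a cell id string
neighbors = h3.k_ring(hexagon_id, 1)     # the hexagon plus its immediate ring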
            
                        
                30. Optimizations to Ingestion
            
                        
                31. Optimizations to Ingestion
            
                        
                32. Dealing with Large Volume of Data
- An event source produces more than 3TB every day
- Key insight: humans do not need overly granular data
- Key insight: stream data usually has lots of redundancy
            
                        
                33. Dealing with Large Volume of Data
- Pruning unnecessary fields
- Devise algorithms to remove redundancy
- 3 TB —> 42 GB, more than 70x reduction!
- Bulk write
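A sketch of the first two ideas, pruning fields the aggregations never read and dropping consecutive updates that carry no new information at the chosen granularity (here, minute-level hexagon occupancy); the field names and granularity are assumptions:

KEEP_FIELDS = {"driver_id", "status", "hexagon_id", "event_time"}  # assumed schema
last_seen = {}  # driver_id -> (hexagon_id, status, minute)

def reduce_event(raw):
    # 1) Prune fields that downstream aggregations never read.
    event = {k: raw[k] for k in KEEP_FIELDS if k in raw}
    # 2) Drop redundant updates: the same driver in the same hexagon with the
    #    same status within the same minute adds nothing at this granularity.
    minute = event["event_time"][:16]  # ISO timestamp truncated to the minute
    key = (event["hexagon_id"], event["status"], minute)
    if last_seen.get(event["driver_id"]) == key:
        return None  # redundant; skip
    last_seen[event["driver_id"]] = key
    return event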
            
                        
                34. Data Modeling Matters
            
                        
                35. Example: Efficient and Reliable Join
- Example: Calculate Completed/Requested ratio with two different event streams
            
                        
                36. Example: Efficient and Reliable Join: Use Elasticsearch
- Calculate Completed/Requested ratio from two Kafka topics
- Can we use streaming join?
- Can we join on the query side?
- Solution: rendezvous at Elasticsearch on trip ID
TripID | Pickup Time  | Completed
1      | 2018-02-03T… | TRUE
2      | 2018-02-03T… | FALSE
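The rendezvous idea presumably works by keying the document on the trip ID and upserting from both streams: whichever event arrives first creates the document, the other fills in the remaining fields, and the Completed/Requested ratio becomes a single aggregation over the merged documents. A sketch with elasticsearch-py's update-with-upsert; the index and field names are assumptions:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # placeholder endpoint

def on_request_event(evt):
    # Requested stream: create the trip document (defaulting completed=False),
    # or just patch pickup_time if the completion event arrived first.
    es.update(index="trips-201802", id=evt["trip_id"],
              body={"doc": {"pickup_time": evt["pickup_time"]},
                    "upsert": {"trip_id": evt["trip_id"],
                               "pickup_time": evt["pickup_time"],
                               "completed": False}})

def on_completion_event(evt):
    # Completed stream: same document, same key, so the two Kafka topics
    # "join" inside Elasticsearch with no streaming or query-side join.
    es.update(index="trips-201802", id=evt["trip_id"],
              body={"doc": {"completed": True}, "doc_as_upsert": True})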
            
                        
                37. Example: aggregation on state transitions
            
                        
                38. Optimize Querying Elasticsearch
            
                        
                39. Hide Query Optimization from Users
- Do we really expect every user to write Elasticsearch queries?
- What if someone issues a very expensive query?
- Solution: Isolation with a query layer
            
                        
                40. Query Layer with Multiple Clusters
            
                        
                41. Query Layer with Multiple Clusters
            
                        
                42. Query Layer with Multiple Clusters
- Generating efficient Elasticsearch queries
- Rejecting expensive queries
- Routing queries (hardcoded at first)
            
                        
                43. Efficient Query Generation
- “GROUP BY a, b”
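One plausible way for the query layer to turn "GROUP BY a, b" into an efficient Elasticsearch request is to nest a terms aggregation per dimension (newer versions could use a composite aggregation instead); a sketch of such a generator:

def group_by(dimensions, metric_field):
    # GROUP BY a, b becomes terms(a) -> terms(b) -> sum(metric).
    aggs = {"total": {"sum": {"field": metric_field}}}
    for dim in reversed(dimensions):
        aggs = {f"by_{dim}": {"terms": {"field": dim, "size": 1000},
                              "aggs": aggs}}
    return {"size": 0, "aggs": aggs}

# group_by(["a", "b"], "demand") yields a body with terms on "a", each bucket
# holding terms on "b", each holding sum("demand").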
            
                        
                44. Rejecting Expensive Queries
- 10,000 hexagons / city x 1440 minutes per day x 800 cities
- Cardinality: 11 Billion (!) buckets —> Out Of Memory Error
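The query layer can estimate the worst-case bucket count before anything reaches Elasticsearch and refuse queries that would blow up memory. A toy guard using the slide's numbers (the bucket budget itself is made up):

# Rough per-dimension cardinalities, from the slide's example.
CARDINALITY = {
    "hexagon_id": 10_000 * 800,  # ~10,000 hexagons/city x 800 cities
    "minute_of_day": 1_440,
    "city_id": 800,
}
MAX_BUCKETS = 100_000  # assumed budget per query

def estimate_buckets(dimensions):
    n = 1
    for dim in dimensions:
        n *= CARDINALITY.get(dim, 1_000)  # default guess for unknown dimensions
    return n

def check_query(dimensions):
    # GROUP BY hexagon_id, minute_of_day estimates ~11.5 billion buckets and
    # is rejected up front instead of taking a cluster down with an OOM.
    if estimate_buckets(dimensions) > MAX_BUCKETS:
        raise ValueError("query rejected: too many aggregation buckets")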
            
                        
                45. Routing Queries
"DEMAND": {
"CLUSTERS": {
"TIER0": {
"CLUSTERS": ["ES_CLUSTER_TIER0"],
},
"TIER2": {
"CLUSTERS": ["ES_CLUSTER_TIER2"]
}
},
"INDEX": "MARKETPLACE_DEMAND-",
"SUFFIXFORMAT": “YYYYMM.WW",
"ROUTING": “PRODUCT_ID”,
}
            
                        
                46. Routing Queries
"DEMAND": {
"CLUSTERS": {
"TIER0": {
"CLUSTERS": ["ES_CLUSTER_TIER0"],
},
"TIER2": {
"CLUSTERS": ["ES_CLUSTER_TIER2"]
}
},
"INDEX": "MARKETPLACE_DEMAND-",
"SUFFIXFORMAT": “YYYYMM.WW",
"ROUTING": “PRODUCT_ID”,
}
            
                        
                47. Routing Queries
"DEMAND": {
"CLUSTERS": {
"TIER0": {
"CLUSTERS": ["ES_CLUSTER_TIER0"],
},
"TIER2": {
"CLUSTERS": ["ES_CLUSTER_TIER2"]
}
},
"INDEX": "MARKETPLACE_DEMAND-",
"SUFFIXFORMAT": “YYYYMM.WW",
"ROUTING": “PRODUCT_ID”,
}
            
                        
                48. Routing Queries
"DEMAND": {
"CLUSTERS": {
"TIER0": {
"CLUSTERS": ["ES_CLUSTER_TIER0"],
},
"TIER2": {
"CLUSTERS": ["ES_CLUSTER_TIER2"]
}
},
"INDEX": "MARKETPLACE_DEMAND-",
"SUFFIXFORMAT": “YYYYMM.WW",
"ROUTING": “PRODUCT_ID”,
}
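A sketch of how a router might resolve this metadata: pick a cluster for the caller's tier, build the concrete index name from the INDEX prefix plus the SUFFIXFORMAT (read here as year-month.week), and pull the ROUTING field out of the query so the request lands on the right shard. The config mirrors the slide; the resolution logic and the interpretation of YYYYMM.WW are assumptions:

from datetime import datetime

ROUTING_CONFIG = {
    "DEMAND": {
        "CLUSTERS": {
            "TIER0": {"CLUSTERS": ["ES_CLUSTER_TIER0"]},
            "TIER2": {"CLUSTERS": ["ES_CLUSTER_TIER2"]},
        },
        "INDEX": "MARKETPLACE_DEMAND-",
        "SUFFIXFORMAT": "YYYYMM.WW",
        "ROUTING": "PRODUCT_ID",
    }
}

def resolve(table, tier, when, query_params):
    cfg = ROUTING_CONFIG[table]
    cluster = cfg["CLUSTERS"][tier]["CLUSTERS"][0]          # e.g. ES_CLUSTER_TIER2
    suffix = f"{when:%Y%m}.{int(when.strftime('%W')):02d}"  # YYYYMM.WW
    index = cfg["INDEX"].lower() + suffix                   # e.g. marketplace_demand-201802.05
    routing = query_params.get(cfg["ROUTING"].lower())      # value used for shard routing
    return cluster, index, routing

# resolve("DEMAND", "TIER2", datetime(2018, 2, 3), {"product_id": "uberx"})
# -> ("ES_CLUSTER_TIER2", "marketplace_demand-201802.05", "uberx")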
            
                        
                49. Summary of First Iteration
            
                        
                50. Evolution: Success Breeds Failures
            
                        
                51. Unexpected Surges
            
                        
                52. Applications Went Haywire
            
                        
                53. Solution: Distributed Rate Limiting
            
                        
                54. Solution: Distributed Rate Limiting
Per-Cluster Rate Limit
            
                        
                55. Solution: Distributed Rate Limiting
Per-Instance Rate Limit
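A minimal sketch of both levels: each query-layer instance enforces its own in-process budget, and a shared counter enforces the cluster-wide budget across instances. Redis is used here purely as an assumed shared store, and the limits are made up:

import time
import redis

r = redis.Redis()            # assumed shared store for the per-cluster count
PER_CLUSTER_LIMIT = 1000     # requests/second across all instances (made up)
PER_INSTANCE_LIMIT = 50      # requests/second for this instance (made up)
_window, _local_count = int(time.time()), 0

def allow(cluster):
    global _window, _local_count
    now = int(time.time())
    # Per-instance limit: plain in-process counter over a one-second window.
    if now != _window:
        _window, _local_count = now, 0
    if _local_count >= PER_INSTANCE_LIMIT:
        return False
    # Per-cluster limit: shared counter keyed by cluster and second.
    key = f"ratelimit:{cluster}:{now}"
    total = r.incr(key)
    r.expire(key, 2)
    if total > PER_CLUSTER_LIMIT:
        return False
    _local_count += 1
    return True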
            
                        
                56. Workload Evolved
- Users query months of data for modeling and complex analytics
- Key insight: Data can be a little stale for long-range queries
- Solution: Caching layer and delayed execution
            
                        
                57. Time Series Cache
            
                        
                58. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
            
                        
                59. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
            
                        
                60. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
            
                        
                61. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
            
                        
                62. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
            
                        
                63. Time Series Cache
- Redis as the cache store
- Cache key is based on normalized query content and time range
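A sketch of the caching idea: normalize the query so logically identical queries hash to the same key, split the requested range into fixed time buckets, serve cached buckets from Redis, and only send the misses to Elasticsearch. The key layout, bucket size, and TTL are assumptions:

import hashlib
import json
import redis

r = redis.Redis()
BUCKET = 3600  # cache granularity: one hour per time bucket (assumed)

def cache_key(query, bucket_start):
    normalized = json.dumps(query, sort_keys=True)  # identical queries, identical key
    digest = hashlib.sha1(normalized.encode()).hexdigest()
    return f"tscache:{digest}:{bucket_start}"

def fetch(query, start_ts, end_ts, run_es_query):
    results, misses = {}, []
    for b in range(start_ts - start_ts % BUCKET, end_ts, BUCKET):
        cached = r.get(cache_key(query, b))
        if cached is not None:
            results[b] = json.loads(cached)
        else:
            misses.append(b)
    for b in misses:  # only the cache misses hit Elasticsearch
        results[b] = run_es_query(query, b, b + BUCKET)
        r.setex(cache_key(query, b), 24 * 3600, json.dumps(results[b]))
    return results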
            
                        
                64. Delayed Execution
- Allow registering long-running queries
- Provide cached but stale data for such queries
- Dedicated cluster and queued executions
- Rationale: three months of data vs a few hours of staleness
- Example: [-30d, 0d] —> [-30d, -1d]
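The range adjustment in the example is simple enough to sketch; the staleness budget is an assumption:

def adjust_range(start_days, end_days, staleness_days=1):
    # A registered query over [-30d, 0d] is answered from results precomputed
    # over [-30d, -1d]; a queued job on the dedicated cluster refreshes them.
    return start_days, min(end_days, -staleness_days)

# adjust_range(-30, 0) -> (-30, -1)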
            
                        
                65. Scale Operations
            
                        
                66. Driving Principles
- Make the system transparent
- Optimize for MTTR - mean time to recover
- Strive for consistency
- Automation is the most effective way to get consistency
            
                        
                67. Challenge: Diagnosis
- Cluster slowed down while all metrics looked normal
- Requires additional instrumentation
- ES Plugin as a solution
            
                        
                68. Challenge: Cluster Size Becomes an Enemy
- Elasticsearch cluster becomes harder to operate as its size increases
- MTTR increases as cluster size increases
- Multi-tenancy becomes a huge issue
- Can’t have too many shards
            
                        
                69. Federation
- 3 clusters —> many smaller clusters
- Dynamic routing
- Meta-data driven
            
                        
                70. Federation
            
                        
                71. Federation
            
                        
                72. Federation
            
                        
                73. Federation
            
                        
                74. Federation
            
                        
                75. Federation
            
                        
                76. How Can We Trust the Data?
            
                        
                77. Self-Serving Trust System
            
                        
                78. Self-Serving Trust System
            
                        
                79. Self-Serving Trust System
            
                        
                80. Self-Serving Trust System
            
                        
                81. Too Much Manual Maintenance Work
            
                        
                82. Too Much Manual Maintenance Work
- Adjusting queue sizes
- Restarting machines
- Relocating shards
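A sketch of what automating one of these chores could look like, moving a shard off a hot node through Elasticsearch's cluster reroute API; the node and index names are placeholders, and the actual Auto Ops tooling is not described in the slides:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # placeholder endpoint

def relocate_shard(index, shard, from_node, to_node):
    # Equivalent to POST /_cluster/reroute with a "move" command; the kind of
    # step an Auto Ops job can run instead of an operator doing it by hand.
    es.cluster.reroute(body={"commands": [{
        "move": {"index": index, "shard": shard,
                 "from_node": from_node, "to_node": to_node}}]})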
            
                        
                83. Auto Ops
            
                        
                84. Auto Ops
            
                        
                85. Ongoing Work for the Future
            
                        
                86. Future Work
- Strong reliability
- Strong consistency among replicas
- Multi-tenancy
            
                        
                87. Summary
- Three dimensions of scaling: ingestion, query, and operations
- Be simple and practical: successful systems emerge from simple ones
- Abstraction and data modeling matter
- Invest in thorough instrumentation
- Invest in automation and tools