Preon: Presto Query Analysis for Intelligent and Efficient Analytics

Presto™ is an open source SQL query engine used on a large scale at Uber. Uber has around 20+ Presto clusters comprising over 12,000 hosts. We have about 7,000 weekly users and run about half a million queries per day. Presto has various use cases at Uber like ad hoc interactive analytics, ETL and batch workloads, dashboarding, data quality checks, report generation, experimentation, and data-driven services. Due to the scale of the system, there are various opportunities to make  it more efficient. However, these opportunities need intelligence regarding the queries being processed by the system.

Presto is a query engine and provides an SQL interface for running queries, but there are many cases where we need to be able to analyze queries in order to get specific actionable insights. Some examples are:

  • To be able to analyze predicates used to query tables – This can be used to reformat (sort, bucket, partition) the tables on those columns leading to less data being read from the backend during query execution and faster and more efficient queries.
  • To be able to ascertain the tables/columns being read/written in the query – This can be used to route queries to specific clusters based on availability of those tables in specific regions of the Uber data lake or to do permissions checks.
  • To determine the type of a query – For example, DDL/DML to route ETL queries to certain clusters.
  • To ascertain what is the most recent modification time of any of the datasets being queried – This can be used to ascertain if results from a previous run of the query are still valid and can be reused instead of rerunning the query.
  • To fingerprint queries – There are many use cases where we need to know if a query is the same or similar to a previous query. The fingerprint has to be immune to whitespaces and comments in the query and thus just a string hash does not work very well. Finding similar queries (e.g., queries with only the literal values changed are even more difficult.)
  • To validate queries – It is important to fail queries with syntax errors early, at the time of submission itself rather than the query having to get through the queue and fail when it starts running.

All these analyses need to be able to parse the query structurally. Before we created Preon, all query analysis was done in the context of executing the query in a Presto cluster and on completion the analysis results were published to a Presto Query Events topic along with other stats related to the query execution. Also, logging any arbitrary information was difficult since the information had to be generated in the course of executing the query (i.e., either in the analysis/planning/optimization or execution phase and carried forward to the point of query completion). Also, the analysis code had to be carried as a private patch in the main Uber Presto repo, which could become problematic when upgrading Presto.

Query Execution and Query Analysis are two different services with very different form factors, as summarized below:

Query AnalysisQuery Execution
Needs to only parse/analyze/optimize queryNeeds to parse/analyze/plan/execute query 
Needs to only be a standard microserviceRuns in distributed clusters on bare metal (for performance), multiple clusters per DC
Any information can be logged at any timeLogging is limited to query completion; any information to be logged has to be brought to that point
Standard CI/CD for microservices; changes can be deployed fastMaking changes is slow; needs to be deployed after land on every cluster separately
Each type of analysis can be a separate endpointOnly a single query execution endpoint
An analysis can return a result, log the result to a DB, or return a transformed queryThe only output is the query execution results
An analysis can involve multiple queries (e.g., to find common structures between them)Each query is run independently
Analysis can be done offline (e.g., by tailing a Kafka topic on query completions)Online by definition
We can do the analysis using the Presto modules as librariesWould involve making changes in the core engine code
Service can be scaled automatically horizontally by the Uber auto-scaler based on trafficStatic configuration
Does not need powerful machinesPresto coordinators need powerful hosts

The requirements for the query analysis service were:

  1. It should be able to run in a microservice form factor in the Uber Stateless Microservices platform (UP).
  2. The service should have a minimal CPU/memory footprint.
  3. The service should have 100% coverage for all Presto query shapes at Uber (including in-house UDFs).
  4. The service should have 100% coverage for the entire Uber Data Lake (i.e., support for all Presto connectors and catalogs).
  5. The analysis should be done based on the current state of the Uber Data Lake by talking to the production catalogs.
  6. The analyzed query should be similar to the query that would be in production (i.e., with all the transformations and optimizations that would happen during query execution).
  7. Ability to parse non-Presto query shapes was not a requirement.
  8. Fast development times (i.e., any new analysis should not require a change in the Presto modules, but could be done in the microservice itself).

The query parsing could have been done using an Apache Calcite ™ parser. This would have had the advantage of being able to parse non-Presto query shapes, too. However, it had the following drawbacks:

  1. Apache Calcite might not be able to parse all Presto query shapes.
  2. We not only need to be able to parse the query, but analyze and even optimize the query using the Presto production optimization rules.
  3. We wanted to be able to share code (e.g., query fingerprinting logic) between the Production Presto and Preon as needed.

The service was designed in the shape of a microservice that creates a mini Presto server with various REST interfaces added to enable different analyses. Since the Presto server would not need to talk to any workers, all HTTP clients and task communication machinery was removed from the server.

Image

Figure 1: Preon as a microservice

The Preon service uses Presto modules from the Uber Presto repo in order to do the analysis. We have had to make minimal changes to the Presto modules for them to be used by Preon. In addition, we also had to create a presto-preon module in Presto to give us the bindings to instantiate this mini Presto Server.

Preon provides internal APIs to:

  • Create query statement i.e. just parse the Query
  • Analyze query i.e. resolve table and columns against the production data lake
  • Create a Plan for a Query with full Planning Optimizers
  • Create a Plan for a Query with a given list of Optimizers

In general the analyzers then implement the analysis as a visitor on the in memory parsed/analyzed/optimized query. Various analysis endpoints have been created in Preon using these internal APIs. Some example analysis are:

  • Predicate Analysis: This endpoint takes a query and returns the predicate columns that are used to filter data from a table and their predicate values. A query such as “select * from some_table where uuid = 20” would return a predicate of <table:some_table, column: uuid, value: 20>. Only single value and range value predicates are returned.

  • LastUpdate: This endpoint takes a query and returns the last modification time of any of the datasets read in the query. It does this by creating a query plan (the same plan that would be created by production Presto) and for each leaf tablescan node, gets the last modification time as registered in HMS. Optionally the endpoint can look further into the modification time of files in Uber Data lake as well.

  • Materialized view: The endpoint ascertains whether a given query is candidate for improvement using a materialized view. To meet this criteria, the query should have been aggregated by one partition column throughout the query. Then the query can be potentially sped up by materializing the computation for a partition only once.

  • Structural validation of a query e.g. syntax errors

  • Getting the list of tables/columns read/updated by the given query.

Since Preon is a micro service hosted in the Uber UP infrastructure, it enjoys many of the benefits of the ecosystem such as:

  • Automatic horizontal  auto scaling of the # of Preon instances based on utilization
  • Load balancing of the requests between the instances
  • Ability to vertically scale the instances
  • Automated logging, monitoring and alerting in case of service failure, CPU utilization and latency spikes
  • Different environments for different classes of traffic e.g. staging, critical, non-critical
  • Standard CI/CD
  • Infrastructure support to scaffold new API calls

In addition, the latency and reliability of each endpoint in Preon is monitored separately through additional metrics. Though Preon provides reliability SLA at par with production Presto clusters, yet the endpoints are integrated into the callers in a fail open manner so that the query flow still continues even if an endpoint call fails or times out.

Though Preon provides many benefits as mentioned above, it also creates the additional responsibility of keeping the Presto used in Preon in sync with the Uber Presto. The Uber Presto code sits in a separate repository to make it easier to upgrade from open source. Preon code meanwhile sits in the main Uber java repository to make better use of developer tools.  So, we have to make sure that the Preon uses the latest artifacts from the Uber Presto repo. Using a new artifact sometimes requires changes in analysis code as well. Not using the latest artifacts can cause analysis to fail.  For example, if any new SQL functions or UDFs have been added in the Uber Presto, then the query validation (using Preon) would fail if the query is using those new functions/UDFs unless the Presto modules used by Preon are also upgraded. Towards this end, we have created a rotation to upgrade Preon Presto on a regular cadence.

There are many production use cases at Uber that require Preon to get insights about queries. Two major use cases of Preon are Query result caching and Data layout formatting.

Preon provides an API (LastUpdate) for the Upstream callers to check the validity of the previous runs of the query. There are two upstream callers at Uber that use this functionality in order to reduce redundant query traffic. In the case of the first caller, the hash of every executed query is stored in Redis with a TTL of 24 hrs. When a new query is submitted and its hash is present in redis, then Preon is consulted to see if the most recent modification time of any of the datasets in the query is prior to its last execution time. If yes, then the previous results are returned to the caller. Since the underlying data has not changed, it is safe to return the previous results. Additionally, sometimes just matching the query hash is not sufficient. For example when a query is reading data over a moving window e.g. last 7 days, then while the query would look similar to a previous one, but the data being read may not be exactly the same and hence we have to exclude such queries from this feature.

 This feature has been running for over 2 years at Uber and we have seen around 5 – 7% of queries getting deduplicated due to this feature. Given that Presto at Uber runs about 500k queries per day, this is a significant reduction in # of queries.

Image

Figure 2: Number of queries deduplicated by count and by percentage

Every Presto query executed at Uber is post processed through the Preon Predicate Analysis endpoint. The post processing is done  by tailing the Presto Query Events topic and making a call to Preon and logging the results in another kafka topic.

Image

Figure 3: Table Format Recommender system

Then an offline analysis is done periodically to examine what columns are often used as predicates to query the top tables. Based on the values used for the predicates and the cardinality of those columns, a recommendation is generated about which column the data in the table should be sorted on and expected read savings by sorting the data. The read savings are realized because of predicate pushdown in Presto where Presto can skip reading sections of files or entire files because the min/max statistics in the file footer does not match the value that is being used for filtering the data. We have seen significant reduction in the amount of data read from just a couple of tables after sorting them based on the recommendation.

Image

Figure 4: Weekly Data read savings from the Data Layout Reformatting.

In the first case, the weekly data read from the table reduced from 4PB per week to about 2PB per week and in the second case the reduction was from 40PB per week to about 20PB per week.

Bucketing is another option to consider but it might require backfill of data which can be costly. Also, since changing the table layout would also need compute resources, the benefit from the query savings need to be weighed against the cost of reformatting and thus this might only make sense for the heavily read tables.

Some other use cases that currently use Preon are:

  • Query validation i.e. checking queries for structural issues before Query is accepted.
  • Permission check i.e. rejecting query early if the user does not have permissions on the tables/columns.
  • Routing queries to Cloud clusters based on the presence of tables in the Cloud.

Preon currently gets invoked multiple times in the lifetime of a Presto query execution at Uber.

Image

Figure 5: Preon calls in Query Processing Workflow at Uber

We also plan to use Preon for cost based routing and automatically enabling certain optimizations such as CTE materialization when the query analysis shows significant cost benefit from it. Another potential use case is just in time scheduling of ETL workloads based on when the input partitions become available. This will need insights regarding what partitions are being read by a query.

We believe that query analysis should be promoted as a first class citizen and query engines should provide custom pluggable analysis facilities. While there has been some progress regarding query rewrite capabilities within the engine, however as we have shown above in many cases, insights are needed regarding a query before its execution. We hope to see developments regarding standalone query analysis in the open source community and be able to contribute to it.

Presto is a registered trademark of LF Projects, LLC in the United States and/or other countries. No endorsement by The LF Projects, LLC is implied by the use of this mark. Kafka and Apache Calcite is a trademark of Apache Software Foundation in the United States and/or other countries.

首页 - Wiki
Copyright © 2011-2024 iteam. Current version is 2.137.1. UTC+08:00, 2024-11-22 15:00
浙ICP备14020137号-1 $访客地图$