Enabling Offline Inferences at Uber Scale
At Uber we use data from user support interactions to identify gaps in our products and create better, more delightful experiences for our users. Support interactions with customers include information about broken product experiences, any technical or operational issues faced, and even their general sentiment towards the product and company. Understanding the root cause of a broken product experience requires additional context, such as details of the trip or the order. For example, the root cause of a customer issue about a delayed order might be a bad route given to the courier. In this case, we would want to attribute the poor customer experience to courier routing errors so that the Maps team can address them.
Initially, we had agents manually review a statistically significant sample of resolved support interactions. They would verify and label the resolved support issues and attribute root causes to different categories and subcategories of issue types. We wanted to build a proof of concept (POC) that automates and scales this manual process by applying ML and NLP algorithms to the semi-structured or unstructured data from all support interactions, on a daily basis.
This article describes the approach we took and the end-to-end design of our data processing and ML pipelines for the POC, optimized so that engineers and data scientists on the team can easily build and maintain such high-scale offline inference workflows.
Problem
For our POC, we wanted to build a reporting dashboard powered by offline ML inferences that identifies the root causes of issues from all of the previous day’s support interactions and attributes them to appropriate categories and sub-categories. We wanted the dashboard to refresh automatically each day. Once released, the reporting dashboard would give different product teams insights into their product issues and customer support footprint.
We first reviewed our existing end-to-end setups for such ML workflows, mostly built and maintained by data scientists on the team. Most were data analyses in Jupyter® Notebooks, executed manually in an ad hoc manner, with the final reporting and presentation done on UIs like Streamlit. This was far from ideal from a maintenance perspective because:
- Reports went stale as there was no cadence for refresh
- Reports were impacted by data pipeline issues due to resource constraints in scheduled jobs
- Reports were impacted by data quality issues such as upstream tables not getting populated in time
- Model accuracy decreased over time, impacting the output SLAs
We realized that the above issues were a mix of data-infrastructure-related and model-performance-related aspects of the end-to-end system setup. While data scientists love iterating on features and improving model performance, they don’t want to get bogged down by pipeline and infrastructure management, such as resource pool allocation for their jobs. Similarly, while data engineers love working on high-throughput prediction systems, they lack the domain knowledge to iterate on features or models if model performance begins to dip.
For our project, we wanted to design a system that would address the above gaps by decoupling the data infrastructure from the data science, with low maintenance overhead and faster iteration for improvements. More specifically, the core goals for our design were:
- A refresh cadence can be set for inference execution to keep the reports fresh within SLAs
- Abstract away infra-layer and data pipeline maintenance issues from data scientists
- Enable better handling of the impact of data quality issues in upstream tables
- Ensure ML model accuracy does not degrade over time, impacting the output SLAs
In short, the framework would be helpful to our data scientists for their analysis work. Additionally, as part of the solution, we wanted to use Uber’s internal platforms and tooling wherever possible.
The Solution
Our end-to-end system design can be broken down into three components:
- ML platform for model hosting and training
- Model execution setup with monitoring and alerting
- Handling high scale batch inferences
ML Platform for Model Hosting and Training
Uber’s Michelangelo Platform provides an easy way to run batch training and inference on datasets. However, it only supported a limited set of models out of the box (Apache Spark™ MLlib algorithms and XGBoost). Michelangelo PyML was introduced to address the flexibility tradeoffs that came with Michelangelo. Broadly, it gives users the ability to export custom models as Docker images and then load them into Michelangelo.
In our test models, deeper models (BiLSTM) gave us better accuracy than the MLlib and XGBoost models. Hence, we decided to go with an approach that let us train custom models. Custom model support also offered us more flexibility for the future: had we limited ourselves to the natively supported XGBoost models, we would not have been able to later run neural-net training in Michelangelo.
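To make the custom model approach concrete, below is a rough, hypothetical sketch of how a deep text classifier could be wrapped behind a simple predict(DataFrame) interface before being packaged into a Docker image. The class name, artifact file names, and the use of Keras are illustrative assumptions, not the actual Michelangelo PyML contract.

```python
# Hypothetical sketch of a custom model wrapper (not the actual PyML API).
import pickle

import pandas as pd
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences


class RootCauseClassifier:
    """Wraps a trained BiLSTM text classifier and its fitted tokenizer."""

    def __init__(self, model_dir: str):
        # Load the trained BiLSTM weights and the tokenizer from disk.
        self.model = keras.models.load_model(f"{model_dir}/bilstm_model")
        with open(f"{model_dir}/tokenizer.pkl", "rb") as f:
            self.tokenizer = pickle.load(f)

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        # Tokenize the support-ticket text, run the classifier, and attach
        # the predicted category index to the input DataFrame.
        sequences = self.tokenizer.texts_to_sequences(df["ticket_text"].tolist())
        padded = pad_sequences(sequences, maxlen=256)
        probabilities = self.model.predict(padded)
        result = df.copy()
        result["predicted_category_id"] = probabilities.argmax(axis=1)
        return result
```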
Picking the Uber Michelangelo Platform in our design ensured that we were not introducing a new or different framework for model hosting and training. Thus it would not add any migration overhead for anyone wanting to use our end-to-end framework.
Figure 1: Overview of the PyML architecture. First, a model’s artifacts and dependencies get uploaded to and versioned by Michelangelo’s (MA) backend. Afterwards, a Docker image gets built which can be deployed as a Docker container for online predictions or leveraged to run large scale offline predictions.
Model Execution Setup with Monitoring and Alerting
We had to fetch data from our warehouse (Apache Hive™) via a query, execute predictions in batch, and save inferences for consumption by our reporting dashboard. While the custom model support approach with Michelangelo PyML gave us flexibility to run our own models, batch predictions via a query weren’t available in Michelangelo PyML.
We could have written custom data pipelines (using Piper, Uber’s data workflow management tool) that make API calls to hydrate the predicted granular and sub-granular root causes into our output tables. However, that was not an ideal framework for data analysts and scientists, as they would have to rely on engineers to set up and manage these Piper pipelines.
As an alternative, we explored uWorc (Universal Workflow Orchestrator). The tool has a simple drag-and-drop interface that can manage the entire life cycle of a batch or streaming pipeline without having to write a single line of code. With uWorc, it’s simple to create complex data extraction and transformation queries, supply them as input to a batched prediction workbench notebook, and then output the result through another pipeline, all with minimal to no code. Workbench bundles are simply packaged Jupyter notebooks that can be run on a schedule with a bash-like command.
uWorc also allows us to set a cadence for the workflow, with options for sufficient alerting and monitoring.
Figure 2: uWorc has a drag and drop workflow editor.
After evaluating the pros and cons, we chose to run the batched inference in a workbench notebook, set up on uWorc.
We also wanted to avoid adding a series of complex joins within the notebook. Hence, we changed the logic to read the input features from an intermediate Hive table, which is in turn populated by a Hive component in the same uWorc workflow. This ensures that the notebook only has to run a simple “SELECT * FROM intermediate_table” query, as sketched below.
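A minimal sketch of this split follows, with hypothetical table and column names. The first query would live in the uWorc Hive component, and spark refers to the SparkSession already available in the workbench notebook.

```python
# Hypothetical query for the uWorc Hive component: gather all model input
# features into one intermediate table so the heavy joins stay out of the
# notebook. Table and column names are illustrative.
POPULATE_INTERMEDIATE_TABLE = """
INSERT OVERWRITE TABLE support_insights.intermediate_features
SELECT c.contact_id,
       c.ticket_text,
       t.route_quality_score,
       o.delivery_delay_minutes
FROM support.contacts c
LEFT JOIN maps.trip_details t ON c.trip_uuid = t.trip_uuid
LEFT JOIN eats.order_details o ON c.order_uuid = o.order_uuid
WHERE c.datestr = '{{execution_date}}'
"""

# Inside the workbench notebook, the feature fetch then stays trivial:
features_df = spark.sql("SELECT * FROM support_insights.intermediate_features")
```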
In the end, our uWorc workflow orchestrated the following:
- Population of an intermediate Hive table to gather all the input features for the models
- Triggering the workbench notebook for batched prediction once the above step succeeds
The separation of fetching features and execution of batch inferences helps isolate failures into different uWorc components, which can then be re-run individually, avoiding duplicate executions. For example, if the Hive query succeeds but the notebook bundle fails, only the notebook bundle needs to be re-run.
With this setup we were already able to achieve most of the goals for the framework. We could configure a refresh cadence for inference execution via uWorc to keep the reports fresh within SLAs, with alerting and monitoring on the overall job. Splitting the feature extraction queries from the ML inference logic via an intermediate Hive table makes failures easier to debug and quickly attributes a root cause to either upstream tables or the inference job. We still had the challenge of meeting the scaling requirements: handling millions of offline batch inferences on each execution.
Handling High Scale Batch Inferences
Since there are upwards of one million predictions per execution, using Apache Spark™ was a natural choice. The workbench notebook already supports a SparkMagic kernel, which connects to Drogon, Uber’s Spark Job Server used for all Spark-as-a-Service efforts at Uber.
Figure 3: Architecture of “Spark-as-a-Service” at Uber.
While implementing our batch inference job, we realized that since Spark’s driver and executors run in the shared YARN cluster, they couldn’t access custom packages installed in the Jupyter notebook. Fortunately, SparkMagic’s config allows specifying a Docker image that contains all the Python packages the notebook needs to execute, so we took this route with our notebook bundle.
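For illustration, the session configuration might look something like the sketch below, passed through SparkMagic’s %%configure magic. The Docker-related keys shown are the open-source YARN container-runtime settings and the image name is invented; they stand in for the internal configuration that Drogon actually expects.

```python
# Illustrative session config for SparkMagic's %%configure magic. The
# Docker-related keys are the open-source YARN container-runtime settings,
# standing in for Uber's internal Drogon configuration; the image is made up.
session_conf = {
    "driverMemory": "8g",
    "executorMemory": "8g",
    "numExecutors": 50,
    "conf": {
        # Run the driver (application master) and executors inside the Docker
        # image that bundles every Python package the notebook needs.
        "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
        "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":
            "docker-registry.example.com/support-inference:latest",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":
            "docker-registry.example.com/support-inference:latest",
    },
}
```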
Our notebook bundle was scheduled on Peloton (Uber’s unified resource scheduler). The bundle then connects to Drogon via the SparkMagic kernel, using the config provided in the notebook, and executes the notebook’s PySpark code on the Spark driver and executors. The model files, pickled tokenizers, input transformers, etc., are all placed in the Hadoop file system (HDFS) so that they are available to the Spark executors.
Below is sample code that highlights the overall structure and framework of our approach.
Figure 4: Sample code framework for executing offline batch inferences.
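The sketch below illustrates that structure, assuming hypothetical table names, HDFS paths, output columns, and model-loading details rather than Uber’s actual code: read the pre-joined features, score each partition on the executors (loading the model artifacts from HDFS once per partition), and append the predictions to the output Hive table tagged with the model version.

```python
# A sketch of the batched-inference notebook (hypothetical names and paths).
import os
import pickle
import subprocess
import tempfile

from pyspark.sql import Row, SparkSession

MODEL_VERSION = "root_cause_bilstm_v1"
MODEL_HDFS_DIR = "hdfs:///user/support_insights/models/" + MODEL_VERSION

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# 1. Read the pre-joined features prepared by the upstream Hive component.
features_df = spark.sql("SELECT * FROM support_insights.intermediate_features")


def load_classifier(hdfs_dir):
    # Placeholder loading logic: copy the pickled model wrapper (weights,
    # tokenizer, input transformers) from HDFS to the executor's local disk
    # and unpickle it. The real artifacts and wrapper class are project-specific.
    local_dir = tempfile.mkdtemp()
    subprocess.check_call(["hdfs", "dfs", "-get", hdfs_dir + "/model.pkl", local_dir])
    with open(os.path.join(local_dir, "model.pkl"), "rb") as f:
        return pickle.load(f)


def predict_partition(rows):
    # Runs once per partition on an executor: load the model artifacts, then
    # score every row and emit one output row per contact.
    classifier = load_classifier(MODEL_HDFS_DIR)
    for row in rows:
        category, subcategory = classifier.predict(row["ticket_text"])
        yield Row(
            contact_id=row["contact_id"],
            predicted_category=category,
            predicted_subcategory=subcategory,
            model_version=MODEL_VERSION,
        )


# 2. Score all rows in parallel across the Spark executors.
predictions_df = features_df.rdd.mapPartitions(predict_partition).toDF()

# 3. Persist the inferences for the reporting dashboard to consume.
predictions_df.write.mode("append").insertInto("support_insights.root_cause_inferences")
```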
Other Considerations
Once we productionized our first model for the proof of concept, we felt the need to account for backfilling, maintaining freshness, and having quick retry capabilities to deal with failures and outages. We then enhanced our design to include those capabilities.
Backfilling and ML Model Versioning
As the ML models are iteratively improved, we may want predictions already present in the output inference table, made by older model versions, to be re-generated with the newer version of the ML model. For example, our ML classifier accuracy was very low (only 60%), and we wanted to make iterative ML model improvements plug-and-play. Since our pipeline was set up to run once every day, we wanted to re-run the inferences on the last month of data with a newer version of our ML model in order to fix any mis-attribution of support issues to incorrect sub-categories.
To help with backfilling, the inference table in our framework has a field recording the model version that produced each prediction. In future iterations, when we improve the model’s accuracy, we can backfill the existing contacts in the system under a new model version identifier and even compare the improvement in performance, if need be.
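Continuing the earlier sketch (same Spark session and inference logic, hypothetical table and column names), a backfill re-runs inference over the desired window under a new model version, after which the version column makes comparisons straightforward.

```python
# Re-score the last 30 days of features with the new model version by running
# the same mapPartitions inference as above with, e.g.,
# MODEL_VERSION = "root_cause_bilstm_v2", then append to the inference table.
backfill_features_df = spark.sql("""
    SELECT *
    FROM support_insights.intermediate_features
    WHERE datestr >= date_sub(current_date(), 30)
""")

# The model_version column then lets us compare versions over the same window
# (assuming the inference table is also partitioned by datestr):
spark.sql("""
    SELECT model_version,
           predicted_category,
           COUNT(*) AS num_contacts
    FROM support_insights.root_cause_inferences
    WHERE datestr >= date_sub(current_date(), 30)
    GROUP BY model_version, predicted_category
""").show()
```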
Feature Freshness and Inference Consistency
The input query that fetches all the features needed for the model depends on a number of upstream tables, all of which have different freshness SLAs. In addition, fetching the entire feature dataset for inference involves non-trivial processing time and is not instantaneous.
To keep query executions idempotent and the output data consistent, we updated the query to fetch the required features only from data older than the maximum freshness SLA across all the dependent tables.
Retries and Inference Consistency
To ensure that any retries to handle failures and outages don’t result in inconsistent or duplicate data, we updated the query to use the {{execution_date}} parameter already available from the uWorc workflow, so that only the data between {{execution_date}} - max_SLA - 1 and {{execution_date}} - max_SLA is given as input to the inference model, as sketched below.
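Concretely, the date filter might look like the following sketch, assuming a daily-partitioned source table with a datestr partition column and a maximum upstream freshness SLA of 2 days; uWorc substitutes {{execution_date}} at run time, so a retry for the same execution date reads exactly the same one-day slice.

```python
# Hypothetical feature-fetch window: {{execution_date}} is substituted by the
# uWorc workflow, and max_SLA is assumed to be 2 days for illustration.
FEATURE_WINDOW_QUERY = """
SELECT *
FROM support.contacts
WHERE datestr >  date_sub('{{execution_date}}', 3)  -- execution_date - max_SLA - 1
  AND datestr <= date_sub('{{execution_date}}', 2)  -- execution_date - max_SLA
"""
```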
Visualization
For our reporting dashboard, we wanted a visualization tool that was performant and able to handle large volumes of data with basic charting capabilities. We decided to use Google Data Studio (GDS) to build our reporting dashboard, initially consuming the inferences directly from the output Hive table. We realized that this could not meet our performance requirement of loading the visualizations within a few seconds when applying any filter action. Hence, we pushed the Hive output and other relevant attributes for visualization into BigQuery and consumed that from our GDS reporting dashboard.
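As a rough sketch of that last hop, the public google-cloud-bigquery client is used below as a stand-in for whatever pipeline actually moves the data; the project, dataset, and table names are hypothetical, and predictions_pdf is assumed to be a pandas DataFrame exported from the Hive output table.

```python
# Push the daily inference output into BigQuery for the GDS dashboard.
# Project, dataset, and table names are hypothetical; `predictions_pdf` is a
# pandas DataFrame exported from the Hive output table (e.g., via toPandas()).
from google.cloud import bigquery

client = bigquery.Client(project="support-insights-reporting")
table_id = "support-insights-reporting.dashboards.root_cause_inferences"

load_job = client.load_table_from_dataframe(
    predictions_pdf,
    table_id,
    job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
)
load_job.result()  # block until the load job completes
```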
Ensuring ML Model Accuracy
Toward our continuous model accuracy goals, capturing the ML model version in the output table gives us an easy way to trace each inference prediction back to the model that produced it, which in turn enables different ways of measuring the model’s accuracy.
Since we were building a proof of concept, we were not actively iterating on the model accuracy goal. However, we realized that we could leverage our existing agents (who were manually performing the root-cause attribution) to measure and maintain model performance over time with a “human-in-the-loop” setup. Instead of using the agents’ labels directly for manual reports, we could compare their labels with our model’s inferences to evaluate model performance continuously and guard against model decay over time.
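A rough sketch of that comparison, with hypothetical table and column names (and the notebook’s SparkSession as spark), could look like the following.

```python
# Join the agents' manually labeled root causes against the model's stored
# inferences and track accuracy per model version. Table and column names
# are hypothetical.
accuracy_by_version = spark.sql("""
    SELECT i.model_version,
           COUNT(*) AS labeled_contacts,
           AVG(CASE WHEN i.predicted_category = a.labeled_category
                    THEN 1.0 ELSE 0.0 END) AS accuracy
    FROM support_insights.root_cause_inferences i
    JOIN support_insights.agent_labels a
      ON i.contact_id = a.contact_id
    GROUP BY i.model_version
""")
accuracy_by_version.show()
```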
Summary
Our design has been successful in meeting the goals we set, specifically around low maintenance overhead of the ML notebooks for teams at Uber. We already see our design and framework being adopted for other projects by ML engineers and data scientists on the team.
Acknowledgements
We would like to acknowledge Rahul Bhatnagar who contributed significantly to the Product Insights POC project and to this blog during his time at Uber as a Senior Software Engineer on the Customer Obsession team.
Apache®, Apache Spark™, Apache Hive™, Hive™, and Spark™ are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.