Uber’s Next Gen Push Platform on gRPC

In our last blog post we talked about how we went from polling for refreshing the app to a push-based flow to build our app experience. 

All our apps need to be synced with real-time information, whether it’s through pickup time, arrival time, and route lines on the screen, or nearby drivers when you open the app. We use our push platform to deliver these messages that power the real-time user experiences as described in our previous post, which we strongly recommend that you review to learn about the details of the architecture before proceeding. 

This blog post will cover how we changed our protocol from Server Sent Events (HTTP1.1) to gRPC-based bidirectional streaming (QUIC/HTTP3), the challenges we faced, the final results, and some key learnings. 

Motivation to Move to gRPC 

In this section we will talk about the reasons to move RAMEN (Real-time Asynchronous MEssaging Network) from SSE (Server-Sent Events) to gRPC as the protocol for delivering messages. 

Before jumping into that let’s have a quick look at how we built RAMEN using SSE as the underlying protocol. This will help in understanding the changes that we made to enable gRPC at each layer. 

Existing SSE-based Architecture

Here is a quick overview of the client’s implementation: 

Figure 2: RAMEN SSE Architecture frontend

  1. Mobile RAMEN SSE Client initiates a connection to RAMEN Server via the SSE /ramen/receive endpoint, and maintains a persistent connection. 
  2. RAMEN Server pushes all messages to the mobile client through the SSE connection as and when they are generated.  
  3. Mobile Ramen module then notifies consumers, who are the end users of these messages (built by feature teams), when new messages are received. 
  4. Ramen Server also sends a heartbeat message to the mobile side every 4 seconds. The mobile client assumes that the connection is broken and reconnects if it sees no heartbeat or messages up to 7 seconds. 
  5. Mobile Ramen module sends batched acknowledgements to Ramen Server via another endpoint /ramen/ack every 30 seconds.

Figure 3: RAMEN SSE Architecture backend

As a quick recap from part 1, here is how the server side has been implemented: 

  1. Streamgate service implements the RAMEN Protocol on Netty and has all the logic related to handling connections, messages, and storage. Redis®* and  Apache Cassandra® are used for storing messages.

  2. StreamgateFE service acts as an Apache Helixᵀᴹ Spectator and listens to topology changes from Apache ZooKeeperᵀᴹ. It implements a reverse proxy.

  3. Helix Controller, as the name suggests, is a five-node standalone service solely responsible for running Apache Helix Controller processes and is the brain of topology management. Whenever any Streamgate node starts or stops, it detects a change and re-allocates the sharding partitions.

  4. Streamgate sends messages via the SSE endpoint and receives acknowledgement via the ack endpoint. 

Limitations of RAMEN SSE 

The RAMEN protocol based on SSE that we built is unidirectional and the only streaming endpoint exposed to our clients. RAMEN sends messages via event stream, however, message acknowledgments are delivered by regular RPC requests every 30 seconds. 

Reliability Concerns 

While this protocol was working well, there were certain limitations that we wanted to address. The delivery state of a message is unknown for up to 30 seconds after it is written to a RAMEN connection. A lot of critical messages like offers sent to drivers have a validity of 30 seconds. This prevents us from resending critical push messages like driver offers. 

We wanted to move towards an instant, real-time acknowledgment, and bidirectional streaming would provide the most efficient way to accomplish this. However, our protocol is built on top of HTTP 1.1 and the only way to support bidirectional streaming is by having 2 different unidirectional connections in opposite directions. This is suboptimal, as it would double the number of connections that infrastructure has to handle and it would also lead to race conditions where one connection is established while another is not or worse, it is established in a different data center. Another way to do this is to send a new HTTP ack request for each message when it is received. This results in a larger amount of data being uploaded due to headers, leading to higher upstream bandwidth usage.

Heartbeat/Connection Management 

Streaming reliability is very dependent on how we handle connections. The heartbeat mechanism is essential for knowing whether the connection is alive. Because of large buffers at various network proxies and lossy mobile networks, the only way to know if a connection is broken is heartbeats. In RAMEN protocol, heartbeats are sent on the same stream as messages. Hence those are subject to head-of-line blocking. If a large message is sent over a slow network, the Client may disconnect the connection if it does not see a heartbeat for a while. On the contrary, with protocols like HTTP2, heartbeats and control messages are sent over a different stream making it more efficient.

Binary Data

RAMEN operates over SSE and this protocol by default is text-based. We send escaped JSON separated by newline characters. So we cannot send binary payloads (images, speech, etc.) efficiently and reduce the size of regular payloads. We evaluated implementing a custom SSE implementation that sends binary, since we control the client side as well. However, this gets further into a non-standard protocol territory and will be painful to maintain in the long run. 

Leveraging QUIC/HTTP3 Features

QUIC/HTTP3 is now supported very widely and has great benefits like streams, multiplexing, heartbeat mechanism, binary, flow control, etc. We had QUIC/HTTP3 connections that terminated at the frontend infrastructure. So gateway layer services cannot leverage these QUIC/HTTP3 benefits. At the edge infrastructure layer, there is no application state awareness and hence it cannot fully leverage these features.

Connection Management and Polyglot Clients 

All mobile clients use custom SSE implementation that uses text parsing and pattern matching to extract individual messages within the stream. The client also performs connection lifecycle management using heartbeats and acks that can be tricky and hard to maintain.

The number of clients that use RAMEN were growing fast as the number of use cases and Lines of Business increased. We had written clients for RAMEN in various programming languages to support different types of apps. Changing protocol, iterating on features is becoming increasingly hard now. Some of these clients like mobile apps need further optimizations like flow control, payload size optimization, visibility improvements, etc. Without very solid client implementations, it is very hard to build and scale new protocols. 

Moving Towards gRPC

gRPC is a high-performance, widely adopted RPC framework with standardized implementations of client and server across many platforms and languages. The major reasons for moving towards gRPC are highlighted below:

Bidirectional Streaming

gRPC has first-class support for bidirectional streaming, which is the most attractive feature when considering RAMEN’s long term vision. The acknowledgement can be sent over the same stream instantly without extra networking calls from the mobile client. This could significantly improve the acknowledgement reliability for RAMEN.

Real-time acknowledgments allow us to measure the RTT of RAMEN messages and understand the networking conditions. They also helps the RAMEN backend to reduce the memory footprint of the message queue.

QUIC/HTTP3

QUIC/HTTP3 essentially  removes the head-of-line blocking, consistently and significantly improving mobile networking latency compared to HTTP2. As we learned from the previous QUIC experiments, QUIC brought us a 10-30 percent improvement in tail-end latencies for HTTPS traffic and we wished to leverage it here for RAMEN gRPC as well. 

gRPC can use Cronet as transport, which allows RAMEN to reuse the QUIC session from the real-time traffic, further reducing the latency for the first RAMEN message to the mobile side. 

Tooling

gRPC comes with great community support for tooling, CI/CD for performance monitoring, and polyglot support. In the long term, these benefits will simplify architecture a lot and creating new apps, clients, and protocols will become much easier.  

Advanced Use Cases

gRPC will make development of features like network prioritization and payload diffing easier. Although gRPC does not support prioritization as is, the bi-di streaming APIs will help us construct pseudo-prioritization on top of gRPC.

Data Modeling 

We used proto3 protobuf to define our contracts between server and client. Having a well-defined contract makes it easier for implementation in various RAMEN clients. thereby easing out migrations and ensuring fewer bugs.

**RPC Contract
**The RPC was defined to be a bidirectional endpoint, as we discussed in an earlier section. Both the server and client would keep streaming data back and forth. At a high level, the server would send messages and the client would respond back with acknowledgements. 

Figure 4: Ramen Contracts 

This is how we defined our Request and Response contract in our protobuf schema:

Figure 5: Ramen request response models 

Request Data Model

  1. SeqID is used for bookkeeping purposes where we store the sequence number against each message and then use it for tracking delivery. 
  2. Message acks are used for acknowledging messages and feature acks are used for acknowledgments sent by the feature team plugins that listen to the RAMEN message. 
  3. We also have control messages where the client can dictate any runtime changes to the server, such as termination of connection. We also plan to leverage it for advanced constructs like flow control and stream prioritization in the future as well. 

Response Data Model 

We have modeled the response in a generic way where a message can fall into any category such as RAMEN message, control message, or a heartbeat. 

  1. RAMEN messages contain all the actual messages that power various use cases.
  2. Control messages are used to indicate to the client to disconnect, etc.
  3. Heartbeats are sent out to indicate that the connection is healthy every 5 seconds. 
Sequence Diagram

Figure 6: RAMEN sequence diagram

Backend Changes

In this section we talk about the major changes that we had to make to enable gRPC on the server side.We decided to keep the same Streamgate service as the backend for the new RAMEN gRPC connections as well. Streamgate maintains a cache of messages and mailboxes in memory for performance and fail-open reasons. We did not want to create a separate runtime for gRPC based connections, as we will need to replicate all of this in 2 places per user, doubling the load and resource usage of the system. This would have also led to inconsistencies and more chances of impacting users.

Figure 7: RAMEN responsibilities and interactions

This was mostly a facade-level change and all the internal business logic remained the same such as Message Storage, Orchestration, Connection Lifecycle Management, and Flow Control. So it was possible to change this facade from HTTP to gRPC without many potential risks and rollback to the older stack in case we run into any issues. gRPC internal server implementation is based on Netty as well.  So we do not lose any benefits we had with gRPC migration. Our internal code can continue to use the byte buffers.  

Separate Runtime for Streamgate Frontend

StreamgateFE is the service that takes care of routing requests to the correct streamgate instance that is sharded by userID. For enabling gRPC, we decided to separate out the frontend runtimes to enable proxying of gRPC requests seamlessly. The gRPC traffic flows in via the new runtime and gets proxied to the right streamgate instance in the cluster just like the older flow for HTTP traffic (SSE protocol).

Figure 8: Streamgate Front end proxy architecture 

We used the GRPCproxy to implement the frontend layer for routing requests–StreamgateFE-gRPC (streaming and non streaming). We picked this implementation since it provided a simple interface to proxy requests coming into the frontend layer and integrated with our Helix layer to ensure sticky routing.  We plugged in the Helix backend routing into the gRPC proxy integration as described below

Figure 9: Front End proxy integration code block

The above code snippet from our gRPC proxy implementation showcases how we fetch metadata from the call (i.e., method name, user ID for routing the request) to figure out the right gRPC channel that has been established (if already) with the Streamgate instance, and then use that channel to route the request. 

Channel Management with gRPC

We build one gRPC channel per Streamgate host and both stream and unary requests go through the same channel. We assumed that gRPC can handle the mix of streaming and unary (RPC)  calls in the same channel when we started implementation and it worked well for us eventually. 

In this section we talk about the major changes we had to make on the client side to support the new RAMEN gRPC protocol. 

RAMEN gRPC Contract Implementation

In order to minimize the migration efforts for RAMEN’s consumers, the new RAMEN module was designed in such a way to keep the public interfaces intact. On the mobile client, we wrote a layer to interact with the new RAMEN gRPC client and parse/read messages and pass this on to the consumers. This ensured that the consumers can continue to use these RAMEN messages without any migration.

Figure 10: Ramen mobile architecture

RAMEN gRPC Connection Management

It is critical for the mobile client to establish a reliable connection with the backend, so that the backend can push messages to the mobile apps to keep the user engaged and notified whenever there are updates from Uber’s backend system.      

The RAMEN connection might be broken because of various reasons such as network error, backend disconnection, or heartbeat timeout. To manage and maintain the steady gRPC bi-di connection, the mobile client needs to monitor the connection status via RAMEN heartbeats. Because of heartbeat timeout or a disconnect control message received from the server if any of these disconnects happens, the RAMEN should be able to re-establish the connection within the exponential backoff period.

Network Stack Changes 

Figure 11: Ramen networking stack changes

RAMEN SSE was built on top of OkHttp and iOS networking libraries and consisted of various interceptors to support the core functionalities, such as failover and redirect, network monitoring, and headers and OAuth token enrichment. To migrate RAMEN to gRPC, we had to redesign the mobile networking stack to have the same functionality as the SSE stack.

Failover and Redirect

Captures network failures or redirects and makes a call on changing the hostname that will be used for subsequent network calls to the Uber server from the mobile app.

Headers and OAuth tokens are added on to all HTTP requests by an interceptor.

Network Monitoring

Captures network performance data, like end-to-end latency, time to create connection, time to TLS (Transport Layer Security), time to lookup host, response status code, network errors, request path, etc. This data is used heavily to monitor performance of network calls, regressions, outages, etc.

Figure 12: Ramen mobile architecture layers

Key Learnings 

Concurrency Issues

We had a single instance of streamObserver per connection that we uses to write messages into the connection. It is recommended (as per the gRPC specs) to synchronize the onNext method since it is not thread-safe. We ran into issues when we were not doing the same.

Message Flow Control

We wrapped our streamObserver using the ServerCallStreamObserver API provided by gRPC, which provides richer functionality than the native streamObserver APIs. Our main use case was that we wanted to start receiving acks from the client only after the connection is successfully bootstrapped, so we block any new messages from client to server until the first bootstrap message for connection initiation is complete.

Figure 13: ServerCallStreamObserver Integration

Graceful Shutdown Handling

Shutdown handling was another tricky part where we had to invoke all the active streamObservers’s cancel methods to ensure connection termination. So we wrapped it around in ShutdownHandler wrapper and kept a map of handlers that we had to terminate when a shutdown signal was received.

Handling Missing Callbacks

We noticed that we weren’t receiving client termination calls correctly in some cases. Our timer tasks like sending heartbeats were getting canceled when a connection callback was received. But in those cases where we were not getting a callback, we ran into cases where our tasks would run forever, leading to really high CPU and memory usage. We added a check to see if the stream was writable before trying to write into the stream each time and canceling any tasks that were trying to write into the stream.

Figure 14: Stream writable check

Payload Compression

In the development stage we observed a high rate of heartbeat timeout. After debugging we found some RAMEN message payload sizes are on the larger side (> 1MB), and in extremely slow cellular connectivity conditions (Edge or 3G) it would take 20 to 50 seconds to download the whole message. In these cases, the heartbeat message was blocked by these huge payloads, and further investigation showed that the backend did not enable compression for the payload. After the backend added the Gzip as compression for the payload, our experiment showed that it only took 5 seconds to download these huge payload messages.

We also investigated RAMEN SSE implementation, there was no such heartbeat timeout for such cases, as the lower-level protocol allows for reading chunks of the payload instead of waiting for the whole payload to arrive as seen in gRPC.

Fallback Mechanism 

We invested in building tooling on the mobile side for quickly falling back from gRPC to the SSE protocol. We ran into issues during our rollouts where we realized that having a robust fallback mechanism will enable us to mitigate major outages. The fallback layer was built to detect failures in connectivity via the gRPC stack and would quickly fallback to the SSE-based stack if needed. 

Results 

We have completely rolled out RAMEN over gRPC across all mobile apps (Rider, Driver, and Eats) worldwide on both Android and iOS. Some of the key results that we accomplished:

  1. gRPC Connect Latency (p95) has improved by a minimum of 45% as a result of all the above-mentioned changes. Features that rely on RAMEN can start early, as with improved connection latency they will receive the data early through RAMEN.
  2. Push success rates have increased by a minimum of 1-2% across all apps and the average number of messages sent per session has improved in RAMEN gRPC. 
  3. With real-time acknowledgments from clients, we have better visibility into RTT, unlocking opportunities to judiciously use the network bandwidth.
  4. We have a consistent implementation across all clients leading to fewer chances of misses and outages in the future. 

HTTP2 Directly

One of the ways to solve the above-mentioned problems is to create our own client/server implementation that can leverage all HTTP2 features and enable bidirectional streaming. While the HTTP2 specification is simpler to understand, it is far more difficult to implement all the functionality ourselves. It will take far too long to get there and ensuring consistency in implementation across various languages/platforms will be very painful. It will be almost like reinventing gRPC. 

Reactive Streams

Reactive sockets/streams is another RPC framework that addresses the same problems. However, there has not been much activity around its development and support. gRPC, on the contrary, is more mature and has a strong focus on performance and tooling. Since the rest of Uber was moving to gRPC, we felt it was a good idea to have the same framework end to end and reduce the cost of support for multiple frameworks.

Next Steps and Future Vision

The schema for communication between client and server is currently written using Thrift and the serialization over the wire is JSON. So we have a gRPC RAMEN connection, but the messages sent are all JSON encoded. We are working on an intermediate goal to start with sending binary-encoded messages on the wire, by encoding messages in wire but retaining thrift contracts and then incrementally get to a stage where we are in an end goal of protobuf encoding on the wire and proto contracts.

Leveraging gRPC streams 

We still have head-of-line blocking within the gRPC connection when there are large messages that clog the pipe and prevent other messages from going through. We plan to leverage multiple streams within a gRPC connection and have flow control and stream prioritization over these streams as described below to reduce blocking for critical messages. 

Flow Control and Stream Prioritization 

HTTP2 protocol describes few advanced features like network prioritization by setting stream priorities. Setting up higher priority for a stream will enable higher wire level prioritization of requests and responses. However, no implementation of HTTP2 today supports this feature, including gRPC-Java. Also, we do not have a great flow control mechanism today, where we can decide to buffer/hold messages when the network bandwidth is overwhelmed. We would like to prioritize and send messages based on network conditions.

We believe that these shortcomings can still be solved using application-level enforcement of priorities. There are various approaches to solve this problem, but it is difficult to make such a choice globally without proper experiments/data around it. However, gRPC itself makes development of higher order protocols easier and we can experiment with such ideas quickly. Now that we have rolled out RAMEN gRPC from HTTP, we will evaluate these network prioritization techniques. 

We would like to thank all the engineers who took on a collaborative effort to roll out gRPC across Uber apps globally. The Edge Streaming team in the Uber Bangalore site is leading the next steps towards the future vision. If you’re interested in solving high scale distributed systems problems that have direct business impact, please apply to join our team.

Apache®, Apache Cassandra®, Apache Helix, Apache Zookeeper, Apache Kafka, Helix, Zookeeper, and Cassandra® are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Redis is a registered trademark of Redis Ltd. Any rights therein are reserved to Redis Ltd. Any use by Uber is for referential purposes only and does not indicate any sponsorship, endorsement or affiliation between Redis and Uber.

All diagrams in this document have been created in Lucidchart (www.lucidchart.com)

Home - Wiki
Copyright © 2011-2024 iteam. Current version is 2.129.0. UTC+08:00, 2024-07-07 05:39
浙ICP备14020137号-1 $Map of visitor$