Enhancing Distributed System Load Shedding with TCP Congestion Control Algorithm

Load shedding is a common problem in event-driven systems. It becomes even harder when the load must be handled according to different priorities. Here we present how we solved this problem using a well-known algorithm from TCP congestion control.

Andrew Meleka

Software Engineer

Stefan Litsche

Principal Software Engineer

Posted on Apr 23, 2024

A busy port where hundreds of containers wait to be loaded to ships or trailers, photo by CHUTTERSNAP on Unsplash

Introduction

Our team is responsible for sending communications to all Zalando customers, e.g. confirming a placed order, informing about new content from a favourite brand, or announcing sales campaigns. Both while preparing those messages and while sending them out via different service providers, we have to deal with limited resources, so we cannot always process every requested communication immediately. This occasionally leads to a backlog of requests.

But not all communication is equally important. Our business stakeholders have asked us to ensure that communication supporting critical business operations is processed within the given service level objectives (SLOs).

This led us to investigate the solution space for load shedding. Load shedding has already been addressed in Skipper. But our system is event-driven: all requests we process are delivered as events via Nakadi, so Skipper's feature does not help here. But why not use the same underlying idea?

We know that we meet our SLOs when our system runs within its normal limits. If we controlled the ingestion of message requests into our system, we would be able to process them in a timely manner. Additionally, we would need to combine this ingestion control with prioritization of the requests that support critical business operations.

Overview of the System

First, let me introduce you to the system under load.

Communication Platform Overview

Nakadi is a distributed event bus that offers a RESTful API on top of Kafka-like queues. It serves a couple of thousand event types published by teams across Zalando for different purposes. More than 1000 of those event types trigger customer communication.

The Stream Consumer is the microservice that acts as the entry point for events into the entire platform. It is responsible for consuming the events from Nakadi, applying some light processing, and pushing them to the RabbitMQ broker. Every Nakadi event type is processed by an instance of the Event Listener.

RabbitMQ is a message broker and the backbone of our platform. It receives the events from the Stream Consumer and makes them available to the downstream services.

Our Platform consists of many services. These microservices are responsible for processing the events. This includes but is not limited to:

  • Rendering messages (both push notification & email)
  • Checking for the customers' consent, preference and blocklist
  • Checking for the customers' eligibility
  • Storing templates and the configurations of Zalando's different tenants

Inside the platform, we have a lot of components that are interacting with each other, and the communication between those components is done mostly via RabbitMQ.

Each service publishes to one or more exchanges and consumes from one or more queues. With so much communication going on between the services, RabbitMQ is the middleman for all of it.

High Level Design

High Level Architecture

We know that a suitable backlog size behind each application lets it scale out and reach its best throughput, which in turn lets us achieve our SLOs. The system can adjust the resources it acquires from Kubernetes based on demand (using a scaling mechanism based on CPU, memory, endpoint calls, and backlogs).

We consider the whole platform as a system with an interface, and we strive to protect it at the interface level by not overwhelming it with more messages than it can handle in time. This means we can steer the ingestion based on the priority and the available capacity of the system.

The Stream Consumer implements adaptive concurrency management using Additive Increase Multiplicative Decrease (AIMD). This algorithm reacts to reduced service capacity: whenever congestion is detected, the request rate is reduced by a multiplier; otherwise it is increased by a fixed increment.
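As a minimal sketch of the AIMD idea, the update rule could look like the following. The function name `aimd_step` and all constants (an increment of 10, a factor of 0.5, limits of 1 and 1000) are illustrative assumptions, not values from our configuration.

```python
def aimd_step(rate: float, congested: bool,
              increment: float = 10.0, factor: float = 0.5,
              min_rate: float = 1.0, max_rate: float = 1000.0) -> float:
    """Return the next permitted rate after one AIMD adjustment."""
    if congested:
        # Multiplicative decrease: back off quickly under congestion.
        rate = rate * factor
    else:
        # Additive increase: probe for spare capacity in small steps.
        rate = rate + increment
    # Keep the rate inside the (assumed) configured limits.
    return max(min_rate, min(max_rate, rate))


# Three calm intervals followed by one congested interval.
rate = 100.0
history = []
for congested in (False, False, False, True):
    rate = aimd_step(rate, congested)
    history.append(rate)
print(history)  # [110.0, 120.0, 130.0, 65.0]
```

The slow climb and sharp drop produce the sawtooth pattern known from TCP: capacity is probed carefully, but released quickly when congestion shows up.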

We needed to find proper indicators for reduced service capacity. The Stream Consumer publishes the messages to RabbitMQ, so we looked for indicators available from RabbitMQ. As the first indicator we decided to use errors: whenever we can’t publish, we should reduce the consumption rate. The second is more subtle. RabbitMQ is able to apply back-pressure when slow consumers are detected and system resources are consumed too fast. In this case RabbitMQ slows down the publish rate, which the publisher experiences as an increase in publish time. The Stream Consumer observes those metrics and adjusts the consumption rate.
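The two indicators can be combined into a single congestion signal. The sketch below is only an illustration: the `PublishStats` structure, the function `is_congested`, and the limits (50 ms median latency, zero tolerated errors) are assumptions, since the actual thresholds have to be tuned per system.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PublishStats:
    """Snapshot of one observation window (hypothetical structure)."""
    p50_latency_ms: float  # median publish latency in the window
    error_count: int       # failed publish attempts in the window


def is_congested(stats: PublishStats,
                 latency_limit_ms: float = 50.0,
                 error_limit: int = 0) -> bool:
    """Signal congestion if either indicator exceeds its configured limit."""
    too_many_errors = stats.error_count > error_limit
    too_slow = stats.p50_latency_ms > latency_limit_ms
    return too_many_errors or too_slow


print(is_congested(PublishStats(p50_latency_ms=12.0, error_count=0)))   # False
print(is_congested(PublishStats(p50_latency_ms=120.0, error_count=0)))  # True
print(is_congested(PublishStats(p50_latency_ms=12.0, error_count=3)))   # True
```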

Reducing the consumption for all event types would help to run the system within its limits, but it does not yet prioritize the critical ones. The component must be able to selectively adjust how fast the Stream Consumer consumes events from Nakadi. Therefore every event type is assigned a rate based on its priority and the system load. This ensures that every reader gets its dedicated capacity. If more capacity is available, the system adjusts accordingly and provides a higher rate to event types with a higher demand (backlog).

Thus there is no need to determine the tipping-point throughput of any single service. The AIMD algorithm also adapts to increased capacity after the system scales. Most importantly, the algorithm requires only a local variable, which avoids central coordination such as a shared database.

By following this approach we

  • Avoid changes across all the microservices by scoping the change to one component.
  • Achieve prioritization at the consumption level, hence avoiding the need to prioritize messages inside the platform.
  • Get a scalable solution with no single point of failure.
  • Use Nakadi to persist the backlog, hence reducing the risk of overloading RabbitMQ.

We will need to tune the actual value (latency of publishing to RabbitMQ) used as the indicator for reducing ingestion. The threshold should keep enough load on the system to trigger scaling of the services in the platform, while still reducing the number of messages stored in RabbitMQ.

Low Level Design

Changes in Stream Consumer
  • Statistics Collector: collects statistics about the latency of publishing to RabbitMQ (e.g. P50) as well as any exceptions thrown while publishing.
  • Congestion Detector: decides whether there is congestion in the system by comparing the data it receives from the statistics collector (latency and thrown exceptions) with the thresholds configured in the service.
  • Throttle: provided as one instance per consumer. This is the class that implements the AIMD algorithm. The consumer instantiates it with the priority of its event type, and that priority then affects the increase/decrease of the permitted events/sec that can be consumed.

How the Design Works

  1. When the Stream Consumer starts, all the event listeners start with an initial consumption batch size. They will also instantiate a throttle instance.
  2. The statistics collector cron job kicks in, collecting some statistics about latency (P50) and exceptions, and then calls the congestion detector to provide the results.
  3. The congestion detector checks the data it receives and decides whether there is congestion by comparing the data with the limits set in the configuration. It then passes its decision to the throttles of all event listeners through an observer pattern.
  4. Each throttle, once called, decides the new batch size using AIMD, depending on the decision from the congestion detector and the priority it was given when the consumer started. (Note: there is no coordination between different throttles!)
  5. As modifying the batch size is currently not supported natively by Nakadi, the application slows down or speeds up its own consumption accordingly.
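The steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not our production code: all class and method names are hypothetical, the limits and AIMD parameters are made up, and `pause_after_batch` is just one possible way for a listener to slow itself down when the batch size is fixed.

```python
import statistics
from typing import Callable, List, Tuple


class StatisticsCollector:
    """Collects publish latencies and errors for one window (step 2)."""

    def __init__(self) -> None:
        self._latencies_ms: List[float] = []
        self._errors = 0

    def record_success(self, latency_ms: float) -> None:
        self._latencies_ms.append(latency_ms)

    def record_error(self) -> None:
        self._errors += 1

    def flush(self) -> Tuple[float, int]:
        """Return (P50 latency, error count) and start a new window."""
        p50 = statistics.median(self._latencies_ms) if self._latencies_ms else 0.0
        errors = self._errors
        self._latencies_ms, self._errors = [], 0
        return p50, errors


class CongestionDetector:
    """Compares statistics with limits and notifies observers (step 3)."""

    def __init__(self, latency_limit_ms: float) -> None:
        self._latency_limit_ms = latency_limit_ms
        self._observers: List[Callable[[bool], None]] = []

    def subscribe(self, observer: Callable[[bool], None]) -> None:
        self._observers.append(observer)

    def evaluate(self, p50_ms: float, errors: int) -> bool:
        congested = errors > 0 or p50_ms > self._latency_limit_ms
        for notify in self._observers:
            notify(congested)
        return congested


class Throttle:
    """Per-listener AIMD state; no coordination with other throttles (step 4)."""

    def __init__(self, rate: float, increase: float, decrease: float) -> None:
        self.rate = rate            # permitted events per second
        self._increase = increase   # additive step, chosen by priority
        self._decrease = decrease   # fraction removed on congestion

    def on_signal(self, congested: bool) -> None:
        if congested:
            self.rate = max(1.0, self.rate * (1.0 - self._decrease))
        else:
            self.rate += self._increase

    def pause_after_batch(self, batch_size: int) -> float:
        """Seconds to wait after a fixed-size batch to respect the rate (step 5)."""
        return batch_size / self.rate


collector = StatisticsCollector()
detector = CongestionDetector(latency_limit_ms=50.0)
throttle = Throttle(rate=100.0, increase=15.0, decrease=0.2)
detector.subscribe(throttle.on_signal)

for latency in (10.0, 12.0, 11.0):   # a calm window
    collector.record_success(latency)
detector.evaluate(*collector.flush())
print(throttle.rate)                  # 115.0

for latency in (80.0, 95.0, 90.0):    # a slow window
    collector.record_success(latency)
detector.evaluate(*collector.flush())
print(throttle.rate, throttle.pause_after_batch(46))
```

After the slow window the rate drops by 20% to roughly 92 events per second, so a listener reading batches of 46 events would pause for about half a second between batches.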

How priorities affect speeding up and slowing down event consumption

Let’s suppose that we have 3 priorities in our system, from P1 to P3, where P1 is the highest and P3 is the lowest. The Stream Consumer has a speed-up value and a slow-down value defined per priority in its configuration.

First scenario, signal for consumption speeding up (relieved RabbitMQ cluster)

  • For each priority, there is a defined value for speeding up. Let’s assume some numbers here:
    • P1: 15
    • P2: 10
    • P3: 5
  • So the new consumption rates (batch sizes) will be:
    • P1: Previous value + 15
    • P2: Previous value + 10
    • P3: Previous value + 5
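As a small worked example of the additive increase, using the example steps above and assuming (purely for illustration) that every listener starts from a batch size of 100:

```python
# Additive steps per priority, taken from the example values above.
INCREASE_STEP = {"P1": 15, "P2": 10, "P3": 5}


def speed_up(previous: int, priority: str) -> int:
    """Additive increase: add the priority's fixed step to the previous value."""
    return previous + INCREASE_STEP[priority]


# Assumption: every listener starts from the same batch size of 100.
rates = {priority: 100 for priority in INCREASE_STEP}
for _ in range(3):  # three consecutive "no congestion" signals
    rates = {p: speed_up(r, p) for p, r in rates.items()}
print(rates)  # {'P1': 145, 'P2': 130, 'P3': 115}
```

After only three signals, P1 has already gained three times as much additional capacity as P3.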

Additive Increase

Second scenario, signal for consumption slowing down (RabbitMQ cluster under load)

  • Here, too, a different value for slowing down is set per priority. Let’s assume these numbers:
    • P1: 20% decrease
    • P2: 40% decrease
    • P3: 60% decrease
  • So the new consumption rates will be:
    • P1: Previous value * (1 - 0.20) => 20% decrease
    • P2: Previous value * (1 - 0.40) => 40% decrease
    • P3: Previous value * (1 - 0.60) => 60% decrease
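To make the multiplicative decrease concrete, the following sketch applies the example percentages above once and twice in a row. The starting value of 100 is an assumption for illustration; note that a decrease of 20% means keeping 80% of the previous value.

```python
# Fraction of the rate removed on congestion, from the example values above.
DECREASE = {"P1": 0.20, "P2": 0.40, "P3": 0.60}


def slow_down(previous: float, priority: str) -> float:
    """Multiplicative decrease: keep (1 - decrease) of the previous value."""
    return previous * (1.0 - DECREASE[priority])


# Assumption: every listener currently consumes 100 events per batch.
for priority in DECREASE:
    once = slow_down(100.0, priority)
    twice = slow_down(once, priority)
    print(priority, round(once, 2), round(twice, 2))
# P1 80.0 64.0
# P2 60.0 36.0
# P3 40.0 16.0
```

Two consecutive congestion signals leave P1 with 64% of its original rate, while P3 is cut to 16%.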

Multiplicative Decrease

So, the rule of thumb here is:

  • Whenever the RabbitMQ cluster is not under load, we speed up the consumption rate for all consumers, but we give more additional capacity to higher-priority event types than to lower-priority ones.
  • Whenever the RabbitMQ cluster is under load, we slow down the consumption rate by a percentage for all consumers, but high-priority consumers decrease by a much smaller percentage than lower-priority ones.

Results

So far, we have been running the solution in production for around 6 months, and we have seen a lot of improvements in the platform, including:

  1. Less stress on the RabbitMQ cluster, as messages are not pushed to it unless there is enough capacity to handle them.

    RabbitMQ Messages

    Around 300k messages are in the queue backlog of one application, while the other applications are not under load, as the small number of messages in their queues shows. The reduced stress on the RabbitMQ cluster is also visible when comparing the number of messages in the queues with the number of messages in the Nakadi backlog (point 3 below).

  2. Prioritization of messages: higher-priority messages are sent first, and lower-priority messages are sent later.

    Order Confirmation Processing Time

    Commercial Messages Processing Time

    In the above diagrams, you see that the processing time for order confirmation is relatively stable. This is important as it’s a high priority use case. In contrast, commercial messages experience an increase in the processing time. This is acceptable as this is a low priority use case.

  3. Events that can't be processed at the moment are still in Nakadi, so they can be processed later or easily discarded in case of emergency.

    Nakadi Backlog

    As we can see, the backlog is being consumed without putting pressure on the platform. Messages of lower priority can be discarded in case of emergency.

    Nakadi Order Confirmation Backlog

    The order confirmation is a P1 message, so it is consumed first (during the same period, the backlog of lower-priority messages was growing).

Conclusion

Using the TCP congestion control algorithm to control traffic proved to be effective in our event-driven system. In general, it's much better to control how much traffic is ingested into your system at the source, rather than letting it flood the system and then trying to deal with it.

In our case, it helped us solve the problem of message prioritization: messages are only allowed to enter the system based on their priority and the capacity the system can handle. It also helped us avoid using the RabbitMQ cluster as storage for millions of messages - with a smaller queue size in RabbitMQ we follow best practices. In case of emergency, we can easily discard messages, as most of them will still be in the source.

Resources

Stop Rate Limiting! Capacity Management Done Right | Strange Loop Conference | 2017


