Tools and Techniques for Load Testing Ingestion Pipeline

Bharat Gulati
Myntra Engineering
Mar 11, 2024 · 9 min read

Background

Myntra, a leading fashion-oriented e-commerce platform in India, offers a diverse range of products, from face wash to designer shoes and mugs to carpets, helping customers enhance both their personal style and their surroundings. The expansive catalogue and dynamic user activity on the platform result in frequent changes to product metadata, including pricing, inventory, and ranking. To ensure that the search results on Myntra’s Search Engine stay consistently up to date, a robust system of topologies hosted on Apache Storm powers the ingestion workflow into our search data-stores.

As the complexity of product use-cases grew, we found ourselves managing numerous intricate integrations to support ingestion. In anticipation of the next phase of expansion, we recognised the need to engineer a reliable method for load-testing and benchmarking our ingestion ecosystem. Our goal was to identify bottlenecks and scale them efficiently to prevent any impact on SLAs.

This post explores the challenges associated with load testing an Ingestion Pipeline and delves into the solutions we developed to address these challenges effectively.

Pre-requisites

This blog assumes that the reader is familiar with the following technologies and their terminology. If not, we recommend reviewing the pertinent documentation to gain a brief understanding of their concepts before proceeding.

  1. Apache Storm (https://storm.apache.org/)
  2. Apache Kafka (https://kafka.apache.org/)
  3. Gatling (https://gatling.io/)

Problem Statement

The load-testing framework for an Ingestion Pipeline must encompass the following capabilities:

  1. Ability to generate synthetic test events to be used for load-testing
  2. Ability to send synthetic events to production components and trigger workflows (i.e. topologies logically chained together, refer Fig. 1) as well as individual topologies.
  3. Ability to trigger workflows via multiple interfaces as applicable — Http Endpoints (wrapper service over Kafka producer), Kafka Integration, Database writes (for CDC use-case), etc.
  4. Test events should not impact any activity of production users by mutating the actual product metadata which the search engine uses.
  5. Load testing should validate the actual underlying hardware, i.e. no separate test clusters, to avoid sources of inconsistency owing to differences across environments.
  6. Ability to abort a running load test by a single click (kill-switch) in case an issue is observed.
  7. Ability to horizontally scale up the load generation systems to achieve the desired incoming traffic.
  8. Ensure that the cost impact of performing a load test is minimal.
  9. Ability to purge test data in a timely manner if needed.
  10. Ability to handle topologies which have fan-in (N input events merged to form 1 output event) and fan-out (1 input event exploded to N output events) pattern.
  11. Ability to monitor key metrics for a workflow — throughput, event conversion ratio (num source events/ num sink events), E2E latencies, Error rate, etc.
Fig. 1 Sample Workflow demonstrating multiple topologies chained to achieve the overall ingestion

Solution

In pursuit of the aforementioned goals, we devised the following High-Level Plan through a series of iterative brainstorming sessions, ensuring alignment with our stated criteria.

Fig. 2 High-Level Solution Diagram

(1) Event Collector

This layer incorporates a cron job that intercepts production event traffic, randomly samples a subset of events daily, compiles them into a CSV file, and uploads the file to a cloud store. For Kafka topics, a new consumer is initiated to listen to the same topic. For services, a Request Log Filter has been implemented, logging incoming requests, including path parameters, query parameters, headers, and the body, into a log file. This log feed is integrated into our Centralised Logging Platform, which subsequently publishes it to a Kafka topic that serves as our integration point.
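
As an illustration, the Kafka-facing part of the sampling job can be as simple as a dedicated consumer that probabilistically picks records and appends them to a CSV file. The sketch below is a minimal, hypothetical version: the topic name, consumer group, sampling rate, file path, and run duration are all placeholders, and the real job additionally handles the cloud-store upload and proper CSV escaping.

// Minimal sketch of the daily sampling job (names, paths, and rates are placeholders).
// Consumes the production topic with its own consumer group (so production consumers
// are unaffected) and writes a ~1% random sample of events to a CSV file.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileWriter;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

public class EventSampler {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");      // placeholder
        props.put("group.id", "ingestion-event-sampler");  // dedicated group for sampling
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             PrintWriter csv = new PrintWriter(new FileWriter("/tmp/sampled-events.csv", true))) {
            consumer.subscribe(List.of("product-update-events")); // placeholder topic
            long deadline = System.currentTimeMillis() + Duration.ofMinutes(10).toMillis();
            while (System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (ThreadLocalRandom.current().nextDouble() < 0.01) { // ~1% sample
                        csv.printf("%s,%s%n", record.key(), record.value());
                    }
                }
            }
        }
        // The real job then uploads the CSV file to the cloud store.
    }
}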

(2) Load-Test UI and Generators

Myntra has a Centralised load-testing platform built on top of Gatling, a popular load-testing framework, especially for load testing of services. We continued to leverage the same platform by introducing a KafkaProtocol, which we use to publish messages to a Kafka topic as part of the Simulation.

To allow tracking of event movement across the workflow, we ensure that the test events/requests are stamped with important metadata in the headers (see the producer sketch after this list):

  1. load-test-id: helps distinguish between events generated by different load test script executions
  2. src-timestamp: indicates the time at which the event was created by the load generation script. Receiving components within the topology use this header to compute the total time elapsed since inception whenever they encounter the event, giving a cumulative latency view.
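
Independently of the Gatling KafkaProtocol wiring, stamping these headers amounts to adding them to every produced record. Below is a minimal sketch using the plain Kafka producer API; the topic, key, payload, and load-test-id values are placeholders.

// Sketch: stamping load-test metadata on each synthetic event (placeholder names).
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class TestEventPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String loadTestId = "lt-2024-03-11-run-1"; // unique per load-test execution

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("product-update-events_test", "style-123", "{...synthetic payload...}");
            // Stamp the tracking headers that downstream components read.
            record.headers().add("load-test-id", loadTestId.getBytes(StandardCharsets.UTF_8));
            record.headers().add("src-timestamp",
                    Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}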

(3) Source

To mitigate any interference with regular production event traffic, our approach involves co-hosting test topics alongside their production counterparts. We suffix the test topics with “_test” and implement aggressive retention policies, facilitating seamless auto-cleanup. This strategy ensures a clear segregation between test events and production source events, enabling efficient gate-keeping if there’s a need to promptly turn off the test flow.
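
For illustration, provisioning such a “_test” topic with an aggressive retention policy can be done via the Kafka AdminClient; the partition count, replication factor, and retention values below are placeholders rather than our production settings.

// Sketch: creating a "_test" counterpart topic with aggressive retention
// (all values and names here are illustrative).
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TestTopicProvisioner {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic testTopic = new NewTopic("product-update-events_test", 12, (short) 3)
                    .configs(Map.of(
                            "retention.ms", "3600000",      // keep test events for only 1 hour
                            "retention.bytes", "1073741824" // cap the on-disk footprint per partition
                    ));
            admin.createTopics(List.of(testTopic)).all().get();
        }
    }
}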

(4) Processing Workflow

As previously outlined, a workflow comprises one or more topologies logically chained together. Each of these topologies requires specific modifications, tailored to its use cases, to render it suitable for load testing, guided by the rules of thumb defined below.

Fig. 3 Topology Modifications

Add a test spout corresponding to every production spout that exists in the topology. Do likewise for every sink bolt (i.e. a bolt that writes to a datastore and doesn’t emit any further tuples).

For simpler topologies (say, fewer than 3 components), duplicate all components to achieve name-spacing (right side of Fig. 3).

For complex topologies, keep the internal transformation bolts shared so as to better utilise resources. To ensure sinks are name-spaced, add a header-aware router bolt which separates out the stream and directs it to the relevant sink bolts (refer to the left side of Fig. 3).
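
As an illustration of this second approach, a header-aware router bolt can inspect the load-test marker carried by the tuple and route it to either a test or a production sink stream. The sketch below uses hypothetical field names (payload, load_test_id, src_timestamp) and is not our actual implementation.

// Sketch of a header-aware router bolt (field names are hypothetical).
// Routes each tuple to the production or test sink stream based on the
// load-test marker stamped upstream.
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class HeaderAwareRouterBolt extends BaseRichBolt {
    public static final String PROD_SINK_STREAM = "prod_sink";
    public static final String TEST_SINK_STREAM = "test_sink";

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // "load_test_id" is assumed to have been stamped upstream from the Kafka header.
        boolean isTestEvent = input.contains("load_test_id")
                && input.getStringByField("load_test_id") != null;
        String targetStream = isTestEvent ? TEST_SINK_STREAM : PROD_SINK_STREAM;
        collector.emit(targetStream, input, input.getValues()); // anchored to the input tuple
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Same schema on both streams; only the routing differs.
        declarer.declareStream(PROD_SINK_STREAM, new Fields("payload", "load_test_id", "src_timestamp"));
        declarer.declareStream(TEST_SINK_STREAM, new Fields("payload", "load_test_id", "src_timestamp"));
    }
}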

Ensure that test spouts are integrated with the kill-switch present in the config service in the following manner. This helps ensure that the test spout becomes inactive as soon as the kill-switch is hit and stops producing any further events, while the production spouts continue to function as usual.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

class MySpout extends BaseRichSpout {
    ...

    // Supplier backed by the config service; returns the current kill-switch state.
    protected transient Supplier<Boolean> isActiveFlowSupplier;
    protected final Lock activeFlowLock;
    protected final long schedulerInitialDelayMillis;
    protected final long schedulerDelayMillis;

    private transient ScheduledExecutorService scheduledExecutorService;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient volatile boolean isActiveFlow;

    @Override
    public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) {
        // logic for setting up the spout

        isActiveFlowSupplier = buildIsActiveFlowSupplier(); // create isActiveFlowSupplier
        isActiveFlow = false;
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
    }

    @Override
    public void nextTuple() {
        // Do not emit anything while the kill-switch is engaged.
        if (!isActiveFlow) {
            return;
        }
        // logic for emitting tuples
    }

    @Override
    public void activate() {
        if (isActiveFlowSupplier == null) {
            doActivate();
            return;
        }

        // Poll the kill-switch periodically instead of activating immediately.
        scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(this::activeStateChangeListener,
                schedulerInitialDelayMillis, schedulerDelayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void deactivate() {
        if (isActiveFlowSupplier == null) {
            doDeactivate();
            return;
        }

        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            scheduledFuture = null;
        }

        activeFlowLock.lock();
        try {
            if (!isActiveFlow) {
                return;
            }
            doDeactivate();
            isActiveFlow = false;
        } finally {
            activeFlowLock.unlock();
        }
    }

    @Override
    public void close() {
        try {
            super.close();
        } finally {
            if (scheduledExecutorService == null) {
                return;
            }
            scheduledExecutorService.shutdown();
            scheduledExecutorService = null;
        }
    }

    protected void doActivate() {
        // logic for activating the spout
    }

    protected void doDeactivate() {
        // logic for deactivating the spout
    }

    // Runs on the scheduler thread; reconciles the spout state with the kill-switch flag.
    protected void activeStateChangeListener() {
        activeFlowLock.lock();
        try {
            boolean newState = Boolean.FALSE != isActiveFlowSupplier.get(); // a null flag is treated as active
            if (newState == isActiveFlow) {
                return;
            }
            if (newState) {
                doActivate();
            } else {
                doDeactivate();
            }
            isActiveFlow = newState;
        } finally {
            activeFlowLock.unlock();
        }
    }
}
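
The buildIsActiveFlowSupplier() helper above is elided; one possible shape for such a supplier, assuming the config service exposes the kill-switch flag over a simple HTTP endpoint (the endpoint path, timeout, and response format here are hypothetical), is shown below.

// Hypothetical sketch of a kill-switch supplier that polls the config service.
// Endpoint, timeout, and response format are assumptions, not the actual API.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.function.Supplier;

public final class KillSwitchSuppliers {

    public static Supplier<Boolean> buildIsActiveFlowSupplier(String configServiceUrl, String flagName) {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        return () -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(configServiceUrl + "/flags/" + flagName)) // hypothetical endpoint
                        .timeout(Duration.ofSeconds(2))
                        .GET()
                        .build();
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                // Assume the config service returns the literal string "true" or "false".
                return "true".equalsIgnoreCase(response.body().trim());
            } catch (Exception e) {
                // Fail safe: if the config service is unreachable, stop the test flow.
                return Boolean.FALSE;
            }
        };
    }
}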

Ensure that the sink bolts are integrated with the kill-switch in the following manner. This helps reduce the stress on the systems as soon as the kill-switch is hit, by silently ack-ing any pending tuples in the bolt’s queue. Ensure there is a similar check inside the loop construct for fan-out bolts.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

import java.util.Map;
import java.util.function.Supplier;

class MySinkBolt extends BaseRichBolt {
    ...

    // Supplier backed by the config service; returns the current kill-switch state.
    protected transient Supplier<Boolean> isActiveFlowSupplier;
    private transient OutputCollector outputCollector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // logic for setting up the bolt
        outputCollector = collector;
        isActiveFlowSupplier = buildIsActiveFlowSupplier(); // create isActiveFlowSupplier
    }

    @Override
    public void execute(Tuple input) {
        // Kill-switch engaged: silently ack pending tuples instead of processing them.
        if (isActiveFlowSupplier != null && Boolean.FALSE == isActiveFlowSupplier.get()) {
            if (!TupleUtils.isTick(input)) {
                getOutputCollector().ack(input);
            }
            return;
        }

        // actual logic of bolt execution

        // in case of a fan-out bolt where a loop might exist:
        // while (condition) {
        //     if (isActiveFlowSupplier != null && Boolean.FALSE == isActiveFlowSupplier.get()) {
        //         break;
        //     }
        //     // actual transformation logic and emitting
        // }
        // getOutputCollector().ack(input);
    }

    protected OutputCollector getOutputCollector() {
        return outputCollector;
    }
}

Ensure fan-in bolts are either name-spaced or their bucketing logic is header aware to segregate test events from production events.

Ensure that the header metadata stamped in tuples is passed forward by all spouts and bolts, and is written to the Kafka headers or the database record (as per the schema definition) by the sink bolts.
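
In practice this means carrying the metadata as dedicated tuple fields (or a metadata map) and re-emitting them unchanged at every hop. Below is a minimal sketch of a transformation bolt doing so, again with hypothetical field names.

// Sketch of a transformation bolt that forwards the load-test metadata fields
// untouched alongside its own output (field names are hypothetical).
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class EnrichmentBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String enrichedPayload = enrich(input.getStringByField("payload"));
        // Propagate the tracking metadata exactly as it was received.
        collector.emit(input, new Values(
                enrichedPayload,
                input.getStringByField("load_test_id"),
                input.getLongByField("src_timestamp")));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload", "load_test_id", "src_timestamp"));
    }

    private String enrich(String payload) {
        return payload; // placeholder for the actual transformation
    }
}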

Ensure that metrics are emitted by the spouts and bolts pertaining to throughput, cumulative latency since the event originated, success/failure, etc. To achieve this seamlessly and in a non-intrusive manner, the following lifecycle APIs of Apache Storm were used:

  1. org.apache.storm.kafka.spout.KafkaTupleListener: for sending Kafka Spout metrics at the following lifecycle stages — emit, ack.
  2. org.apache.storm.hooks.TaskHook: for sending Spout, Bolt metrics at various lifecycle stages — emit, ack, fail, etc.
  3. org.apache.storm.hooks.WorkerHook: for initialising the metrics http endpoint from which the Prometheus collector can scrape the metrics, and other common components.

Related contributions we made to Apache Storm on GitHub to improve the Worker Hooks integration: https://github.com/apache/storm/pull/3546, https://github.com/apache/storm/pull/3547
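
For illustration, a task hook that counts emits/acks/fails per component and records process latencies could look like the sketch below. It assumes the Prometheus Java simpleclient for the metric objects; our actual framework classes differ, and the metric names are placeholders.

// Illustrative task hook: counts emits/acks/fails per component and records
// bolt process latency, assuming the Prometheus Java simpleclient is on the classpath.
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import org.apache.storm.hooks.BaseTaskHook;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

public class MyTaskHook extends BaseTaskHook {
    private static final Counter EMITTED = Counter.build()
            .name("storm_emitted_total").help("Tuples emitted").labelNames("component", "stream").register();
    private static final Counter ACKED = Counter.build()
            .name("storm_acked_total").help("Tuples acked").labelNames("component").register();
    private static final Counter FAILED = Counter.build()
            .name("storm_failed_total").help("Tuples failed").labelNames("component").register();
    private static final Histogram PROCESS_LATENCY = Histogram.build()
            .name("storm_process_latency_ms").help("Bolt process latency").labelNames("component").register();

    private String componentId;

    @Override
    public void prepare(Map conf, TopologyContext context) {
        componentId = context.getThisComponentId();
    }

    @Override
    public void emit(EmitInfo info) {
        EMITTED.labels(componentId, info.stream).inc();
    }

    @Override
    public void boltAck(BoltAckInfo info) {
        ACKED.labels(componentId).inc();
        if (info.processLatencyMs != null) {
            PROCESS_LATENCY.labels(componentId).observe(info.processLatencyMs);
        }
    }

    @Override
    public void boltFail(BoltFailInfo info) {
        FAILED.labels(componentId).inc();
    }
}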

# flux definition of my-topology.yaml

name: "my-topology"

config:
  topology.auto.task.hooks: ["com.abc.framework.MyTaskHook"]
  ...

components:
  - id: "kafkaTupleListener"
    className: "com.abc.framework.MyTupleListener"

  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    ...
    configMethods:
      - name: "setTupleListener"
        args:
          - ref: "kafkaTupleListener"

...

spouts:
  ...

bolts:
  ...

streams:
  ...

workerHooks:
  - id: "my-worker-hook"
    className: "com.abc.framework.MyWorkerHook"

(5) Config Service

A service that hosts configurations which can be changed at runtime, thereby changing the behaviour of the running production systems integrated with them. For our use-case, this config-service holds the kill-switch flag to which all our topologies are integrated. Since Myntra already had a pre-existing dedicated config-service, we went ahead with the same; otherwise, this could be achieved with ZooKeeper (or alternatives such as etcd, Consul, Spring Cloud Config, etc.).

(6) Sink

To safeguard against corrupting production data and to mitigate the potential cascading impact on production read traffic, we introduced namespace segregation in the database schema. Myntra’s search engine utilises an Apache Lucene-based document datastore and also writes some product metadata to a key-value store. In the case of the key-value store, we opted to prefix keys with “test_” to distinguish test records and included an expiry mechanism for automatic cleanup of test data. For the document store, a separate test collection was created on the same cluster for operational simplicity.
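
As an illustration of the key-value store handling, the sketch below prefixes test keys and attaches a TTL for automatic cleanup. It uses Jedis/Redis purely as a stand-in; the actual store, client, and TTL we use differ.

// Illustration only: writing namespaced test records with an expiry to a
// key-value store (Redis via Jedis as a stand-in for the actual store).
import redis.clients.jedis.Jedis;

public class KeyValueSinkWriter {

    private static final String TEST_KEY_PREFIX = "test_";
    private static final int TEST_TTL_SECONDS = 3600; // auto-cleanup after 1 hour (illustrative)

    private final Jedis jedis;

    public KeyValueSinkWriter(Jedis jedis) {
        this.jedis = jedis;
    }

    public void write(String key, String value, boolean isTestEvent) {
        if (isTestEvent) {
            // Test records are prefixed and expire automatically.
            jedis.setex(TEST_KEY_PREFIX + key, TEST_TTL_SECONDS, value);
        } else {
            jedis.set(key, value);
        }
    }
}

// usage: new KeyValueSinkWriter(new Jedis("localhost", 6379)).write("style-123", "{...}", true);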

(7) Metrics Backend

The metrics produced by different components within the workflow are consolidated in our Centralised Metrics System, utilising Prometheus as the Metric Store and Grafana for dash-boarding. This setup enables us to monitor both production and load test events seamlessly in a unified location, facilitated by selecting the appropriate filters based on load-test-id and event type labels.

(8) Synthetic Read Traffic

Because test data is directed to a distinct test collection in our sink stores, which receives no organic read traffic, the write performance can deviate notably from its production counterpart. To address this, we’ve created scripts that simulate synthetic read traffic closely mirroring the patterns experienced in production. This suite must run concurrently in the background alongside the primary Ingestion Load Test Suite.
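
For reference, such a background read-traffic suite can be expressed in Gatling’s Java DSL roughly as below; the base URL, query mix, and injection rates are placeholders, and our actual suite is driven by the centralised load-testing platform.

// Sketch of a background read-traffic simulation (placeholder URL, query, and rates).
import static io.gatling.javaapi.core.CoreDsl.*;
import static io.gatling.javaapi.http.HttpDsl.*;

import io.gatling.javaapi.core.ScenarioBuilder;
import io.gatling.javaapi.core.Simulation;
import io.gatling.javaapi.http.HttpProtocolBuilder;

public class SyntheticReadTrafficSimulation extends Simulation {

    // Placeholder base URL for the search read API.
    HttpProtocolBuilder httpProtocol = http.baseUrl("http://search-read-api.internal:8080");

    // Placeholder query; the real suite replays a mix of sampled production queries.
    ScenarioBuilder syntheticReads = scenario("synthetic-search-reads")
            .exec(http("search").get("/search").queryParam("q", "running shoes").check(status().is(200)));

    {
        // Hold a constant read throughput for the duration of the ingestion load test.
        setUp(syntheticReads.injectOpen(constantUsersPerSec(50).during(1800)))
                .protocols(httpProtocol);
    }
}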

Conclusion

With the outlined skeleton design, we successfully integrated load-testing capabilities into our existing search ingestion pipeline, tailored to our specific requirements and ecosystem. This adaptation has enabled us to pinpoint crucial watermark levels at which the system experiences performance constraints, and we are actively addressing these issues in the upcoming quarter.

However, it’s essential to acknowledge that the implemented design is not flawless and comes with its set of recognised limitations:

  1. Interference with actual measurements resulting from the introduction of name-spaced components and the increase in payload size due to metadata.
  2. Black-box testing constraints, as an existing workflow must undergo changes before it can be onboarded to the load test suite.
  3. Technology-specific constraints: the strategies outlined above may not be directly applicable to ingestion systems utilising alternative technologies such as Apache Spark, Akka, Apache Airflow, etc.

If you made it this far and enjoyed the content, please show your appreciation by smashing the clap button and sharing the post within your tech community. Feel free to post your comments about any clarifications you seek, scope for improvement you identified, and/or additional limitations of the system.
