Tools and Techniques for Load Testing Ingestion Pipeline

Add a test spout for every production spout in the topology. Do the same for every sink bolt (i.e. a bolt that writes to a datastore and does not emit any further tuples).

For simpler topologies (say, fewer than 3 components), duplicate all components to achieve name-spacing (right side of Fig. 3).

For complex topologies, keep the internal transformation bolts shared to make better use of resources. To ensure the sinks are name-spaced, add a header-aware router bolt that separates the stream and directs each event to the relevant sink bolt (left side of Fig. 3).
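The routing decision made by such a header-aware router bolt can be sketched in plain Java as below. This is only an illustration: the header name `x-load-test-id` and the two stream ids are assumptions, since the article does not name the actual header or streams.

```java
import java.util.Map;

/** Hypothetical sketch: picks the output stream for an event based on its headers. */
final class HeaderAwareRouter {
    // Assumed header name; the real header used in the pipeline is not given in the article.
    static final String LOAD_TEST_HEADER = "x-load-test-id";
    // Assumed stream ids, one per name-spaced sink.
    static final String PROD_STREAM = "prod-sink-stream";
    static final String TEST_STREAM = "test-sink-stream";

    /** Returns the stream id on which the event should be emitted. */
    static String route(Map<String, String> headers) {
        if (headers == null) {
            return PROD_STREAM; // no headers: treat as production traffic
        }
        String loadTestId = headers.get(LOAD_TEST_HEADER);
        boolean isTest = loadTestId != null && !loadTestId.isEmpty();
        return isTest ? TEST_STREAM : PROD_STREAM;
    }
}
```

Inside a real router bolt, the returned id would typically be passed to `collector.emit(streamId, input, values)`, with both streams declared through `declareStream` and each sink bolt subscribed to only one of them.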

Ensure that test spouts are integrated with the kill-switch in the config-service in the following manner. As soon as the kill-switch is hit, the test spout becomes inactive and stops producing events, while the production spouts continue to function as usual.

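import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;

class MySpout extends BaseRichSpout {
    ...

    // Reads the kill-switch state; transient, so it is rebuilt in open() on the worker.
    protected transient Supplier<Boolean> isActiveFlowSupplier;

    protected final Lock activeFlowLock;
    protected final long schedulerInitialDelayMillis;
    protected final long schedulerDelayMillis;

    private transient ScheduledExecutorService scheduledExecutorService;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient volatile boolean isActiveFlow;

    @Override
    public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) {
        isActiveFlowSupplier = buildIsActiveFlowSupplier();
        isActiveFlow = false;
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
    }

    @Override
    public void nextTuple() {
        // Emit nothing while the flow is inactive.
        if (!isActiveFlow) {
            return;
        }
    }

    @Override
    public void activate() {
        // No kill-switch integration: activate immediately.
        if (isActiveFlowSupplier == null) {
            doActivate();
            isActiveFlow = true; // otherwise nextTuple() would never emit
            return;
        }
        // Poll the kill-switch periodically and react to state changes.
        scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(
                this::activeStateChangeListener,
                schedulerInitialDelayMillis, schedulerDelayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void deactivate() {
        if (isActiveFlowSupplier == null) {
            doDeactivate();
            isActiveFlow = false;
            return;
        }
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            scheduledFuture = null;
        }
        activeFlowLock.lock();
        try {
            if (!isActiveFlow) {
                return;
            }
            doDeactivate();
            isActiveFlow = false;
        } finally {
            activeFlowLock.unlock();
        }
    }

    @Override
    public void close() {
        try {
            super.close();
        } finally {
            // No return inside finally: it would swallow an exception thrown by super.close().
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
                scheduledExecutorService = null;
            }
        }
    }

    protected void doActivate() {
    }

    protected void doDeactivate() {
    }

    // Runs on the scheduler thread; the lock guards against a concurrent deactivate().
    protected void activeStateChangeListener() {
        activeFlowLock.lock();
        try {
            // A null value is treated as active; equals() avoids comparing Boolean references.
            boolean newState = !Boolean.FALSE.equals(isActiveFlowSupplier.get());
            if (newState == isActiveFlow) {
                return;
            }
            if (newState) {
                doActivate();
            } else {
                doDeactivate();
            }
            isActiveFlow = newState;
        } finally {
            activeFlowLock.unlock();
        }
    }
}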

Ensure that the sink bolts are integrated with the kill-switch in the following manner. As soon as the kill-switch is hit, the bolt silently acks any tuples pending in its queue, which reduces the stress on the downstream systems. For fan-out bolts, place the check inside the loop construct as well.

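import java.util.Map;
import java.util.function.Supplier;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

class MySinkBolt extends BaseRichBolt {
    ...

    // Reads the kill-switch state; transient, so it is rebuilt in prepare() on the worker.
    protected transient Supplier<Boolean> isActiveFlowSupplier;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        isActiveFlowSupplier = buildIsActiveFlowSupplier();
    }

    @Override
    public void execute(Tuple input) {
        // Kill-switch hit: ack the tuple without writing to the sink so the queue drains quickly.
        // equals() avoids comparing Boolean references.
        if (isActiveFlowSupplier != null && Boolean.FALSE.equals(isActiveFlowSupplier.get())) {
            if (!TupleUtils.isTick(input)) {
                getOutputCollector().ack(input);
            }
            return;
        }
    }
}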
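For fan-out bolts, the per-iteration check mentioned above might look like the following plain-Java sketch. The class and method names are hypothetical, and the sink is reduced to a `Consumer` so that the loop can be shown without any Storm dependency.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch: a fan-out write loop that re-checks the kill-switch for every item. */
final class FanOutWriter {
    /**
     * Writes items to the sink until they run out or the kill-switch turns the flow off.
     * Returns the number of items actually written.
     */
    static <T> int writeAll(List<T> items, Supplier<Boolean> isActiveFlow, Consumer<T> sink) {
        int written = 0;
        for (T item : items) {
            // Checked on every iteration so that a large fan-out stops mid-batch.
            if (Boolean.FALSE.equals(isActiveFlow.get())) {
                break;
            }
            sink.accept(item);
            written++;
        }
        return written;
    }
}
```

Checking only once before the loop would let a single input tuple that fans out into many writes keep loading the sink long after the kill-switch has been hit.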

Ensure fan-in bolts are either name-spaced or their bucketing logic is header aware to segregate test events from production events.
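One way to make the bucketing logic header aware is to fold the traffic namespace into the bucket key, as in this minimal sketch. The class name, the key layout and the use of a load-test id to identify test events are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a fan-in buffer whose bucket key includes the traffic namespace. */
final class NamespacedBuckets {
    private final Map<String, List<String>> buckets = new HashMap<>();

    /** Builds the bucket key; loadTestId is null or empty for production events. */
    static String bucketKey(String businessKey, String loadTestId) {
        boolean isTest = loadTestId != null && !loadTestId.isEmpty();
        return (isTest ? "test:" + loadTestId : "prod") + "|" + businessKey;
    }

    /** Appends an event to the bucket of its own namespace. */
    void add(String businessKey, String loadTestId, String event) {
        buckets.computeIfAbsent(bucketKey(businessKey, loadTestId), k -> new ArrayList<>()).add(event);
    }

    /** Returns the events buffered for the given key and namespace. */
    List<String> bucket(String businessKey, String loadTestId) {
        return buckets.getOrDefault(bucketKey(businessKey, loadTestId), List.of());
    }
}
```

With this layout, a test event and a production event that share the same business key never land in the same batch, so a flush of the production bucket cannot carry test data with it.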

Ensure that the header metadata stamped on tuples is passed forward by all spouts and bolts, and is written by the sink bolts to the Kafka headers or the database record (as per the schema definition).

Ensure that the spouts and bolts emit metrics for throughput, cumulative latency since the event originated, success/failure, etc. To achieve this in a non-intrusive manner, we used the following lifecycle APIs of Apache Storm:

  1. org.apache.storm.kafka.spout.KafkaTupleListener: for sending Kafka spout metrics at the emit and ack lifecycle stages.
  2. org.apache.storm.hooks.ITaskHook: for sending spout and bolt metrics at various lifecycle stages (emit, ack, fail, etc.).
  3. org.apache.storm.hooks.IWorkerHook: for initialising the metrics HTTP endpoint, from which the Prometheus collector can scrape the metrics, along with other common components.

Related contributions that we made to the Apache Storm GitHub repository to improve worker hook integration: https://github.com/apache/storm/pull/3546, https://github.com/apache/storm/pull/3547

The listener and hooks are wired into the topology definition as follows:

name: "my-topology"

config:
  topology.auto.task.hooks: ["com.abc.framework.MyTaskHook"]
  ...

components:
  - id: "kafkaTupleListener"
    className: "com.abc.framework.MyTupleListener"

  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    ...
    configMethods:
      - name: "setTupleListener"
        args:
          - ref: "kafkaTupleListener"

  ...

spouts:
  ...

bolts:
  ...

streams:
  ...

workerHooks:
  - id: "my-worker-hook"
    className: "com.abc.framework.MyWorkerHook"
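The bookkeeping a task hook could perform for the cumulative-latency metric can be sketched as below. This is not the hook used in the pipeline: the class is hypothetical, and it assumes that the event's origin timestamp (in epoch milliseconds) is available from the header metadata.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical sketch of the bookkeeping a task hook could do on every ack. */
final class CumulativeLatencyRecorder {
    private final LongAdder count = new LongAdder();
    private final LongAdder totalMillis = new LongAdder();

    /** originMillis: assumed timestamp stamped in the event header when it was first produced. */
    void onAck(long originMillis, long nowMillis) {
        // Clamp to zero to guard against clock skew between producer and worker.
        long latency = Math.max(0L, nowMillis - originMillis);
        count.increment();
        totalMillis.add(latency);
    }

    long ackCount() {
        return count.sum();
    }

    /** Mean latency since origin across all acked events, or 0 when nothing was acked. */
    double meanLatencyMillis() {
        long n = count.sum();
        return n == 0 ? 0.0 : (double) totalMillis.sum() / n;
    }
}
```

In Storm, a class extending `org.apache.storm.hooks.BaseTaskHook` could call `onAck` from its `boltAck` or `spoutAck` callback, which keeps the measurement out of the business logic of the spouts and bolts.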

(5) Config Service

A service that hosts configuration which can be changed at runtime to alter the behaviour of running production systems. For our use case, the config-service holds the kill-switch flag with which all our topologies are integrated. Since Myntra already had a dedicated config-service, we used it; otherwise, the same could be achieved with ZooKeeper or alternatives such as etcd, Consul or Spring Cloud Config.
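As an illustration of how a topology component might read this flag, here is a minimal sketch of a kill-switch supplier. The flag name `load-test.kill-switch` and the `Function`-based lookup are assumptions standing in for the real config-service client, and failing closed on errors is a design choice of this sketch rather than something the article specifies.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of a supplier that reports whether the test flow is active. */
final class KillSwitchSupplier implements Supplier<Boolean> {
    // Assumed flag name; the real key in the config-service is not given in the article.
    static final String KILL_SWITCH_KEY = "load-test.kill-switch";

    // Stand-in for the config-service client: maps a config key to its current value.
    private final Function<String, String> configLookup;

    KillSwitchSupplier(Function<String, String> configLookup) {
        this.configLookup = configLookup;
    }

    /** Returns FALSE when the kill-switch is on or the lookup fails, TRUE otherwise. */
    @Override
    public Boolean get() {
        try {
            // parseBoolean(null) is false, so a missing flag means "kill-switch off".
            return !Boolean.parseBoolean(configLookup.apply(KILL_SWITCH_KEY));
        } catch (RuntimeException e) {
            // Assumption: for test traffic it is safer to stop than to keep producing.
            return Boolean.FALSE;
        }
    }
}
```

Because a supplier like this holds a live client, it would be created on the worker (for example in `open()` or `prepare()`) rather than serialised with the topology.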

(6) Sink

To safeguard against corrupting production data and to mitigate any cascading impact on production read traffic, we introduced namespace segregation in the database schema. Myntra’s search engine uses an Apache Lucene-based document datastore and also writes some product metadata to a key-value store. For the key-value store, we opted to prefix keys with “test_” to distinguish test records and included an expiry mechanism for automatic cleanup of test data. For the document store, a separate test collection was created on the same cluster for operational simplicity.
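The key-value side of this scheme can be sketched as follows. The "test_" prefix comes from the text above; the class name and the 24-hour expiry are assumptions, as the article does not state the actual TTL.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch: name-spacing and expiry for records written to the key-value store. */
final class SinkNamespace {
    static final String TEST_PREFIX = "test_";
    // Assumed TTL for test records; the real value is not given in the article.
    static final Duration TEST_TTL = Duration.ofHours(24);

    /** Prefixes the key for test records and leaves production keys untouched. */
    static String key(String rawKey, boolean isTest) {
        return isTest ? TEST_PREFIX + rawKey : rawKey;
    }

    /** Test records expire after TEST_TTL; production records carry no expiry. */
    static Optional<Instant> expiresAt(Instant writtenAt, boolean isTest) {
        return isTest ? Optional.of(writtenAt.plus(TEST_TTL)) : Optional.empty();
    }
}
```

Keeping both decisions in one place means a sink bolt only needs to know whether the tuple carries a load-test header, not how test data is segregated.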

(7) Metrics Backend

The metrics produced by the different components of the workflow are consolidated in our Centralised Metrics System, which uses Prometheus as the metric store and Grafana for dashboarding. This setup lets us monitor both production and load test events in one place by selecting the appropriate filters on the load-test-id and event type labels.
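To show how such labels end up on a metric, the sketch below renders a single sample in the Prometheus text exposition format. The metric name and the label names `load_test_id` and `event_type` are assumptions (Prometheus label names cannot contain hyphens, so the hyphenated label is written with underscores here).

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical sketch: renders one labelled sample in the Prometheus text exposition format. */
final class LabelledSample {
    static String render(String metric, Map<String, String> labels, long value) {
        // Sort labels by name so the output is deterministic.
        String labelPart = new TreeMap<>(labels).entrySet().stream()
                .map(e -> e.getKey() + "=\"" + escape(e.getValue()) + "\"")
                .collect(Collectors.joining(","));
        return metric + "{" + labelPart + "} " + value;
    }

    // Label values must escape backslash, double quote and newline.
    private static String escape(String v) {
        return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }
}
```

A dashboard query can then isolate one load test run with a selector such as `ingestion_events_total{event_type="test", load_test_id="lt-42"}`, while the same panel without the selector shows production and test traffic together.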

(8) Synthetic Read Traffic

Because test data is directed to a distinct test collection that receives no organic read traffic, its write performance can deviate notably from that of its production counterpart. To address this, we created scripts that simulate read traffic closely mirroring the patterns seen in production. These must run concurrently, as a background load test suite, alongside the primary ingestion load test suite.

With the outlined skeleton design, we successfully integrated load-testing capabilities into our existing search ingestion pipeline, tailored to our specific requirements and ecosystem. This adaptation has enabled us to pinpoint crucial watermark levels at which the system experiences performance constraints, and we are actively addressing these issues in the upcoming quarter.
However, it’s essential to acknowledge that the implemented design is not flawless and comes with its set of recognised limitations:

  1. Interference with actual measurements, resulting from the introduction of name-spaced components and an increase in payload size due to metadata.
  2. Black-box testing constraints, as an existing workflow must undergo changes before it can be onboarded to the load test suite.
  3. Technology-specific constraints: the strategies outlined above may not be directly applicable to ingestion systems built on other technologies such as Apache Spark, Akka, Apache Airflow, etc.

