cover_image

RTA实战之Disruptor队列

yy 拍码场
2022年07月14日 02:33

背景

在RTA系统(信息流广告获客流量筛选环节)中我们遇到一个这样的场景:RTA请求的入参和决策过程以及结果都需要一条一条的记录至数据库中,方便业务人员进行后续的效果分析,但我们都知道,RTA系统是个动辄超过几十万QPS的特殊业务系统,也就意味着系统每秒需要处理超过几十万条请求记录。一般在这种场景下,我们首先会想到利用内存队列进行批处理操作,以便达到降低数据库QPS的压力,否则大量的请求直接打入至数据库,集群规模和压力都是不敢想象的。但这时候如果简单利用Java内置的内存队列(如ArrayBlockingQueue、ConcurrentLinkedQueue等),通过压测后我们会发现单机处理能力上不去,因为这些内存队列会直接压榨机器的性能,导致单机处理能力达不到预期的效果。所以这时候我们就想要去找一款比Java自带内存队列具备更优性能,更强大处理能力的内存队列,用以应对此种场景,提高我们的单机处理能力。

后来通过调研,参考了Log4j等一些知名的开源项目,里面用到了一款叫Disruptor的高性能内存队列,通过对它的调研和了解,发现很适合RTA系统的这个场景,我们以disruptor为中心构建超百亿的消息处理模型。

Disruptor是什么

disruptor是一个高性能的线程间消息传递库,或者可以简单理解为一个进程内的消息队列库,由英国外汇交易公司LMAX开发并且开源,诞生的目的是为了优化系统模块间传递数据的延迟问题,于2010年QCon上演讲获得大家关注,并得到了著名的软件开发专家martin fowler撰文介绍,并在2011年获得了oracle官方的duke奖项,disruptor译为破坏者,意为打破常规,它的成功主要得益于LMAX对并发、性能、非阻塞算法的研究,在使用disruptor的处理业务中,单线程每秒达到了600万个(当时披露的数据,CPU一直在进步),有很多有名的开源项目也在使用或间接的借鉴它的设计思路。在我们项目业务处理中,通常都有异步处理的场景需求(或者生产-消费处理模式),不同场景下关注吞吐、性能和资源消耗,由此关注到了disruptor,并引入它作为我们处理分发数据的模块,disruptor本身采用Java语言构建,我们也知道像类似Java这类的语言都是带着“包袱”跑的,这也让我们好奇它的高性能是如何很好的做到的,这将是我们学习的很好的模板,下面我们主要介绍下disruptor的技术关注点和一些具体业务的使用场景。

Disruptor的实现高性能的考量点

disruptor的设计不仅将考量点放在软件层面,同时对硬件层面比如cpu的工作方式也有考量,这是我们软件开发人员的盲点(通常来说我们的关注一般在应用软件层面)。disruptor团队把这个称之为"机器同理心"(mechanical sympathy),意为更多的了解机器工作细节,我们在上面运行的代码也会设计的更好,总结来说它的技术考量点主要包含了如下:

  • 并发中的锁的使用
  • cpu缓存高效使用
  • 数据结构选择与内存分配

我们依次来看它的这些点

并发中锁的使用

并发是我们软件开发人员非常熟悉的话题,我们接触的系统都在以并发的方式运行着以充分利用处理器资源。正常来说写并发程序容易,写安全高效的并发程序难。并发程序带来的直接安全问题是对资源的竞争访问,资源可以是内存中的某个变量,IO句柄等等,我们这里考虑的是内存的变量,更具体来说是在多个线程间共享了同一块内存数据,我们需要让所有的线程安全的访问这块数据,这样才能保证程序运行的正确性。Java语言使用2个关键字synchronized和volatile来有效的解决并发编程中的互斥访问和数据变化的可见性问题,我们知道使用synchronized包裹的代码块就是给该代码块加上锁,这样保证同一时刻只会一个线程拿到锁并获得执行,这同时意味着其他的线程需要在此排队依次得到锁后进行访问,这就是我们说的互斥性(同一把锁上的变量也具备可见性)。另一个关键字volatile提供另一种轻量锁,但它非阻塞,它解决的是内存数据变化后的可见性问题,也就是保证数据变更后所有线程看到数据值是正确的。这2个关键字有效的保障了程序并发中的正确性问题,但引入锁的操作会极大的降低处理效率,锁意味着需要操作系统的协助,由它来协调访问,这带来了很多重量级的操作比如操作系统有内核态的切换,线程有上下文的切换,更附带着cpu缓存的失效等等,这都将降低了cpu有效处理效率,这让使用锁变成了一个大开销,disruptor也给出一个示例,我们来直观的感受下不同锁状况下的开销:

MethodTime(ms)
Single thread300
Single thread with lock10,000
Two threads with lock224,000
Single thread with CAS5,700
Two threads with CAS30,000
Single thread with volatile write4,700

实验的示例是调用一个函数循环将64位整形变量递增5亿次的耗时成本,由图可以看出不管是单线程还是2个线程的耗时比对,无锁 < cas操作 < 加锁,disruptor也不可避免的会面对并发的问题,因此也必须要使用锁来保证程序的正确性,它的锁操作替换的就是cas操作。

CAS操作

cas的全称是Compare-and-Swap(比较并交换),是一种在并发情况下实现正确操作的有效机制,它也是非阻塞的(乐观锁),所以它的开销比常规的应用锁开销要小的多,由上图的消耗比对也可以看出。在Java语言中也提供了使用cas操作的原子变量以Atomic打头,使用它可以安全高效的执行递增操作。具体来看cas操作是在机器指令中得到支持的,例如x86上的“lock cmpxchg”,它保证了原子更新的特性,由此看它可以理解为是硬件层面支持的锁定操作,不会带来操作系统层面的开销以及对应的上下文切换。disruptor中也采用cas的方法来避免应用锁带来的开销,比如在多生产者状况下,数据的生产会竞争找对应满足条件的空位置来保存发布的数据:

java中AtomicLong变量的自增操作

    public final long getAndIncrement() {        return unsafe.getAndAddLong(this, valueOffset, 1L);    }    // Unsafe类下的操作函数    public final long getAndAddLong(Object var1, long var2, long var4) {        long var6;        do {            var6 = this.getLongVolatile(var1, var2);        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6; }

disruptor生产者使用cas操作占取空位

    public boolean compareAndSet(final long expectedValue, final long newValue){      return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);    }

CPU缓存高效使用

缓存的概念在程序世界无处不在,比如代码层面大量在使用各种缓存系统(java caffine,redis,memcache等等)来保存数据,在我们常用的http协议中也有对应的缓存机制指令,更广泛的说在系统层面我们也在大量的使用例如cdn加速等技术,这些都是缓存的范畴。同样对微观世界下的cpu来说,也在使用缓存。在cpu的周围排列了多级缓存来加快数据访问,以此来缓解cpu和直接内存之间的速度失衡,cpu的缓存结构图参考如下:

图片缓存越靠近cpu的访问速度越快,容量越小,并以一种叫缓存行(cache line)的形式组成,缓存行是从内存装入数据基本单位,大小是2的整数幂,通常为32-256个字节,最常见的是64字节,cpu访问缓存时间参考如下:

缓存需要的时钟周期
L13~4个
L210~20个
L340~45个
Memory120~240个

说明:时钟周期和cpu的主频有关,假定2.5GHZ的主频,对应时钟周期为0.4ns,那么一次内存的访问在48~96ns,一次L2缓存访问4~8ns

伪共享问题

如果两个变量在同一个缓存行中,其中一个失效了,那么整个缓存行都会失效,这被称为"伪共享"。那么为了获得高性能,就需要避免缓存失效带来的伪共享问题(这里就是以空间换时间思路),以64字节的缓存行举例,如果我们要让变量int64 x不被其他变量影响,我们只需要在其前后分别添加7个int64类型的空变量即可,这样就能够保证x变量永远不会和其他变量在同一个缓存行中,这样x变量也不会因为其变量的修改导致其缓存失效了,这就是disruptor设计思路,disruptor对高频访问的变量进行了冗余填充,例如如下的结构:

class Value extends LhsPadding {    protected long value;}
abstract class RingBufferPad { protected byte p10, p11, p12, p13, p14, p15, p16, p17, p20, p21, p22, p23, p24, p25, p26, p27, p30, p31, p32, p33, p34, p35, p36, p37, p40, p41, p42, p43, p44, p45, p46, p47, p50, p51, p52, p53, p54, p55, p56, p57, p60, p61, p62, p63, p64, p65, p66, p67, p70, p71, p72, p73, p74, p75, p76, p77;}
abstract class RingBufferFields<E> extends RingBufferPad {
}

需要说明的是这些填充操作,在Java8中得到了很好的支持,在Java8中新增了一个注解:@Contended,加上这个注解的类会自动补齐缓存行,同时该特性需要在jvm启动时设置-XX:-RestrictContended才会生效

数据结构选择与内存分配

队列通常使用链表或者数组作为底层的存储结构,又分为有界和无界队列,他们的区别在于队列大小的限制,更大的队列意味着大的容量边界,这在生产者速度大于消费者速度的场景下可以提供更大的缓冲,但最坏的情况下如果不加以控制,即使还没到队列的边界,也可能因为内存耗尽而发生应用崩溃,这在我们之前的生产事件中遇到过此类问题,所以一般使用来说,我们通常会限定队列的大小,也就是使用有界队列。队列同样又可以分为线程安全和非线程安全,在Java的的容器包下就提供了此类选择供不同场景下的使用,虽然保持简单也就是单线程无锁是最高效的选择,但真实业务场景下的消息交换通常都会并发执行,也就是并发安全是绕不过去的特性,队列的并发安全主要体现在队头的生产者、队尾的消费者竞争以及数据的存储地方,这将需要锁参与其中,如前文所述,锁的引入会带来极大的系统开销,最好的选择是无锁设计(但竞争无法避免),这里就是用cas操作取代锁。另外我们在从内存的角度来看,减少内存的分配和销毁对任意的系统来说都有好处,对Java语言来说尤其如此(它是带着垃圾收集(GC)这个“包袱”运行的),GC的存在会带来延迟上的抖动,所以如何做到对GC友好也是重要的考量,正常来说一个对象(内存)朝生夕死或者永久存活对GC来说相对是比较好的方式。结合如上的分析我们来考虑底层的存储结构,如选用链表存储一方面锁会带入其中,另外节点会被反复分配和销毁,同时还带有额外的内存开销,这对GC都不够友好(另外链表离散的内存分配对CPU缓存也不够友好),具体我们来看disruptor的主要设计点:

  • 环形数组存储
  • 无锁设计(cas操作)
  • 解决“伪共享”

disruptor使用环形数组作为存储结构,支持并发安全,具体通过游标的递增操作完成生产或者消费数据传递,递增可以施加cas操作,不引入锁。在环形数组创建时会有初始化动作完成对象内存预分配,该块区域将以永久对象的方式存在,这里连续的内存空间对缓存也够好。另外数组的大小要求为2的整数幂,这里移动游标后的位置需要进行取模运算,2的幂的长度取模运算可转为与操作效率更高。因为事件对象预分配,装入数据时其实是进行填充操作避免了创建的动作。同时disruptor大量使用字节填充的方式避免“伪共享”的问题,如已提到的游标的定义就是如此。

Disruptor整体设计

如上我们介绍了disruptor的一些具体技术点和设计上的考虑,下面将介绍它的整体的实现点,如下是原文档中提供的核心类视图,从结构上主要围绕着队列(RingBuffer)来构建,RingBuffer使用EntryFactory(EventFactory)工厂预先创建事件对象,生产者按顺序声明事件填充进队列,并标记事件可被消费,这里生产者区分单生产者和多生产者分别对应ClaimStrategy的不同实现策略,单生产者无并发的烦恼,所以性能上会更好,这也是disruptor推荐的一种方式。消费者按顺序消费事件,并且可以为其指定不同的等待策略(WaitStrtegy), 等待策略的不同主要体现在消费的延时性和cpu使用资源之间的取舍,这是非常重要的点,我们看图中写的几种策略:

  • BlockingWaitStrategy:加锁,用于CPU资源比较紧缺,吞吐量和延迟并不重要的场景
  • YieldingWaitStrategy:CPU自旋方式,性能和CPU资源间比较折中,延迟比较均匀,可用在低延迟的场景
  • BusySpinWaitStrategy:性能最高的模式,对低延迟要求高,对应CPU资源占用多
图片

另外,disruptor支持多种灵活的消费模式可以应对复杂的真实业务处理场景,除了常见的点对点的消费,也支持多对多的消费(广播模式),另外还有链式、星形等多种消费组合,这也是它区别于普通队列非常重要的特性。disruptor将这些复杂特性描述为一种叫消费者依赖图的结构,并使用一种叫Barrier的组件来协调这些消费者之间的依赖关系。

Disruptor性能指标

如下内容均来自disruptor文档中给出的测试数据,主要作为参考使用,根据disruptor的测试,他选择的是Java中满足场景条件下性能最好的有界队列ArrayBlockingQueue(ABQ)作为参考比对

  • 以下是对应不同消费场景下的吞吐量测试结果:
图片
  • 以下是延迟性能的测试结果:图片

Disruptor业务使用

在我们生产的业务处理中会有很多需要异步处理的场景,一般来说会使用Java线程池(ThreadPoolExecutor),它实现的也是生产-消费模式,该模式下disruptor会是一个很好的选择。在此我们选一个业务中的使用场景作为具体说明,在我们的广告投放业务中,存在每秒需要处理着超过50w的请求策略,对于每次请求处理中,除了正常的同步业务处理流外,还存在着大量的无法在同步处理中完成的任务需求,比如我们需要存储和处理每次请求中携带的数据,这需要访问存储库并进行查询、处理、回写等动作,该场景下每日处理着超百亿的消息,我们既希望处理速度愈快愈好,也希望资源的消耗比较保守,我们以disruptor为处理核心,多生产者,多消费者并且每个消息只会被一个消费者消费,具体代码参考:

创建队列实例
    private static DisruptorQueue<PModelValEvent> disruptorQueue = DisruptorQueueFactory.createWorkPoolQueue(1 << 10,true,    new CustomizableThreadFactory("disruptorQueue-"),consumers);                    //具体的构建disruptor队列方法,即同一事件会被一组消费者其中之一消费    public static <T> DisruptorQueue<T> createWorkPoolQueue(int queueSize, boolean isMultiProducer, ThreadFactory threadFactory,                                                         Consumer<T>... consumers) {        // 指定队列大小queueSize为2的幂,指定多生产者模式ProducerType.MULTI         Disruptor<EventObj<T>> disruptor = new Disruptor(new EventObjFactory(),                queueSize, threadFactory,                isMultiProducer ? ProducerType.MULTI : ProducerType.SINGLE,                new SleepingWaitStrategy(DEFAULT_RETRIES, DEFAULT_SLEEP_NS));        // 指定消费者组        disruptor.handleEventsWithWorkerPool(consumers);        return new DisruptorQueue(disruptor);    }
消费者-消费事件
   public class PModelValConsumer extends Consumer<PModelValEvent> {        @Override        public void handle(PModelValEvent obj) {           // 消费者处理业务        }     }
生产者-发布事件
    // disruptor当队列满了后,是一种阻塞队列,结合sentinel,不同场景下可允许阻塞时后续丢弃    public static void publish(PModelValEvent event) {        try (Entry entry = SphU.entry("disruptorQueue-PModelValEvent2Q")) {            disruptorQueue.add(event);        } catch (Exception ex) {            if (log.isErrorEnabled()) {                Cat.logError("disruptorQueue-PModelValEvent 失败", ex);            }        }    }
发布消息耗时参考
图片

另外我们在一次场景中使用disruptor中发生了内存泄漏,分析后产生原因在于:长队列 + 复杂事件(单个事件内存大), 如上文提到的环形队列的设计,只有在环形数组满的时候才会让新的事件覆盖之前的事件,对于Java系统来说,意味着事件里引用的对象内存得不到及时的释放,由此产生了被消费完的事件其引用的对象仍然存活,disruptor使用文档中也提到了该点的处理方式,只要在事件处理完成后及时的将其引用置为null。

 @Override    public void onEvent(EventObj<T> eventObj) throws Exception {        try {            this.handle(eventObj.getObj());        } finally {            eventObj.setObj(null);        }    }

总结思考

在我们日常开发中大量的在使用消息组件作为数据的分发,大部分情况下我们提到消息组件是指分布式的消息队列(进程间消息队列),但进程内的消息队列也是我们日常开发中重要工具,在一些场景中我们用队列来解决并发的问题,在具体的业务处理流中围绕队列来构建也可以让系统具备更大的伸缩性,disruptor对事件的高效分发以及丰富的特性会是我们一个很好的选择。我们介绍了disruptor主要的一些设计技术点,但它不仅于此,disruptor的设计和实现也是我们学习的一个很好的参考。

参考

本文参考:https://github.com/LMAX-Exchange/disruptor

招聘信息

Java、大数据、前端、测试等各种技术岗位热招中,欢迎扫码了解~

图片


更多福利请关注官方订阅号“拍码场

图片

好内容不要独享!快告诉小伙伴们吧!



继续滑动看下一个
拍码场
向上滑动看下一个