disruptor学习分享

是什么?

LMAX在线交易出品的一个高效的无锁并发框架
它高效核心在于其无锁队列RingBuffer的独特设计。
它可以用来进行线程之间的数据交互。

老生重谈:锁

并发编程中,为了保证准确性,引入了锁的机制,包括乐观锁,悲观锁等。有锁就涉及到资源的竞争,竞争就可能出现死锁,这样的情况下,你只能重启你的机器了。

考虑一个简单自增的问题:

从1加到10亿,(测试机器 Mac Air)

  • 单线程简单自增,耗时5S左右
  • 单线程加锁自增,仅仅简单加锁,没有竞争,耗时40S左右,慢一个数量级

考虑并发

  • 两个线程简单自增,耗时减少,但是结果无法保证准确性,比如存在脏读等问题。
  • 两个线程加锁自增,按照理解,时间应该减半,但是因为引入了锁机制,导致竞争,实际时间更加长。

disruptor怎么做

disruptor在需要保证线程安全的地方,用到了CAS操作,这是一个CPU级别的指令,类似于乐观锁,即Campare and Set/Swap. JAVA 从1.5版本新引入了AtomicLong等支持CAS指令的数据结构。
测试代码可以看出来,在引入了AtomicLong的情况下,单线程,耗时为18S左右,多线程,耗时26S左右。

测试代码

说说数据结构:链表 or 数组

既然是数据交换,那就存在一个产生数据(producer),一个消费数据(consumer),和数据存储(RingBuffer),数据存储,我们可以理解,应该是一个队列,数据先到先处理,在java类库中,提供了例如LinkedBlockingQueue、ArrayBlockingQueue等,而disruptor之所以高效,是因为它没有直接使用java类库中提供的队列,而是自己写的RingBuffer。

它有什么特点呢?

  • 顾名思义,它是一个环,准确说,是一个用数组实现的环形队列。
  • 不像传统队列,维护对头,队尾,它只有Sequencer,指向下一个可用的数据缓存区
  • 新产生的数据对原来数据进行写覆盖,不进行remove操作。
  • 队列大小一定是2的N次方

它有什么好处呢?

  • 相比于链表,它寻址更快,时间复杂度控制在O(n)
  • 在初始化时,就已经分配好了内存,而且新产生的只覆盖,所以更少的GC
  • sequece 一直自增,进行位操作可以快速定位到实际slot , sequece & (array length-1) = array index,比如一共有8槽,9 &(8-1)= 1

对比例子代码

说说硬件:缓存

伪共享

CPU和内存之间存在着多级缓存,我们都知道越靠近CPU的缓存越快,存储速度依次排列为L1,L2,L3,Memery,但是他们的存储空间大小依次排列为倒序,因此我们不能把所有数据都放在L1,我们需要把我们的数据从Memery中Load到cache里面,从而进行访问。
下图有一些数据:

cpu与缓存

如果你想让你端到端的延时为10ms,那你mermery中load耗时80ns相对来说是一个比较重的操作。
更为严重的问题是,我们的数据在缓存中,不是独立项存储的,你可以想象缓存为一个阵列,由多个缓存行组成,缓存行大小根据机器不同,有差异,常见为64个字节。每次LRU(或者其他算法)的时候,它会把你目标数据的相邻数据也load进来,放入一整行的缓存中。

load

现在假设我们要操作A.B两个数据,他们正好在内存中是紧挨着的,线程1想要对数据A进行写入操作,它把A从Memery中load到L1来,相应的B也被免费的load到L1中。

update

现在线程1需要对A进行写入,同时线程2需要对B进行写入,他们需要争夺对这个缓存行的所有权,加入线程1成功对A进行了写入,那线程2需要对自己的缓存置为失效。
通过这样的一个方式,说明了两个不相关的线程,本来操作自己的数据,但是因为另外一个线程对自己数据的更改,导致自己的数据需要重新从Memery中load,这样会把自己的整体速度给拖慢。

这就是伪共享,因为每次你访问A的同时,你也会得到B,而且每次你访问B,同时你也会得到A。他们仿佛是一体的,但是实际没有任何关系。

disruptor怎么做

引入缓存行填充机制,在RingBuffer中,需要有一个指向当前数据区的序列号(Sequencer),在有生成者和消费者对RingBuffer进行,数据读写的时候,我们对这个序列号进行缓存行填充机制,保证一个序列号在内存中,占有一个缓冲行。


源码

通过代码演示,我们也可以看到的确存在差异,而且随着读写的线程变多,这样的差距越大。

false share code
官方给到的代码,没有进行完全填充(也就是没有沾满一个缓存行),我自己写的例子有进行改进。

等待策略

在典型的消费者/生成者模型中,会存在等待现象,disruptor提供了以下的几种等待策略:

  • BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
  • SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
  • YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
    经测试前两种效果差不多,延迟在微秒以内,可以忽略,cpu占用不高,YieldingWaitStrategy模式队列空闲时CPU达到100%,不适合

Demo

  • 简单例子
    直接使用ringbuffer
public static void main(String[] args) {
    int size = 1<<10;
    ExecutorService executors = Executors.newCachedThreadPool();
    //创建一个disruptor,指定ringbuffer的size和处理数据的factory
    Disruptor<TestObject> disruptor = new Disruptor<TestObject>(new TestObjectFactory(), size, executors);
    //disruptor里面设置一个处理方式
    disruptor.handleEventsWith(new TestObjectHandler());
    RingBuffer<TestObject> ringBuffer = disruptor.start();
    for (long i = 0; i < 1000; i++) {
    //下一个可以用的序列号
        long seq = ringBuffer.next();
        try {
            //这个序列号的slot 放入数据   
            TestObject valueEvent = ringBuffer.get(seq);
            valueEvent.setValue(i);
        } finally {
            //发布通知,并且这一步一定要放在finally中,因为调用了ringBuffer.next(),就一定要发布,否则会导致disruptor状态的错乱
            ringBuffer.publish(seq);
        }
    }
    disruptor.shutdown();
    executors.shutdown();
}
  • 复杂例子


    多消费者

    这个例子中,我们需要有不同的消费者,并且有些消费者之间存在依赖关系,有些消费者之间可以并行处理。

public static void main(String[] args) throws InterruptedException {
    long beginTime = System.currentTimeMillis();

    int bufferSize = 4;
    ExecutorService executor = Executors.newFixedThreadPool(10);// 大于consumer的数量

    Disruptor<TestObject> disruptor = new Disruptor<TestObject>(new TestObjectFactory(), bufferSize, executor,
            ProducerType.SINGLE, new BusySpinWaitStrategy());

    // //使用disruptor创建消费者AnalysisHandler,CalcHandler,两个可以并行执行
    // EventHandlerGroup<TestObject>
    // handlerGroup=disruptor.handleEventsWith(new
    // TestObjectAnalysisHandler(),new TestObjectCalcHandler());
    //
    // //声明在AnalysisHandler,CalcHandler完事之后执行NotifyHandler
    // EventHandlerGroup<TestObject> then = handlerGroup.then(new
    // TestObjectNotifyHandler());
    //
    // //最终调用多个线程,进行数据的写入
    // then.thenHandleEventsWithWorkerPool(new TestObjectDBHandler(),new
    // TestObjectDBHandler());

    // 上面的也可以直接通过链式调用
    disruptor.handleEventsWith(new TestObjectAnalysisHandler(), new TestObjectCalcHandler())
            .then(new TestObjectNotifyHandler())
            .thenHandleEventsWithWorkerPool(new TestObjectDBHandler(), new TestObjectDBHandler());

    disruptor.start();// 启动

    CountDownLatch latch = new CountDownLatch(1);
    // 生产者准备
    executor.submit(new TestObjectPublisher(latch, disruptor));
    latch.await();// 等待生产者完事.
    disruptor.shutdown();
    executor.shutdown();

    System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
}

代码地址

应用场景:

个人思考下来,它适合一切异步环境,但是对于并发量小的场景不一定需要。在log4j2中,已经使用了disruptor进行日志记录。同样是用异步,选择disruptor会更快。

  1. 在一些获取验证码,发短信的场景下,对实时性要求不够,如果收不到,用户可以再次要求重发。
  2. 对于一些奖品,卡券的发放,在高峰期,可以只入队,在之后用异步的方式慢慢发放。
  3. 对于比较复杂的逻辑可以进行并发操作

总结:

disruptor作为一个高并发框架,从CPU层面对整个代码进行优化。具有如下特点

  1. 队列使用数组结构,而不是使用传统的链表结构,寻址更快
  2. 新生产的对象采用覆盖的方式(不是传统阻塞队列,删除->添加的逻辑),减少GC回收的负担
  3. 从CPU层面优化,对Sequencer进行内存分配补齐,消除Java伪共享(cpu缓存行)
  4. 多个线程同时访问,由于他们都通过序号器Sequencer访问ringBuffer,通过CAS取代了加锁和同步块,这也是并发编程的一个指导性原则:把同步块最小化到一个变量上。

参考文献

Q&A

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 159,458评论 4 363
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 67,454评论 1 294
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 109,171评论 0 243
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 44,062评论 0 207
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 52,440评论 3 287
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 40,661评论 1 219
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 31,906评论 2 313
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 30,609评论 0 200
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 34,379评论 1 246
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 30,600评论 2 246
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 32,085评论 1 261
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 28,409评论 2 254
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 33,072评论 3 237
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 26,088评论 0 8
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 26,860评论 0 195
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 35,704评论 2 276
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 35,608评论 2 270

推荐阅读更多精彩内容