Disruptor详细介绍之快速入门
1. Disruptor是什么
1.1 技术背景
LMAX是在英国注册并受到FCA监管(监管号码为509778)的外汇黄金交易所, LMAX架构是LMAX内部研发并应用到交易系统的一种技术。它之所以引起人们的关注,是因为它是一个非常高性能系统,这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,官方号称它能够在一个线程里每秒处理6百万订单.
一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。
1.2 对比阻塞队列
可以和BlockingQueue做对比,不过disruptor除了能完成同样的工作场景外,能做更多的事,效率也更高。业务逻辑处理器完全是运行在内存中(in-memory),使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件,能够在无锁的情况下实现网络的Queue并发操作。LMAX的研究表明,现在的所谓高性能研究方向似乎和现代CPU设计是相左的。
The disruptor component provides asynchronous SEDA behavior much as the standard SEDA Component, but utilizes a Disruptor instead of a BlockingQueue utilized by the standard SEDA。
举例部分典型的使用方式:
说明: 2.0版本之后,Consumer的概念被EventProcessor代替,都是类似事件处理消费。
P=Producer, EP=Event processor,下面的模型中p代表了产生数据结构Event,EP代表消费数据Event,消费的过程中做逻辑处理。
* UniCast a series of items between 1 publisher and 1 event processor. * * +----+ +-----+ * | P1 |--->| EP1 | * +----+ +-----+
* Produce an event replicated to two event processors and fold back to a single third event processor. * * +-----+ * +----->| EP1 |------+ * | +-----+ | * | v * +----+ +-----+ * | P1 | | EP3 | * +----+ +-----+ * | ^ * | +-----+ | * +----->| EP2 |------+ * +-----+
* Pipeline a series of stages from a publisher to ultimate event processor. * Each event processor depends on the output of the event processor. * * +----+ +-----+ +-----+ +-----+ * | P1 |--->| EP1 |--->| EP2 |--->| EP3 | * +----+ +-----+ +-----+ +-----+
* MultiCast a series of items between 1 publisher and 3 event processors. * * +-----+ * +----->| EP1 | * | +-----+ * | * +----+ +-----+ * | P1 |--->| EP2 | * +----+ +-----+ * | * | +-----+ * +----->| EP3 | * +-----+
* Sequence a series of events from multiple publishers going to one event processor. * * +----+ * | P1 |------+ * +----+ | * v * +----+ +-----+ * | P1 |--->| EP1 | * +----+ +-----+ * ^ * +----+ | * | P3 |------+ * +----+ .... 官方代码examples有更多详细示例。
下面是官方给出的和ArrayBlockingQueue对比测试结果:
Nehalem 2.8Ghz – Windows 7 SP1 64-bit | Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit | |||
ABQ | Disruptor | ABQ | Disruptor | |
Unicast: 1P – 1C | 5,339,256 | 25,998,336 | 4,057,453 | 22,381,378 |
Pipeline: 1P – 3C | 2,128,918 | 16,806,157 | 2,006,903 | 15,857,913 |
Sequencer: 3P – 1C | 5,539,531 | 13,403,268 | 2,056,118 | 14,540,519 |
Multicast: 1P – 3C | 1,077,384 | 9,377,871 | 260,733 | 10,860,121 |
Diamond: 1P – 3C | 2,113,941 | 16,143,613 | 2,082,725 | 15,295,197 |
Comparative throughput (in ops per sec)
1.3 Disruptor构成
先介绍几个相关的核心概念。
- 环形队列ringbuffer
数据缓冲区,不同线程之间传递数据的BUFFER。RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。
- Producer/Consumer
Producer即生产者,比如下图中的P1. 只是泛指调用 Disruptor 发布事件(我们把写入缓冲队列的一个元素定义为一个事件)的用户代码。
Consumer和EventProcessor是一个概念,新的版本中由EventProcessor概念替代了Consumer。
有两种实现策略,一个是SingleThreadedStrategy(单线程策略)另一个是 MultiThreadedStrategy(多线程策略),两种策略对应的实现类为SingleProducerSequencer、MultiProducerSequencer【都实现了Sequencer类,之所以叫Sequencer是因为他们都是通过Sequence来实现数据写,Sequence的概念参见③】 ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。具体使用哪个根据自己的场景来定,[多线程的策略使用了AtomicLong(Java提供的CAS操作),而单线程的使用long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突]
Producer生产event数据,EventHandler作为消费者消费event并进行逻辑处理。消费消息的进度通过Sequence来控制。
③Sequence
Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享,关于解决伪共享的问题,可以参见下面章节详细的介绍。
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。
说明:虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
④Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。不好理解的话,可以看下面章节事例配合了解。
SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。
⑤Wait Strategy
当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:
- BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
- BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。
- SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
- YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
- PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。
- Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
- EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。通过把EventProcessor提交到线程池来真正执行,有两类Processor:
其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。
另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针
- EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。开发者实现EventHandler,然后作为入参传递给EventProcessor的实例。
综上所述,附官方类图:
2. Disruptor什么时候用
Disruptor适用于两个独立的处理过程(两个线程)之间交换数据。下面以两个简单场景举例:
例如场景一:
停车批量入场数据上报,数据上报后需要对每条入场数据存入DB,还需要发送kafka消息给其他业务系统。如果执行完所有的操作,再返回,那么接口耗时比较长,我们可以批量上报后验证数据正确性,通过后按单条入场数据写入环形队列,然后直接返回成功。
实现方式一:启 动2个消费者线程,一个消费者去执行db入库,一个消费者去发送kafka消息。
实现方式二:启动4个消费者,2个消费者并发执行db入库,两个消费者并发发送kafka消息,充分利用cpu多核特性,提高执行效率。
实现方式三:如果要求写入DB和kafka后,需要给用户发送短信。那么可以启动三个消费者线程,一个执行db插入,一个执行kafka消息发布,最后一个依赖前两个线程执行成功,前两个线程都执行成功后,该线程执行短信发送。
例如场景二:
你在网上使用信用卡下订单。一个简单的零售系统将获取您的订单信息,使用信用卡验证服务,以检查您的信用卡号码,然后确认您的订单 – 所有这些都在一个单一过程中操作。当进行信用卡有效性检查时,服务器这边的线程会阻塞等待,当然这个对于用户来说停顿不会太长。
在MAX架构中,你将此单一操作过程分为两个,第一部分将获取订单信息,然后输出事件(请求信用卡检查有效性的请求事件)给信用卡公司. 业务逻辑处理器将继续处理其他客户的订单,直至它在输入事件中发现了信用卡已经检查有效的事件,然后获取该事件来确认该订单有效。
3. Disruptor为什么快
2.1数组实现
用数组实现, 解决了链表节点分散, 不利于cache预读问题,可以预分配用于存储事件内容的内存空间;并且解决了节点每次需要分配和释放, 需要大量的垃圾回收GC问题 (数组内元素的内存地址的连续性存储的,在硬件级别,数组中的元素是会被预加载的,因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行)
2.2求余操作优化
求余操作本身也是一种高耗费的操作, 所以ringbuffer的size设成2的n次方, 可以利用位操作来高效实现求余。要找到数组中当前序号指向的元素,可以通过mod操作,正常通过sequence mod array length = array index,优化后可以通过:sequence & (array length-1) = array index实现。比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。
2.3 预读与批量
相比链表队列,实现数组预读,减少结点操作空间释放和申请,从而减少gc次数。生产者支持单生产,多生产者模式,单生产者cursor使用普通long实现,无锁加快速度,多生产者才使用Sequence(AtomicLong)
生产和消费元素支持单线程批量操作数据。
2.4 Lock-Free
系统态的锁会导致线程cache丢失. 锁竞争的时候需要进行仲裁. 这个仲裁会涉及到操作系统的内核切换, 并且在此过程中操作系统需要做一系列操作, 导致原有线程的指令缓存和数据缓很可能被丢掉
– 用户态的锁往往是通过自旋锁来实现(自旋即忙等), 而自旋在竞争激烈的时候开销是很大的(一直在消耗CPU资源)
disruptor不使用锁, 使用CAS(Compare And Swap/Set),严格意义上说仍然是使用锁, 因为CAS本质上也是一种乐观锁, 只不过是CPU级别指令, 不涉及到操作系统, 所以效率很高(AtomicLong实现Sequence)
CAS说明:
- CAS依赖于处理器的支持, 当然大部分现代处理器都支持.
- CAS相对于锁是非常高效的, 因为它不需要涉及内核上下文切换进行仲裁.
- CAS并不是免费的, 它会涉及到对指令pipeline加锁, 并且会用到内存barrier(用来刷新内存状态,简单理解就是把缓存中,寄存器中的数据同步到内存中去)
2.5 解决伪共享{False Sharing}
Cpu cache简单示意图:
上面谈到lock的耗费, 主要也是由于内核的切换导致cache的丢失
所以cache是优化的关键, cache越接近core就越快,也越小 。
其中L1,L2,L3等级缓存都是由缓存行组成的, 通常是64字节, 一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量. 缓存行是缓存更新的基本单位, 就算你只读一个变量, 系统也会预读其余7个, 并cache这一行, 并且这行中的任一变量发生改变, 都需要重新加载整行, 而非仅仅重新加载一个变量.
伪共享举例:
比如在链表中往往会连续定义head和tail指针, 所以对于cache-line的预读, 很有可能会导致head和tail在同一cache-line。在实际使用中, 往往producer线程会持续更改tail指针, 而consumer线程会持续更改head指针
当producer线程和consumer线程分别被分配到core2和core1, 就会出现以下状况,由于core1不断改变h, 导致该cache-line过期, 对于core2, 虽然他不需要读h, 或者t也没有改变, 但是由于cache-line的整行更新, 所以core2仍然需要不停的更新它的cache,core2的缓存未命中被一个和它本身完全不相干的值h, 而被大大提高, 导致cache效率底下,而实际情况下, core1会不断更新h, 而core2会不断更新t, 导致core1和core2都需要频繁的重新load cache, 这就是伪共享问题
在Disruptor里我们对RingBuffer的cursor和BatchEventProcessor的序列进行了缓存行填充,如下:
class LhsPadding { protected long p1; protected long p2; protected long p3; protected long p4; protected long p5; protected long p6; protected long p7; LhsPadding() { } }
2.6 使用内存屏障
内存屏障本身不是一种优化方式, 而是你使用lock-free(CAS)的时候, 必须要配合使用内存屏障,因为CPU和memory之间有多级cache, CPU core只会更新cache-line, 而cache-line什么时候flush到memory, 这个是有一定延时的 ,在这个延时当中, 其他CPU core是无法得知你的更新的, 因为只有把cache-line flush到memory后, 其他core中的相应的cache-line才会被置为过期数据,所以如果要保证使用CAS能保证线程间互斥, 即乐观锁, 必须当一个core发生更新后, 其他所有core立刻知道并把相应的cache-line设为过期, 否则在这些core上执行CAS读到的都是过期数据.
内存屏障 = “立刻将cache-line flush到memory, 没有延时”
注:可参考java中volatile的原理,同样实现了内存屏障。
4. 使用Disruptor开发
下面以车辆入场为例,入场后需要存入数据库,需要发送kafka消息,两步执行完后,给用户发送短信。代码实现如下:(参见代码运行)
开发步骤:
- 定义事件
/** * @author mawming * @version 1.0 * @create date:2016-9-12. */ public class InParkingDataEvent { private String carLicense = ""; public void setCarLicense(String carLicense) { this.carLicense = carLicense; } public String getCarLicense() { return carLicense; } }
- 定义事件处理的具体实现
/** * @author mawming * @version 1.0 * @create date:2016-9-12. */ public class ParkingDataInDbHandler implements EventHandler<InParkingDataEvent>,WorkHandler<InParkingDataEvent>{ @Override public void onEvent(InParkingDataEvent event) throws Exception { long threadId = Thread.currentThread().getId(); String carLicense = event.getCarLicense(); System.out.println(String.format("Thread Id %s save %s into db ....",threadId,carLicense)); } @Override public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { // TODO Auto-generated method stub this.onEvent(event); } }
public class ParkingDataSmsHandler implements EventHandler<InParkingDataEvent> { @Override public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); String carLicense = event.getCarLicense(); System.out.println(String.format("Thread Id %s send %s in plaza sms to user",threadId,carLicense)); } }
public class ParkingDataToKafkaHandler implements EventHandler<InParkingDataEvent> { @Override public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); String carLicense = event.getCarLicense(); System.out.println(String.format("Thread Id %s send %s in plaza messsage to kafka...",threadId,carLicense)); } } 3.发布事件类实现(Disruptor 要求 RingBuffer.publish 必须得到调用,如果发生异常也一样要调用 publish ,那么,很显然这个时候需要调用者在事件处理的实现上来判断事件携带的数据是否是正确的或者完整的)
public class InParkingDataEventPublisher implements Runnable{ Disruptor<InParkingDataEvent> disruptor; private CountDownLatch latch; //private static int LOOP=10000;//模拟一万车辆入场 private static int LOOP=10;//模拟10车辆入场 public InParkingDataEventPublisher(CountDownLatch latch,Disruptor<InParkingDataEvent> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { InParkingDataEventTranslator tradeTransloator=new InParkingDataEventTranslator(); for(int i=0;i<LOOP;i++){ disruptor.publishEvent(tradeTransloator); try { Thread.currentThread().sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } latch.countDown(); System.out.println("生产者写完" +LOOP + "个消息"); } } class InParkingDataEventTranslator implements EventTranslator<InParkingDataEvent>{ @Override public void translateTo(InParkingDataEvent event, long sequence) { this.generateTradeTransaction(event); } private InParkingDataEvent generateTradeTransaction(InParkingDataEvent event){ int num = (int)(Math.random()*8000); num = num + 1000; event.setCarLicense("京Z" + num); System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event"); return event; } }
- 定义用于事件处理的线程池, 指定等待策略, 启动 Disruptor,执行完毕后关闭Disruptor
/** * @author mawming * @version 1.0 * @create date:2016-9-12. * 测试 P1生产消息,C1,C2消费消息,C1和C2会共享所有的event元素! C3依赖C1,C2处理结果 */ public class TestP1c12c3 { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; //Disruptor交给线程池来处理,共计 p1,c1,c2,c3四个线程 ExecutorService executor=Executors.newFixedThreadPool(4); //构造缓冲区与事件生成 Disruptor<InParkingDataEvent> disruptor=new Disruptor<InParkingDataEvent>(new EventFactory<InParkingDataEvent>() { @Override public InParkingDataEvent newInstance() { return new InParkingDataEvent(); } }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); //使用disruptor创建消费者组C1,C2 EventHandlerGroup<InParkingDataEvent> handlerGroup=disruptor .handleEventsWith(new ParkingDataToKafkaHandler(),new ParkingDataInDbHandler()); ParkingDataSmsHandler smsHandler=new ParkingDataSmsHandler(); //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 handlerGroup.then(smsHandler); disruptor.start();//启动 CountDownLatch latch=new CountDownLatch(1); //生产者准备 executor.submit(new InParkingDataEventPublisher(latch, disruptor)); latch.await();//等待生产者结束 disruptor.shutdown(); executor.shutdown(); System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); } }
5. 相关资料
Disruptor源码地址: https://github.com/LMAX-Exchange/disruptor
在JDK的多线程与并发库一文中, 提到了BlockingQueue实现了生产者-消费者模型
BlockingQueue是基于锁实现的, 而锁的效率通常较低. 有没有使用CAS机制实现的生产者-消费者?
Disruptor就是这样.
disruptor使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue
1.生产者-消费者
1.1使用Disruptor类
RingBuffer通过Disruptor实例获得
public class Client { public static void main(String[] args) throws Exception { //1.配置并获得Disruptor ExecutorService executor = Executors.newCachedThreadPool(); LongEventFactory factory = new LongEventFactory(); // 设置RingBuffer大小, 需为2的N次方(能将求模运算转为位运算提高效率 ), 否则影响性能 int ringBufferSize = 1024 * 1024; //创建disruptor, 泛型参数:传递的事件的类型 // 第一个参数: 产生Event的工厂类, Event封装生成-消费的数据 // 第二个参数: RingBuffer的缓冲区大小 // 第三个参数: 线程池 // 第四个参数: SINGLE单个生产者, MULTI多个生产者 // 第五个参数: WaitStrategy 当消费者阻塞在SequenceBarrier上, 消费者如何等待的策略. //BlockingWaitStrategy 使用锁和条件变量, 效率较低, 但CPU的消耗最小, 在不同部署环境下性能表现比较一致 //SleepingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度; 多次调度后仍不成功, 睡眠纳秒级别的时间再尝试. 平衡了延迟和CPU资源占用, 但延迟不均匀. //YieldingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度. 平衡了延迟和CPU资源占用, 延迟也比较均匀. //BusySpinWaitStrategy 自旋等待,类似自旋锁. 低延迟但同时对CPU资源的占用也多. Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 注册事件消费处理器, 也即消费者. 可传入多个EventHandler ... disruptor.handleEventsWith(new LongEventHandler()); // 启动 disruptor.start(); //2.将数据装入RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 创建生产者, 以下方式一使用原始api, 方式二使用新API //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); // 这里只是笔者实验, 不是必须要用ByteBuffer保存long数据 for(int i = 0; i < 100; ++i){ byteBuffer.putLong(0, i); producer.produceData(byteBuffer); } disruptor.shutdown(); //关闭 disruptor 阻塞直至所有事件都得到处理 executor.shutdown(); // 需关闭 disruptor使用的线程池, 上一步disruptor关闭时不会连带关闭线程池 } }
// Event封装要传递的数据 public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
// 产生Event的工厂 public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } }
public class LongEventHandler implements EventHandler<LongEvent> { // 消费逻辑 @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } }
//生产者实现一 public class LongEventProducer { // 生产者持有RingBuffer的引用 private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer){ this.ringBuffer = ringBuffer; } public void produceData(ByteBuffer bb){ // 获得下一个Event槽的下标 long sequence = ringBuffer.next(); try { // 给Event填充数据 LongEvent event = ringBuffer.get(sequence); event.setValue(bb.getLong(0)); } finally { // 发布Event, **观察者去消费, 将sequence传递给该消费者 //publish应该放在 finally块中以确保一定会被调用->如果某个事件槽被获取但未提交, 将会堵塞后续的publish动作。 ringBuffer.publish(sequence); } } }
//生产者实现二 public class LongEventProducerWithTranslator { // 使用EventTranslator, 封装 获取Event的过程 private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void produceData(ByteBuffer buffer){ // 发布 ringBuffer.publishEvent(TRANSLATOR, buffer); } }
1.2 直接使用RingBuffer
给出了两种方式:EventProcessor与WorkPool(可处理多消费者)
public class ClientForEventProcessor { public static void main(String[] args) throws Exception { int BUFFER_SIZE = 1024; int THREAD_NUMBERS = 4; // 这里直接获得RingBuffer. createSingleProducer创建一个单生产者的RingBuffer // 第一个参数为EventFactory,产生数据Trade的工厂类 // 第二个参数是RingBuffer的大小,需为2的N次方 // 第三个参数是WaitStrategy, 消费者阻塞时如何等待生产者放入Event final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //SequenceBarrier, 协调消费者与生产者, 消费者链的先后顺序. 阻塞后面的消费者(没有Event可消费时) SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //创建事件处理器 (消费者): 处理ringBuffer, 用TradeHandler的方法处理(实现EventHandler), 用sequenceBarrier协调生成-消费 //如果存在多个消费者(老api, 可用workpool解决) 那重复执行 创建事件处理器-注册进度-提交消费者的过程, 把其中TradeHandler换成其它消费者类 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler()); //把消费者的消费进度情况注册给RingBuffer结构(生产者) !如果只有一个消费者的情况可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //把消费者提交到线程池, !说明EventProcessor实现了callable接口 executors.submit(transProcessor); // 生产者, 这里新建线程不是必要的 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for (int i = 0; i < 10; i++) { seq = ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random() * 9999); ringBuffer.publish(seq); } return null; } }); Thread.sleep(1000); //等上1秒,等待消费完成 transProcessor.halt(); //通知事件处理器 可以结束了(并不是马上结束!) executors.shutdown(); } }
public class ClientForWorkPool { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE = 1024; int THREAD_NUMBERS = 4; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); // 第三个参数: 异常处理器, 这里用ExceptionHandler; 第四个参数WorkHandler的实现类, 可为数组(即传入多个消费者) WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), new TradeHandler()); ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); workerPool.start(executors); // 生产10个数据 for (int i = 0; i < 8; i++) { long seq = ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random() * 9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executors.shutdown(); } }
// 封装交易数据 public class Trade { private String id; // 订单ID private String name; private double price; // 金额 public Trade(String id) { this.id = id; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
// 消费者, 这里实现一个接口就行, 写两个是为了同时测试EventProcessor和WorkPool public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //具体的消费逻辑 System.out.println(event.getId()); } }
1.3 多生产者-多消费者
一个Event只能被某一个消费者处理
public static void main(String[] args) throws Exception { //创建RingBuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy()); SequenceBarrier barriers = ringBuffer.newBarrier(); Consumer[] consumers = new Consumer[3]; for(int i = 0; i < consumers.length; i++){ consumers[i] = new Consumer("ct" + i); } // 3个消费者 WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new MyExceptionHandler(), consumers); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); workerPool.start(executors); // 10个生产者, 每个生成者生产20个数据 for (int i = 0; i < 10; i++) { final Producer p = new Producer(ringBuffer); new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 2; j++){ p.produceData(UUID.randomUUID().toString()); } } }).start(); } System.out.println("----开始生产----"); Thread.sleep(1000); // 等待消费完成 System.out.println("总共消费数量:" + consumers[0].getCount() ); workerPool.halt(); executors.shutdown(); } static class MyExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable ex, long sequence, Object event) {} public void handleOnStartException(Throwable ex) {} public void handleOnShutdownException(Throwable ex) {} } }
public class Consumer implements WorkHandler<Order>{ private String consumerId; // 消费计数器 private static AtomicInteger count = new AtomicInteger(0); public Consumer(String consumerId){ this.consumerId = consumerId; } @Override public void onEvent(Order order) throws Exception { System.out.println("当前消费者: " + this.consumerId + ", 消费信息: " + order.getId()); count.incrementAndGet(); } public int getCount(){ return count.get(); } }
public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } public void produceData(String data){ long sequence = ringBuffer.next(); try { Order order = ringBuffer.get(sequence); order.setId(data); } finally { ringBuffer.publish(sequence); } } }
public class Order { private String id; private String name; private double price; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
2. 并行处理
除了实现生产者-消费者模型, Disruptor还可以进行多路并行处理(一个Event可以进入多个路径同时进行处理, 因为不同路径操作的是同一个Event, 路径可以汇合)
public class Client { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(7); // 注意: 线程数>=handler数+1 Disruptor<Trade> disruptor = new Disruptor<Trade>( new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); // 菱形操作 /* // 创建消费者组(含H1,H2) H1,H2并行执行无先后顺序 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); // C1,C2都完成后执行C3, 像JMS传递消息 handlerGroup.then(new Handler3()); */ // 顺序操作 /* disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3()); */ // 六边形操作. H1, H4串行执行; H2, H5串行执行; 而H1,H4 与 H2,H5 并行执行 Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3); disruptor.start(); // 启动生产线程 executor.submit(new TradePublisher(disruptor)); Thread.sleep(1000); // 等待消费完成 disruptor.shutdown(); executor.shutdown(); System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); } }
public class TradePublisher implements Runnable { private Disruptor<Trade> disruptor; private static final int LOOP = 100;// 模拟百次交易的发生 public TradePublisher(Disruptor<Trade> disruptor) { this.disruptor = disruptor; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for (int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } } } class TradeEventTranslator implements EventTranslator<Trade> { private Random random = new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade) { trade.setPrice(random.nextDouble() * 9999); return trade; } }
public class Handler1 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler1: set name"); event.setName("h1"); } }
public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler2: set price"); event.setPrice(17.0); } }
public class Handler3 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.getId()); } }
public class Handler4 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler4: append name"); event.setName(event.getName() + "h4"); } }
public class Handler5 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler5: add price"); event.setPrice(event.getPrice() + 3.0); } }