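As every Java programmer knows, the Disruptor is a high-performance framework for inter-thread messaging, that is, passing messages between threads within a single JVM process. It was developed by LMAX.
The Disruptor is so fast that LMAX uses it to process 6 million orders per second, reaching throughput of 100K+ with 1 microsecond latency. Is there a comparable library in the Go ecosystem?
go-disruptor is a port of the Java Disruptor. Its API is modeled on the Java version's and is reasonably easy to use.
Its performance is the main subject of this article and is covered below.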
Because of its performance, the Disruptor has attracted a lot of attention, and a number of articles and resources describe it, for example:
Disruptor Google Group
Bad Concurrency (Michael Barker)
LMAX (Planet)
LMAX Exchange
Disruptor presentation @ QCon SF
Disruptor Technical Paper
Mechanical Sympathy (Martin Thompson)
Martin Fowler's Technical Review
.NET Disruptor Port
Introduction to the Disruptor
Disruptor wiki
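There are also Chinese translations and introductions, such as the Disruptor series on 并发编程网 (ifeve.com), and
封仲淹 (Alibaba): How to Use the Disruptor Elegantly.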
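The Disruptor was developed by LMAX, whose goal is to become the fastest trading platform in the world. To reach its low-latency, high-throughput targets, LMAX had to build a high-performance producer-consumer messaging framework, because Java's built-in queues still introduce noticeable latency. The figure below compares the performance of the Disruptor and the JDK's ArrayBlockingQueue.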
The X axis shows latency and the Y axis shows the number of operations: the Disruptor has lower latency and higher throughput.
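To make the core idea concrete, here is a minimal single-producer/single-consumer ring buffer in Go. It is a hypothetical sketch (the type `spscRing` and its methods are my own names, not the API of go-disruptor or the LMAX Disruptor) showing two ingredients the Disruptor relies on: a preallocated array indexed with a power-of-two mask, and monotonically increasing sequence counters that the producer and consumer publish to each other with atomic loads and stores instead of locks. It leaves out wait strategies, producer-side batching and multiple consumers, and it assumes Go 1.19+ for `atomic.Int64`.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// ringSize must be a power of two so that seq&ringMask == seq%ringSize.
const (
	ringSize = 1024
	ringMask = ringSize - 1
)

// spscRing is a hypothetical single-producer/single-consumer ring buffer.
// Slots are preallocated; the two counters are the only shared state.
type spscRing struct {
	buf      [ringSize]int64
	written  atomic.Int64 // highest sequence published by the producer
	consumed atomic.Int64 // highest sequence processed by the consumer
}

func newRing() *spscRing {
	r := &spscRing{}
	r.written.Store(-1) // sequences start at -1, as in the Disruptor
	r.consumed.Store(-1)
	return r
}

// publish stores v in the next slot, yielding while the ring is full.
// Only one goroutine may call publish.
func (r *spscRing) publish(v int64) {
	next := r.written.Load() + 1
	for next-r.consumed.Load() > ringSize { // slot still holds an unread value
		runtime.Gosched()
	}
	r.buf[next&ringMask] = v
	r.written.Store(next) // makes the slot visible to the consumer
}

// consume hands every published but unprocessed value to fn and returns
// how many it processed. Only one goroutine may call consume.
func (r *spscRing) consume(fn func(int64)) int64 {
	start := r.consumed.Load() + 1
	end := r.written.Load()
	for seq := start; seq <= end; seq++ {
		fn(r.buf[seq&ringMask])
	}
	r.consumed.Store(end) // frees the slots for the producer
	return end - start + 1
}

func main() {
	const n = 100000
	r := newRing()
	go func() {
		for i := int64(0); i < n; i++ {
			r.publish(i)
		}
	}()
	var count, sum int64
	for count < n {
		got := r.consume(func(v int64) { sum += v })
		if got == 0 {
			runtime.Gosched() // nothing published yet
		}
		count += got
	}
	fmt.Println(count, sum) // 100000 4999950000
}
```

The consumer drains every available slot in one call, which is the same "process a whole batch per wake-up" behavior the Disruptor's batch event processor uses to amortize synchronization cost.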
The Disruptor supports several usage models and configurations; the official benchmark results for some of them are linked here.
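What I want to do is compare go-disruptor with the official Java Disruptor. The Disruptor can be configured in many ways (single or multiple producers, single or multiple consumers), and performance varies considerably across configurations, so a fair comparison must use the same configuration on both sides, even though the two are written in different languages.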
The scenario I chose is three producers and one consumer. With a single producer, the Java Disruptor's throughput would be several times higher.
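One likely reason the producer count matters so much: with several producers, claiming the next free slots needs an atomic read-modify-write on a shared cursor, whereas a single producer can simply bump a counter it owns. The sketch below (hypothetical names `claimer`, `next` and `claimAll`; this is not the Disruptor's actual sequencer) shows only that claim step using `atomic.Int64.Add`, and checks that concurrent batch claims never overlap. It leaves out the other parts a real multi-producer design needs: waiting when the ring is full and tracking which claimed slots have actually been published.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
)

// claimer is a hypothetical shared cursor from which several producers
// reserve consecutive sequence numbers.
type claimer struct {
	cursor atomic.Int64 // last claimed sequence; starts at -1
}

func newClaimer() *claimer {
	c := &claimer{}
	c.cursor.Store(-1)
	return c
}

// next atomically reserves n sequences and returns the inclusive range [lo, hi].
func (c *claimer) next(n int64) (lo, hi int64) {
	hi = c.cursor.Add(n)
	return hi - (n - 1), hi
}

// claimAll lets `producers` goroutines each claim `batches` batches of size n
// concurrently, and returns the sorted lower bound of every claimed batch.
func claimAll(producers, batches int, n int64) []int64 {
	c := newClaimer()
	var (
		mu  sync.Mutex
		los []int64
		wg  sync.WaitGroup
	)
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := 0; b < batches; b++ {
				lo, _ := c.next(n)
				mu.Lock()
				los = append(los, lo)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	sort.Slice(los, func(i, j int) bool { return los[i] < los[j] })
	return los
}

func main() {
	los := claimAll(3, 1000, 16)
	fmt.Println(len(los), los[0], los[len(los)-1]) // 3000 0 47984
}
```

If the sorted lower bounds are exactly 0, 16, 32, ..., the batches are disjoint and leave no gaps, no matter how the three goroutines interleaved.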
Java Disruptor
The main class of the Java benchmark:
```java
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.lmax.disruptor.RingBuffer.createMultiProducer;

public class Main {
    private static final int NUM_PUBLISHERS = 3; // Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 20L;

    // One thread per publisher plus one for the consumer.
    private final ExecutorService executor =
            Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    // Publishers and the main thread meet here so all publishers start together.
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    // Multi-producer ring buffer; the consumer busy-spins while waiting.
    private final RingBuffer<ValueEvent> ringBuffer =
            createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }
        // Publishers must not overwrite slots the consumer has not processed yet.
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch, batchEventProcessor.getSequence().get()
                + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));
        Future<?>[] futures = new Future<?>[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start test
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i].get(); // all published
        }
        latch.await(); // all handled
        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        batchEventProcessor.halt();
        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception {
        Main m = new Main();
        System.out.println("opsPerSecond:" + m.runDisruptorPass());
    }
}
```
The consumer (event handler), producer (publisher), and event classes:
```java
import com.lmax.disruptor.EventHandler;

import java.util.concurrent.CountDownLatch;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {
    private long value = 0;
    private long count;
    private CountDownLatch latch;

    public long getValue() {
        return value;
    }

    // Arms the handler: the latch is released when the event with sequence expectedCount arrives.
    public void reset(final CountDownLatch latch, final long expectedCount) {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        value = event.getValue();
        if (count == sequence) {
            latch.countDown();
        }
    }
}
```
```java
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.CyclicBarrier;

public final class ValueBatchPublisher implements Runnable {
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(final CyclicBarrier cyclicBarrier, final RingBuffer<ValueEvent> ringBuffer,
                               final long iterations, final int batchSize) {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        try {
            cyclicBarrier.await();
            for (long i = 0; i < iterations; i += batchSize) {
                // Claim batchSize slots at once; next(n) returns the highest claimed sequence.
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++) {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                // Make the whole batch visible to the consumer.
                ringBuffer.publish(lo, hi);
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
```
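```java
import com.lmax.disruptor.EventFactory;

public final class ValueEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(final long value) {
        this.value = value;
    }

    // Used by the ring buffer to preallocate every slot up front.
    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        @Override
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
}
```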
Three publisher threads write to the ring buffer and a single consumer processes the events. Each publisher claims and publishes 16 slots per batch.
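The batch loop in `ValueBatchPublisher.run` always claims a full batch, so the number of events each publisher writes is rounded up to a multiple of 16. A small Go sketch of the same loop shape (the function name `publishBatches` is hypothetical, and there is no ring buffer here, only the sequence arithmetic) shows that a publisher given 20,000,000 / 3 = 6,666,666 iterations actually claims 6,666,672 sequences. The surplus, 18 events across the three publishers, is negligible next to 20 million.

```go
package main

import "fmt"

// publishBatches mirrors the loop in ValueBatchPublisher.run for a single
// publisher: it claims batchSize sequences per step until at least
// `iterations` values are covered, and returns how many were claimed.
func publishBatches(iterations, batchSize int64) (claimed int64) {
	cursor := int64(-1) // Disruptor sequences start at -1
	for i := int64(0); i < iterations; i += batchSize {
		hi := cursor + batchSize   // what next(batchSize) would return
		lo := hi - (batchSize - 1) // first slot of this batch
		claimed += hi - lo + 1
		cursor = hi
	}
	return claimed
}

func main() {
	// 20,000,000 iterations split across 3 publishers, batches of 16.
	fmt.Println(publishBatches(20_000_000/3, 16)) // 6666672
}
```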
In my test run, throughput reached 183486238 ops/s, about 180 million operations per second.
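Both benchmarks compute throughput as `iterations * 1000 / elapsedMillis` with integer division. Working the formula backwards is a useful sanity check: 183,486,238 ops/s for 20,000,000 iterations corresponds to a run of exactly 109 ms, so the whole measurement lasts about a tenth of a second, and one millisecond of timer granularity shifts the result by roughly 1%. The helper name `opsPerSecond` below is only for illustration.

```go
package main

import "fmt"

// opsPerSecond applies the benchmarks' formula: iterations * 1000 / elapsed ms,
// using integer division as the Java and Go code do.
func opsPerSecond(iterations, elapsedMs int64) int64 {
	return iterations * 1000 / elapsedMs
}

func main() {
	// 20,000,000 events in 109 ms reproduces the Java figure exactly.
	fmt.Println(opsPerSecond(20_000_000, 109)) // 183486238
	// One millisecond either way moves the result by about 1%.
	fmt.Println(opsPerSecond(20_000_000, 108), opsPerSecond(20_000_000, 110)) // 185185185 181818181
}
```

With runs this short, repeating the pass several times (or using more iterations) would make the comparison less sensitive to timer resolution and JIT warm-up.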
go-disruptor
Now let's see what go-disruptor can achieve.
In Go, goroutines pass messages through the built-in channels. The go-disruptor project page compares go-disruptor with channels, and go-disruptor is clearly faster:
| Scenario | Per Operation Time |
|---|---|
| Channels: Buffered, Blocking, GOMAXPROCS=1 | 58.6 ns |
| Channels: Buffered, Blocking, GOMAXPROCS=2 | 86.6 ns |
| Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write | 194 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=1 | 26.4 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=2 | 29.2 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write | 110 ns |
| Disruptor: Writer, Reserve One | 4.3 ns |
| Disruptor: Writer, Reserve Many | 1.0 ns |
| Disruptor: Writer, Reserve One, Multiple Readers | 4.5 ns |
| Disruptor: Writer, Reserve Many, Multiple Readers | 0.9 ns |
| Disruptor: Writer, Await One | 3.0 ns |
| Disruptor: Writer, Await Many | 0.7 ns |
| Disruptor: SharedWriter, Reserve One | 13.6 ns |
| Disruptor: SharedWriter, Reserve Many | 2.5 ns |
| Disruptor: SharedWriter, Reserve One, Contended Write | 56.9 ns |
| Disruptor: SharedWriter, Reserve Many, Contended Write | 3.1 ns |
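The table reports time per operation, while the benchmarks in this article report operations per second. For a single stream of back-to-back operations the two are reciprocals, ops/s = 10^9 / (ns per op), so the conversion below (hypothetical helper `opsPerSecondFromNs`) puts a few rows on the same scale: 110 ns per contended channel write is about 9.1 million ops/s, while 3.1 ns for a contended SharedWriter with batched reservations is about 322.6 million ops/s.

```go
package main

import "fmt"

// opsPerSecondFromNs converts a per-operation time in nanoseconds into the
// equivalent throughput of one stream of back-to-back operations.
func opsPerSecondFromNs(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

func main() {
	rows := []struct {
		scenario string
		ns       float64
	}{
		{"Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write", 110},
		{"Disruptor: SharedWriter, Reserve One, Contended Write", 56.9},
		{"Disruptor: SharedWriter, Reserve Many, Contended Write", 3.1},
	}
	for _, row := range rows {
		fmt.Printf("%-64s %6.1f ns/op = %6.1f M ops/s\n",
			row.scenario, row.ns, opsPerSecondFromNs(row.ns)/1e6)
	}
}
```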
How does go-disruptor perform under the same conditions as the Java Disruptor test?
Here is the benchmark code:
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	disruptor "github.com/smartystreets/go-disruptor"
)

const (
	RingBufferSize   = 1024 * 64
	RingBufferMask   = RingBufferSize - 1
	ReserveOne       = 1
	ReserveMany      = 16
	ReserveManyDelta = ReserveMany - 1
	DisruptorCleanup = time.Millisecond * 10
)

var ringBuffer = [RingBufferSize]int64{}

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n", totalIterations, iterations, NumPublishers)

	runtime.GOMAXPROCS(NumPublishers)
	var consumer = &countConsumer{TotalIterations: totalIterations, Count: 0}
	consumer.WG.Add(1)

	// Shared (multi-writer) ring buffer with a single consumer group.
	controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()
	controller.Start()
	defer controller.Stop()

	var wg sync.WaitGroup // start barrier: publishers + main goroutine
	wg.Add(NumPublishers + 1)
	var sendWG sync.WaitGroup // done when every publisher has finished
	sendWG.Add(NumPublishers)
	for i := 0; i < NumPublishers; i++ {
		go func() {
			writer := controller.Writer()
			wg.Done()
			wg.Wait()
			current := disruptor.InitialSequenceValue
			for current < totalIterations {
				// Reserve returns the highest reserved sequence, so this
				// batch covers [current-ReserveManyDelta, current].
				current = writer.Reserve(ReserveMany)
				for j := current - ReserveManyDelta; j <= current; j++ {
					ringBuffer[j&RingBufferMask] = j
				}
				writer.Commit(current-ReserveManyDelta, current)
			}
			sendWG.Done()
		}()
	}
	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait() // waiting for ready as a barrier
	fmt.Println("start to publish")

	sendWG.Wait()
	fmt.Println("Finished to publish")

	consumer.WG.Wait() // waiting for the consumer
	fmt.Println("Finished to consume")
	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}

type countConsumer struct {
	Count           int64
	TotalIterations int64
	WG              sync.WaitGroup
}

// Consume receives a range of committed sequences; the loop treats it as inclusive.
func (cc *countConsumer) Consume(lower, upper int64) {
	for lower <= upper {
		message := ringBuffer[lower&RingBufferMask]
		if message != lower {
			warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)
			fmt.Print(warning)
			panic(warning)
		}
		lower++
		cc.Count++
		if cc.Count == cc.TotalIterations {
			cc.WG.Done()
			return
		}
	}
}
```
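The benchmark indexes the ring buffer with `j & RingBufferMask` rather than `j % RingBufferSize`. For non-negative sequence numbers the two agree whenever the size is a power of two, because the mask then keeps exactly the low bits that the remainder would, at the cost of a single AND instead of a division. The standalone check below (helper names `isPowerOfTwo` and `slot` are hypothetical) confirms the equivalence for the 64K buffer used above.

```go
package main

import "fmt"

const (
	ringBufferSize = 1024 * 64 // must be a power of two
	ringBufferMask = ringBufferSize - 1
)

// isPowerOfTwo reports whether n has exactly one bit set.
func isPowerOfTwo(n int64) bool { return n > 0 && n&(n-1) == 0 }

// slot maps an ever-increasing sequence number to a ring buffer index.
func slot(seq int64) int64 { return seq & ringBufferMask }

func main() {
	fmt.Println(isPowerOfTwo(ringBufferSize)) // true
	for _, seq := range []int64{0, 65535, 65536, 65537, 19999997} {
		// The last two columns are always equal for a power-of-two size.
		fmt.Println(seq, slot(seq), seq%ringBufferSize)
	}
}
```

This is also why Disruptor-style buffers require a power-of-two capacity: with a size like 1000, the mask would skip slots and alias others.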
In my test, go-disruptor reached 137931020 ops/s.
So we now have two results for the same test case. I also ran the same case with a Go channel, which gives three results in total:
Java Disruptor : 183486238 ops/s
go-disruptor : 137931020 ops/s
go channel : 6995452 ops/s
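go-disruptor's throughput is about 25% lower than the Java Disruptor's, but at roughly 140 million ops/s it is still very high, so the library deserves attention. The Go channel falls far behind both, at about one twentieth of go-disruptor's throughput.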
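The relative numbers behind that comparison are easy to check. Using the three results listed above, a tiny helper (hypothetical name `ratio`) puts go-disruptor at about 0.75 of the Java Disruptor's throughput, that is roughly 25% lower, and at about 19.7 times the channel version's.

```go
package main

import "fmt"

// ratio returns a/b, i.e. how many times a's throughput is b's.
func ratio(a, b int64) float64 {
	return float64(a) / float64(b)
}

func main() {
	const (
		javaDisruptor = 183486238 // ops/s, from the results above
		goDisruptor   = 137931020
		goChannel     = 6995452
	)
	fmt.Printf("go-disruptor / Java Disruptor: %.2f\n", ratio(goDisruptor, javaDisruptor))
	fmt.Printf("go-disruptor / go channel:     %.1f\n", ratio(goDisruptor, goChannel))
	fmt.Printf("Java Disruptor / go channel:   %.1f\n", ratio(javaDisruptor, goChannel))
}
```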
Go Channel
Implemented with a Go channel, the same test reaches 6995452 ops/s.
The code:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	channel := make(chan int64, 1024*64)
	var wg sync.WaitGroup // start barrier: publishers + main goroutine
	wg.Add(NumPublishers + 1)
	var readerWG sync.WaitGroup
	readerWG.Add(1)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			wg.Done()
			wg.Wait()
			// Non-blocking send: keep polling until the buffer has room.
			for i := int64(0); i < iterations; {
				select {
				case channel <- i:
					i++
				default:
				}
			}
		}()
	}

	go func() {
		// Count only successful receives; on an empty channel, poll again.
		for i := int64(0); i < totalIterations; {
			select {
			case msg := <-channel:
				if NumPublishers == 1 && msg != i {
					panic("Out of sequence")
				}
				i++
			default:
			}
		}
		readerWG.Done()
	}()

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait()
	readerWG.Wait()
	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}
```
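The channel benchmark above busy-polls with `select`/`default` on both sides. For comparison, a more conventional way to write the same three-producer, one-consumer pattern uses plain blocking sends and a `for range` receive, closing the channel once every producer is done. This is a sketch (the function `runBlocking` and its parameters are hypothetical), and no throughput figure for it is claimed here; how it compares with the polling version depends on the machine and GOMAXPROCS.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBlocking sends `iterations` values from each of numPublishers goroutines
// over one buffered channel, using blocking operations only, and returns how
// many values the single consumer received.
func runBlocking(numPublishers int, iterations int64, bufSize int) int64 {
	ch := make(chan int64, bufSize)
	var pubWG sync.WaitGroup
	for p := 0; p < numPublishers; p++ {
		pubWG.Add(1)
		go func() {
			defer pubWG.Done()
			for i := int64(0); i < iterations; i++ {
				ch <- i // blocks while the buffer is full
			}
		}()
	}
	// Close the channel after the last publisher finishes so the range loop ends.
	go func() {
		pubWG.Wait()
		close(ch)
	}()
	var received int64
	for range ch { // blocks while the buffer is empty
		received++
	}
	return received
}

func main() {
	const numPublishers = 3
	iterations := int64(1000*1000*20) / numPublishers
	start := time.Now()
	n := runBlocking(numPublishers, iterations, 1024*64)
	elapsed := time.Since(start)
	fmt.Printf("received: %d, opsPerSecond: %.0f\n", n, float64(n)/elapsed.Seconds())
}
```

Blocking operations let the runtime park idle goroutines instead of spinning, which usually matters more when there are fewer CPUs than busy goroutines.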