专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

高性能队列Disruptor框架的详细说明与实战使用

Disruptor的使用

1.简介

The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX’s research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange’s infrastructure.

(LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发性、性能和非阻塞算法的研究,如今已成为其Exchange基础架构的核心部分。)

-- 引用自GITHUB介绍

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。以下是介绍wiki地址:

https://github.com/LMAX-Exchange/disruptor/wiki

2.Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题: 环形数组结构 为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。 元素位置定位 数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。 无锁设计 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。 下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

3.Disruptor实现特征

另一个关键的实现低延迟的细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。 只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。

4.Disruptor实现生产者消费者模型

这里我们按照原作者Demo介绍制作一个放入LongValue的生产者和消费者模型,相关的代码如下所示:

maven依赖
    <dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>

LongEvent
//定义事件event  通过Disruptor 进行交换的数据类型。
public class LongEvent  {

    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }

}

LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {

        return new LongEvent();
    }

}

LongEventHandler
// 消费者获得数据
public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println("消费者获得数据:" + longEvent.getValue());
    }
}

LongEventProducer
// 生产者
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 获取事件队列的下表位置
        long sequence = ringBuffer.next();
        try {
            // 取出空队列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 给空队列赋值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生产者发送数据....");
            ringBuffer.publish(sequence);
        }

    }

}

MainTest
public class MainTest {

    public static void main(String[] args) {

        // 1. 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // 2. 创建工厂
        LongEventFactory longEventFactory = new LongEventFactory();

        // 3.创建ringbuffer 大小
        int ringbuffer = 1024 * 1024; // 2的N次方

        // 4. 创建disruptor
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
                longEventFactory, ringbuffer, executor,
                ProducerType.MULTI, new YieldingWaitStrategy()
        );

        // 5. 连接消费者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 6. 启动
        longEventDisruptor.start();

        // 7.创建ringbuffer容器
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();

        // 8.创建生产者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);

        // 9. 指定缓冲区的大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 10; i++) {
            byteBuffer.putLong(0,i);
            longEventProducer.onData(byteBuffer);
        }
        executor.shutdown();
        longEventDisruptor.shutdown();

    }
}

执行结果如下:
生产者发送数据....
生产者发送数据....
生产者发送数据....
生产者发送数据....
消费者获得数据:0
生产者发送数据....
消费者获得数据:1
生产者发送数据....
消费者获得数据:2
生产者发送数据....
消费者获得数据:3
生产者发送数据....
消费者获得数据:4
生产者发送数据....
消费者获得数据:5
生产者发送数据....
消费者获得数据:6
消费者获得数据:7
消费者获得数据:8
消费者获得数据:9

文章永久链接:https://tech.souyunku.com/26162

未经允许不得转载:搜云库技术团队 » 高性能队列Disruptor框架的详细说明与实战使用

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们