disruptor 中多消费者 ,bufferSize 和 newFixedThreadPool数量对消费的影响为什么是这样

disruptor 中多消费者 ,bufferSize 和 newFixedThreadPool数量对消费的影响为什么是这样
bufferSize 设为 1024 ,消费者设置为10个, 线程池设置为5 时,运行一万次任务,打印出来只有1024 个
但是 线程池改为10 个就能打印出来 10000个
或者 bufferSize 设为 10000 就能打印出来 10000个
猜测 bufferSize 设为 2048 ,消费者设置为10个, 线程池设置为5 时 ,打印出来只有2048 个
到底为什么, disruptor 中 bufferSize 和线程数量 线程池数量的关系是什么?


public class Main { // 01:46:38 // https://www.bilibili.com/video/BV1xd4y1g7iU/?spm_id_from=333.1073.top_right_bar_window_history.content.click&vd_source=ff8b7f852278821525f11666b36f180a
    public static void main(String[] args) throws InterruptedException {
        RingBuffer<Order> ringBuffer = RingBuffer.create(
                ProducerType.MULTI,
                new EventFactory<Order>() {
                    @Override
                    public Order newInstance() {
                        return new Order();
                    }
                },
                1024,
                new BlockingWaitStrategy()
        );

        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        Consumer[] consumers = new Consumer[10];
        for(int i = 0 ; i < consumers.length; i ++){

            consumers[i] = new Consumer("C"+i); // 实例化每个消费者 并赋ID
        }

        WorkerPool<Order> orderWorkerPool = new WorkerPool<>(
                ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                consumers
        );
        ringBuffer.addGatingSequences(orderWorkerPool.getWorkerSequences());

        orderWorkerPool.start(Executors.newFixedThreadPool(10)); // 启动线程池
        CyclicBarrier latch = new CyclicBarrier(100); // 控制并发

        for (int i =0 ; i < 100; i ++){
            Producer producer = new Producer(ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        latch.await();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    for (int j = 0; j < 100; j++) { // 并发
                        producer.sendData(UUID.randomUUID().toString());
                    }
                }
            }
            ).start();
        }
        System.out.println("---------------线程创建完毕 , 开始生产数据---------------------");
        Thread.sleep(10000);
        System.out.println("任务总是" + consumers[2].getCount());

    }
}

public class Consumer implements WorkHandler<Order> {
    private  String cosumerId;
    private static AtomicInteger count = new AtomicInteger(0);
    public  Consumer(String cosumerId) {
        this.cosumerId = cosumerId;
    }

    public int getCount() {
        return count.get();
    }

    @Override
    public void onEvent(Order order) throws Exception {
        System.out.println("当前消费者:" + this.cosumerId + "消息id: " + order.getId());
        count.incrementAndGet();
    }
}

```java
public class Producer {
    private RingBuffer<Order> ringBuffer;
    public  Producer(RingBuffer<Order> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public  void  sendData(String uuid) {
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(uuid);

        }finally {
            ringBuffer.publish(sequence);
        }
    }
}


```java
@Data
public class Order {
    private String id;
    private String name;
    private  double price;
}


public class EventExceptionHandler implements ExceptionHandler<Order> {
    @Override
    public void handleEventException(Throwable ex, long sequence, Order order) {
        System.out.println("异常sequence:" + sequence + "order:" + order);
    }

    @Override
    public void handleOnStartException(Throwable ex) {

    }

    @Override
    public void handleOnShutdownException(Throwable ex) {

    }
}

disruptor使用ringBuffer环形队列,bufferSize存放就是的消息数量,线程池大小就是能消费消息的能力,队列满了再发送消息要么覆盖要么丢弃,所以打印少。线程池大能及时消费更多消息,或者队列长度大能存下更多消息以慢慢消费,所以打印多。