disruptor 中多消费者 ,bufferSize 和 newFixedThreadPool数量对消费的影响为什么是这样
bufferSize 设为 1024 ,消费者设置为10个, 线程池设置为5 时,运行一万次任务,打印出来只有1024 个
但是 线程池改为10 个就能打印出来 10000个
或者 bufferSize 设为 10000 就能打印出来 10000个
猜测 bufferSize 设为 2048 ,消费者设置为10个, 线程池设置为5 时 ,打印出来只有2048 个
到底为什么, disruptor 中 bufferSize 和线程数量 线程池数量的关系是什么?
public class Main { // 01:46:38 // https://www.bilibili.com/video/BV1xd4y1g7iU/?spm_id_from=333.1073.top_right_bar_window_history.content.click&vd_source=ff8b7f852278821525f11666b36f180a
public static void main(String[] args) throws InterruptedException {
RingBuffer<Order> ringBuffer = RingBuffer.create(
ProducerType.MULTI,
new EventFactory<Order>() {
@Override
public Order newInstance() {
return new Order();
}
},
1024,
new BlockingWaitStrategy()
);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
Consumer[] consumers = new Consumer[10];
for(int i = 0 ; i < consumers.length; i ++){
consumers[i] = new Consumer("C"+i); // 实例化每个消费者 并赋ID
}
WorkerPool<Order> orderWorkerPool = new WorkerPool<>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers
);
ringBuffer.addGatingSequences(orderWorkerPool.getWorkerSequences());
orderWorkerPool.start(Executors.newFixedThreadPool(10)); // 启动线程池
CyclicBarrier latch = new CyclicBarrier(100); // 控制并发
for (int i =0 ; i < 100; i ++){
Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) { // 并发
producer.sendData(UUID.randomUUID().toString());
}
}
}
).start();
}
System.out.println("---------------线程创建完毕 , 开始生产数据---------------------");
Thread.sleep(10000);
System.out.println("任务总是" + consumers[2].getCount());
}
}
public class Consumer implements WorkHandler<Order> {
private String cosumerId;
private static AtomicInteger count = new AtomicInteger(0);
public Consumer(String cosumerId) {
this.cosumerId = cosumerId;
}
public int getCount() {
return count.get();
}
@Override
public void onEvent(Order order) throws Exception {
System.out.println("当前消费者:" + this.cosumerId + "消息id: " + order.getId());
count.incrementAndGet();
}
}
```java
public class Producer {
private RingBuffer<Order> ringBuffer;
public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String uuid) {
long sequence = ringBuffer.next();
try {
Order order = ringBuffer.get(sequence);
order.setId(uuid);
}finally {
ringBuffer.publish(sequence);
}
}
}
```java
@Data
public class Order {
private String id;
private String name;
private double price;
}
public class EventExceptionHandler implements ExceptionHandler<Order> {
@Override
public void handleEventException(Throwable ex, long sequence, Order order) {
System.out.println("异常sequence:" + sequence + "order:" + order);
}
@Override
public void handleOnStartException(Throwable ex) {
}
@Override
public void handleOnShutdownException(Throwable ex) {
}
}
disruptor使用ringBuffer环形队列,bufferSize存放就是的消息数量,线程池大小就是能消费消息的能力,队列满了再发送消息要么覆盖要么丢弃,所以打印少。线程池大能及时消费更多消息,或者队列长度大能存下更多消息以慢慢消费,所以打印多。