Administrator
发布于 2024-07-17 / 205 阅读
3
0

RocketMQ消息消费:MessageListenerOrderly和MessageListenerConcurrently的区别

#MQ

在使用Apache RocketMQ进行消息消费时,通常会用到两种消息监听器:MessageListenerOrderlyMessageListenerConcurrently。这两种监听器在消息消费的处理方式上有显著的不同,理解它们的区别对于选择合适的消费模式至关重要。

1. 概述

1.1 MessageListenerOrderly

MessageListenerOrderly用于顺序消费消息,即确保同一消息队列中的消息按照顺序消费。在某些对消息顺序要求较高的场景中,这种方式尤为重要。

1.2 MessageListenerConcurrently

MessageListenerConcurrently用于并发消费消息,即允许多个线程同时消费不同消息队列中的消息。它适用于对消息顺序要求不高,但追求高并发、高吞吐量的场景。

2. 实现原理

2.1 MessageListenerOrderly

MessageListenerOrderly保证顺序消费的核心机制是:每个消息队列对应一个消费线程,这个线程会顺序处理该队列中的消息,处理完一个再处理下一个,确保消息的有序性。

代码示例:

public class OrderlyConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("OrderlyTopic", "*");
​
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Consume message: %s%n", new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
​
        consumer.start();
        System.out.printf("Orderly Consumer Started.%n");
    }
}

在上述代码中,OrderlyConsumer注册了一个MessageListenerOrderly,消费消息时会顺序处理同一个队列中的消息。

顺序消费机制详解

  1. 消息队列的概念

    • 每个Topic下有多个队列(Queue),消息被分布到不同的队列中。例如,Topic A有4个队列,分别是Queue 0, Queue 1, Queue 2, Queue 3。消息生产者发送消息时,这些消息会被分布到这4个队列中。

  2. 消费线程的分配

    • 当消费者订阅了某个Topic,并使用MessageListenerOrderly进行消费时,RocketMQ会为每个队列分配一个独立的线程来处理该队列中的消息。

  3. 顺序处理的实现

    • 每个线程只处理分配给自己的队列中的消息,按照消息的存储顺序依次消费。由于一个线程只处理一个队列中的消息,因此能够保证该队列中消息的顺序性。

顺序消费的具体执行流程

  1. 消息投递

    • 生产者将消息发送到Topic,消息被分布到不同的队列中。

  2. 线程分配

    • 消费者启动后,RocketMQ为每个队列分配一个消费线程。

  3. 顺序消费

    • 每个消费线程按顺序依次处理队列中的消息,确保同一队列中的消息按顺序被消费。

这种设计在保证消息顺序的前提下,最大化地利用多线程的并发处理能力,提高消费效率。

2.2 MessageListenerConcurrently

MessageListenerConcurrently的设计目标是提高消息消费的并发性。RocketMQ内部会为每个消息队列分配一个消费线程池,多个线程可以并发消费不同队列中的消息。

代码示例:

public class ConcurrentlyConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrently_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("ConcurrentlyTopic", "*");
​
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Consume message: %s%n", new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
​
        consumer.start();
        System.out.printf("Concurrently Consumer Started.%n");
    }
}

在上述代码中,ConcurrentlyConsumer注册了一个MessageListenerConcurrently,多个线程可以同时消费不同队列中的消息。

并发消费机制详解

  1. 并发性和高吞吐量

    • MessageListenerConcurrently的主要目的是提高消息消费的并发性,从而提高系统的吞吐量。这意味着消费者能够同时处理多个消息,减少处理延迟,提高消息处理的速度。

  2. 消费线程池

    • MessageListenerOrderly不同,MessageListenerConcurrently为每个消息队列分配了一个消费线程池。线程池中包含多个线程,这些线程可以同时处理队列中的消息。

  3. 多线程并发消费

    • 每个消息队列都有一个独立的消费线程池,线程池中的多个线程可以并发地处理来自同一队列的消息,从而提升整体消费效率。

并发消费的具体执行流程

  1. 消息投递

    • 生产者将消息发送到Topic,消息被分布到不同的队列中。

  2. 线程池分配

    • 消费者启动后,RocketMQ为每个消息队列分配一个消费线程池。

  3. 并发消费

    • 线程池中的多个线程并发处理队列中的消息,不同队列中的消息可以同时被多个线程处理。

线程池配置示例

在实际使用中,可以通过配置文件或代码配置来调整线程池的大小,以达到最佳的性能效果。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrently_consumer_group");
consumer.setConsumeThreadMin(10); // 最小线程数
consumer.setConsumeThreadMax(20); // 最大线程数

通过这种配置,可以控制线程池的大小,使得消息消费在系统负载和资源利用率之间达到平衡。

3. 代码层面深度剖析

3.1 顺序消费的实现细节

DefaultMQPushConsumer中,顺序消费的实现依赖于ConsumeMessageOrderlyService类:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final MessageListenerOrderly messageListenerOrderly;
​
    @Override
    public void start() {
        this.consumeExecutor = Executors.newFixedThreadPool(this.defaultMQPushConsumerImpl.getConsumerGroup().getConsumeThreadMax());
    }
​
    @Override
    public void submitConsumeRequest(...) {
        this.consumeExecutor.submit(new ConsumeRequest());
    }
​
    class ConsumeRequest implements Runnable {
        @Override
        public void run() {
            // Lock the message queue and ensure the orderly consumption
            // ...
            messageListenerOrderly.consumeMessage(msgs, context);
        }
    }
}

ConsumeMessageOrderlyService中,每个消息队列都有一个消费线程池,线程池中的线程通过对消息队列加锁来保证顺序消费。ConsumeRequest类实现了Runnable接口,线程池中的线程会调用其run方法,处理消息。

3.2 并发消费的实现细节

DefaultMQPushConsumer中,并发消费的实现依赖于ConsumeMessageConcurrentlyService类:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final MessageListenerConcurrently messageListenerConcurrently;
​
    @Override
    public void start() {
        this.consumeExecutor = Executors.newFixedThreadPool(this.defaultMQPushConsumerImpl.getConsumerGroup().getConsumeThreadMax());
    }
​
    @Override
    public void submitConsumeRequest(...) {
        this.consumeExecutor.submit(new ConsumeRequest());
    }
​
    class ConsumeRequest implements Runnable {
        @Override
        public void run() {
            // Consume message concurrently
            messageListenerConcurrently.consumeMessage(msgs, context);
        }
    }
}

ConsumeMessageConcurrentlyService中,消息队列的并发消费通过一个消费线程池实现。线程池中的每个线程会调用ConsumeRequest类的run方法,处理消息。

4. 总结

MessageListenerOrderlyMessageListenerConcurrently分别适用于不同的业务场景:

  • MessageListenerOrderly适用于对消息顺序有严格要求的场景,通过对消息队列加锁和单线程消费确保消息顺序。

  • MessageListenerConcurrently适用于追求高并发、高吞吐量的场景,通过消费线程池并发处理消息,提升消费效率。

选择合适的消息监听器需要根据具体业务需求进行权衡,以达到最佳的性能和功能效果。


评论