在使用Apache RocketMQ进行消息消费时,通常会用到两种消息监听器:MessageListenerOrderly
和MessageListenerConcurrently
。这两种监听器在消息消费的处理方式上有显著的不同,理解它们的区别对于选择合适的消费模式至关重要。
1. 概述
1.1 MessageListenerOrderly
MessageListenerOrderly
用于顺序消费消息,即确保同一消息队列中的消息按照顺序消费。在某些对消息顺序要求较高的场景中,这种方式尤为重要。
1.2 MessageListenerConcurrently
MessageListenerConcurrently
用于并发消费消息,即允许多个线程同时消费不同消息队列中的消息。它适用于对消息顺序要求不高,但追求高并发、高吞吐量的场景。
2. 实现原理
2.1 MessageListenerOrderly
MessageListenerOrderly
保证顺序消费的核心机制是:每个消息队列对应一个消费线程,这个线程会顺序处理该队列中的消息,处理完一个再处理下一个,确保消息的有序性。
代码示例:
public class OrderlyConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderlyTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Consume message: %s%n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Orderly Consumer Started.%n");
}
}
在上述代码中,OrderlyConsumer
注册了一个MessageListenerOrderly
,消费消息时会顺序处理同一个队列中的消息。
顺序消费机制详解
消息队列的概念:
每个Topic下有多个队列(Queue),消息被分布到不同的队列中。例如,Topic A有4个队列,分别是Queue 0, Queue 1, Queue 2, Queue 3。消息生产者发送消息时,这些消息会被分布到这4个队列中。
消费线程的分配:
当消费者订阅了某个Topic,并使用
MessageListenerOrderly
进行消费时,RocketMQ会为每个队列分配一个独立的线程来处理该队列中的消息。
顺序处理的实现:
每个线程只处理分配给自己的队列中的消息,按照消息的存储顺序依次消费。由于一个线程只处理一个队列中的消息,因此能够保证该队列中消息的顺序性。
顺序消费的具体执行流程
消息投递:
生产者将消息发送到Topic,消息被分布到不同的队列中。
线程分配:
消费者启动后,RocketMQ为每个队列分配一个消费线程。
顺序消费:
每个消费线程按顺序依次处理队列中的消息,确保同一队列中的消息按顺序被消费。
这种设计在保证消息顺序的前提下,最大化地利用多线程的并发处理能力,提高消费效率。
2.2 MessageListenerConcurrently
MessageListenerConcurrently
的设计目标是提高消息消费的并发性。RocketMQ内部会为每个消息队列分配一个消费线程池,多个线程可以并发消费不同队列中的消息。
代码示例:
public class ConcurrentlyConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrently_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("ConcurrentlyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Consume message: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Concurrently Consumer Started.%n");
}
}
在上述代码中,ConcurrentlyConsumer
注册了一个MessageListenerConcurrently
,多个线程可以同时消费不同队列中的消息。
并发消费机制详解
并发性和高吞吐量:
MessageListenerConcurrently
的主要目的是提高消息消费的并发性,从而提高系统的吞吐量。这意味着消费者能够同时处理多个消息,减少处理延迟,提高消息处理的速度。
消费线程池:
与
MessageListenerOrderly
不同,MessageListenerConcurrently
为每个消息队列分配了一个消费线程池。线程池中包含多个线程,这些线程可以同时处理队列中的消息。
多线程并发消费:
每个消息队列都有一个独立的消费线程池,线程池中的多个线程可以并发地处理来自同一队列的消息,从而提升整体消费效率。
并发消费的具体执行流程
消息投递:
生产者将消息发送到Topic,消息被分布到不同的队列中。
线程池分配:
消费者启动后,RocketMQ为每个消息队列分配一个消费线程池。
并发消费:
线程池中的多个线程并发处理队列中的消息,不同队列中的消息可以同时被多个线程处理。
线程池配置示例
在实际使用中,可以通过配置文件或代码配置来调整线程池的大小,以达到最佳的性能效果。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrently_consumer_group");
consumer.setConsumeThreadMin(10); // 最小线程数
consumer.setConsumeThreadMax(20); // 最大线程数
通过这种配置,可以控制线程池的大小,使得消息消费在系统负载和资源利用率之间达到平衡。
3. 代码层面深度剖析
3.1 顺序消费的实现细节
在DefaultMQPushConsumer
中,顺序消费的实现依赖于ConsumeMessageOrderlyService
类:
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final MessageListenerOrderly messageListenerOrderly;
@Override
public void start() {
this.consumeExecutor = Executors.newFixedThreadPool(this.defaultMQPushConsumerImpl.getConsumerGroup().getConsumeThreadMax());
}
@Override
public void submitConsumeRequest(...) {
this.consumeExecutor.submit(new ConsumeRequest());
}
class ConsumeRequest implements Runnable {
@Override
public void run() {
// Lock the message queue and ensure the orderly consumption
// ...
messageListenerOrderly.consumeMessage(msgs, context);
}
}
}
在ConsumeMessageOrderlyService
中,每个消息队列都有一个消费线程池,线程池中的线程通过对消息队列加锁来保证顺序消费。ConsumeRequest
类实现了Runnable
接口,线程池中的线程会调用其run
方法,处理消息。
3.2 并发消费的实现细节
在DefaultMQPushConsumer
中,并发消费的实现依赖于ConsumeMessageConcurrentlyService
类:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final MessageListenerConcurrently messageListenerConcurrently;
@Override
public void start() {
this.consumeExecutor = Executors.newFixedThreadPool(this.defaultMQPushConsumerImpl.getConsumerGroup().getConsumeThreadMax());
}
@Override
public void submitConsumeRequest(...) {
this.consumeExecutor.submit(new ConsumeRequest());
}
class ConsumeRequest implements Runnable {
@Override
public void run() {
// Consume message concurrently
messageListenerConcurrently.consumeMessage(msgs, context);
}
}
}
在ConsumeMessageConcurrentlyService
中,消息队列的并发消费通过一个消费线程池实现。线程池中的每个线程会调用ConsumeRequest
类的run
方法,处理消息。
4. 总结
MessageListenerOrderly
和MessageListenerConcurrently
分别适用于不同的业务场景:
MessageListenerOrderly
适用于对消息顺序有严格要求的场景,通过对消息队列加锁和单线程消费确保消息顺序。
MessageListenerConcurrently
适用于追求高并发、高吞吐量的场景,通过消费线程池并发处理消息,提升消费效率。
选择合适的消息监听器需要根据具体业务需求进行权衡,以达到最佳的性能和功能效果。