RabbitMQ 本身不保证严格顺序,需要在应用层实现。

支持顺序消息所面临的挑战:

  1. 多个消费者:并行消费会打乱顺序。

    单个消费会降低系统吞吐量,可能出现数据堆积问题。

    在分布式环境下,需要谨慎处理并发问题

  2. 消息重试:失败的消息重新入队会破坏顺序。

    但是不重试数据丢失问题需要消费者自己保证。还有消息失败、节点故障等其他异常情况。

  3. 多个队列:消息分布在不同的队列中。

    如果指定到单个队列,可能出现数据倾斜等一系列问题和挑战。

实现方案

实现顺序消息的前提条件,需要满足的条件:

  • 每次只预取一条消息(prefetch = 1),获取通过其他方式保证每次消费一条

  • 避免自动重试破坏顺序

  • 根据业务逻辑将消息分组(不同队列)

各方案的特点和适用场景:

特性 1.单队列单消费 2.基于消息分组 3.Redis维护状态 4.消息处理器
实现复杂度 ⭐(非常简单) ⭐⭐(中等) ⭐⭐⭐(复杂) ⭐⭐⭐⭐(最复杂)
性能吞吐量 ⭐(最低) ⭐⭐(较低) ⭐⭐⭐(中等) ⭐⭐⭐⭐(较高)
资源消耗 ⭐(最低) ⭐⭐(较低) ⭐⭐⭐(需要Redis) ⭐⭐⭐(内存和CPU较高)
顺序保证强度 ⭐⭐⭐⭐⭐(绝对保证) ⭐⭐⭐(应用层保证) ⭐⭐⭐⭐(强保证) ⭐⭐⭐⭐(强保证)
扩展性 ⭐(无法扩展) ⭐⭐⭐(可按组扩展) ⭐⭐⭐⭐(弹性较好) ⭐⭐⭐⭐(弹性好)
可靠性/容错 ⭐⭐(单点故障) ⭐⭐⭐(中等) ⭐⭐⭐⭐(高) ⭐⭐⭐(依赖本地状态)
  • 低流量(< 100 msg/s):方案1最简单可靠

  • 中流量(100 - 1000 msg/s):方案2或方案3

  • 高流量(> 1000 msg/s):方案4或专门的消息队列

  • 必须绝对保证:方案1(牺牲性能)或方案3(复杂但可扩展)

  • 可以接受偶尔乱序:方案2

  • 允许缓冲和延迟:方案4

单队列单消费

  1. 实现极其简单:只需配置一个消费者,无需复杂逻辑。

  2. 严格保证全局顺序:因为只有一个消费者,消息绝对按入队顺序处理。

  3. 零外部依赖:不依赖Redis等外部组件,部署简单。

  4. 无状态:服务本身是无状态的,易于理解和管理。

缺点:

  1. 性能瓶颈严重:单消费者模型无法利用多核CPU,吞吐量极低。
  2. 无扩展性:无法通过增加消费者来提高处理能力,是系统的单点。
  3. 容错性差:如果该消费者所在节点故障,整个消息处理将完全中断。
  4. 资源利用率低:在消息处理包含I/O等待(如DB操作)时,CPU资源被浪费。

适用场景:

  • 消息量非常小(每秒几条到几十条)

  • 顺序性是最高优先级,宁可慢也不能乱

  • 开发和维护资源有限

  • 适合原型开发或非核心业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class RabbitConfig {

@Bean
public Queue orderQueue() {
return new Queue("order.queue", true, false, false);
}

@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}

@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.key");
}
}

@Component
public class OrderMessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrderMessage(String orderId, List<String> messages) {
for (int i = 0; i < messages.size(); i++) {
OrderMessage orderMessage = new OrderMessage(orderId, i, messages.get(i));
rabbitTemplate.convertAndSend("order.exchange", "order.key", orderMessage);
}
}
}

@Component
public class OrderMessageConsumer {

@RabbitListener(queues = "order.queue", concurrency = "1")
public void processOrderMessage(OrderMessage message) {
try {
// 处理消息
System.out.println("处理顺序消息: " + message.getSequence() + ", 内容: " + message.getContent());
// 模拟业务处理
Thread.sleep(100);
} catch (Exception e) {
// 记录日志,但不重新入队
System.err.println("消息处理失败: " + e.getMessage());
}
}
}

基于消息分组

保证同一批次顺序的消息分为一组。

  1. 较好的扩展性:不同消息组可以并行处理,提高了系统整体吞吐量。

  2. 资源利用率提升:可以利用多个消费者实例。

  3. 实现相对简单:逻辑清晰,易于理解和调试。

  4. 灵活性:可以按业务维度(如订单ID、用户ID)进行分组。

缺点:

  1. 顺序保证有漏洞

    • 依赖消息按序到达消费者(在网络分区或重试时可能不保证)
    • 状态维护在内存中,应用重启会导致状态丢失
  2. 乱序消息处理复杂:需要对乱序消息进行缓冲或拒绝,可能造成消息积压。

  3. 内存状态管理ConcurrentHashMap可能内存泄漏(如果某些组永远没有最后一条消息)。

  4. 组内串行:同一个消息组仍然只能串行处理,如果某个大组消息很多,会成为瓶颈。

适用场景:

  • 消息分组明确且组间无依赖的业务

  • 消息量中等,对完全的顺序性要求不是极端严格

  • 可以接受在应用重启时丢失部分顺序上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Data
public class SequentialMessage {
private String groupId; // 消息组ID
private int sequence; // 序列号
private String content; // 消息内容
private int total; // 总消息数
}

// 生产者发送保证:按消息分组到指定组(队列)
@Component
public class SequentialMessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendSequentialMessages(String groupId, List<String> messages) {
for (int i = 0; i < messages.size(); i++) {
SequentialMessage message = new SequentialMessage();
message.setGroupId(groupId);
message.setSequence(i);
message.setContent(messages.get(i));
message.setTotal(messages.size());

// 使用groupId作为routing key,确保同一组消息进入同一队列
rabbitTemplate.convertAndSend("sequential.exchange", groupId, message);
}
}
}

// 消费者保证:逻辑中维护顺序关系
@Component
public class SequentialMessageConsumer {
// 维护组内上一条消息的序列号,保证顺序消费
private Map<String, Integer> lastProcessedSequence = new ConcurrentHashMap<>();

@RabbitListener(queues = "sequential.queue")
public void processSequentialMessage(SequentialMessage message) {
String groupId = message.getGroupId();
int currentSequence = message.getSequence();

// 检查消息顺序(上一条序列号)
int lastSequence = lastProcessedSequence.getOrDefault(groupId, -1);

if (currentSequence == lastSequence + 1) {
// 按顺序处理消息
try {
processMessage(message);
lastProcessedSequence.put(groupId, currentSequence);

// 如果是最后一条消息,清理状态
if (currentSequence == message.getTotal() - 1) {
lastProcessedSequence.remove(groupId);
}

} catch (Exception e) {
System.err.println("消息处理失败: " + e.getMessage());
// 可以考虑将失败的消息转移到死信队列
}
} else {
// 顺序不对,重新入队或记录日志
System.err.println("消息顺序错误,期望: " + (lastSequence + 1) + ", 实际: " + currentSequence);
// 这里可以重新发布消息,或者使用延迟重试
}
}

private void processMessage(SequentialMessage message) {
System.out.println("处理组 " + message.getGroupId() + " 的第 " + message.getSequence() + " 条消息: " + message.getContent());
}
}

使用 Redis 维护

  1. 强顺序保证:通过分布式锁和持久化状态,即使在应用重启后也能保持顺序。

  2. 更好的扩展性:多个消费者实例可以协作处理不同消息组。

  3. 容错性高:单个消费者故障不会影响其他组的处理。

  4. 状态持久化:Redis中的数据在应用重启后仍然存在。

缺点:

  1. 外部依赖:强依赖Redis的可用性和性能,系统架构更复杂。
  2. 实现复杂度高:需要处理分布式锁、状态同步等复杂问题。
  3. 网络开销:每次处理消息都需要与Redis交互,增加了延迟。
  4. Redis成为瓶颈:如果消息组非常多,Redis可能成为性能和单点故障的瓶颈。
  5. 成本更高:需要维护Redis集群。

适用场景:

  • 对顺序性要求很高,且需要应用多实例部署

  • 消息组数量在合理范围内(不是海量级别)

  • 已有Redis基础设施或愿意承担其运维成本

  • 需要保证在应用重启后仍能正确处理顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Component
public class RedisSequentialConsumer {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String SEQUENCE_KEY_PREFIX = "msg:sequence:";

@RabbitListener(queues = "redis.sequential.queue")
public void processWithRedis(SequentialMessage message) {
String lockKey = SEQUENCE_KEY_PREFIX + message.getGroupId() + ":lock";
String sequenceKey = SEQUENCE_KEY_PREFIX + message.getGroupId();

// 使用分布式锁确保同一时间只有一个消费者处理同一组消息
boolean locked = tryLock(lockKey, message.getGroupId(), 30);

if (!locked) {
// 获取锁失败,重新入队
throw new AmqpRejectAndDontRequeueException("无法获取锁,稍后重试");
}

try {
Integer lastSequence = (Integer) redisTemplate.opsForValue().get(sequenceKey);
lastSequence = lastSequence == null ? -1 : lastSequence;

if (message.getSequence() == lastSequence + 1) {
processMessage(message);
redisTemplate.opsForValue().set(sequenceKey, message.getSequence());

if (message.getSequence() == message.getTotal() - 1) {
// 清理状态
redisTemplate.delete(sequenceKey);
redisTemplate.delete(lockKey);
}
} else {
System.err.println("顺序错误,期望: " + (lastSequence + 1) + ", 实际: " + message.getSequence());
}
} finally {
if (locked) {
releaseLock(lockKey, message.getGroupId());
}
}
}

private boolean tryLock(String key, String value, long expireSeconds) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

private void releaseLock(String key, String value) {
Object currentValue = redisTemplate.opsForValue().get(key);
if (value.equals(currentValue)) {
redisTemplate.delete(key);
}
}
}

消息处理器

  1. 高吞吐量:通过缓冲和异步处理,提高了消息处理效率。

  2. 智能缓冲:能够处理暂时的乱序,等待前置消息到达。

  3. 弹性扩展:可以很好地处理消息组的动态变化。

  4. 减少网络往返:在内存中缓冲处理,比方案3的Redis交互更快。

缺点:

  1. 实现最复杂:需要管理内存缓冲区、处理线程、状态同步等。
  2. 内存消耗大:在内存中缓冲消息,可能占用大量堆内存。
  3. 状态易丢失:应用重启会导致内存中所有缓冲消息和状态丢失。
  4. 调试困难:异步处理逻辑复杂,出现问题难以跟踪和调试。
  5. 内存泄漏风险:需要精心设计缓冲区的清理机制。

适用场景:

  • 消息量大且对性能要求高

  • 可以接受应用重启时丢失顺序上下文

  • 有足够的技术能力实现和维护复杂处理逻辑

  • 消息处理允许一定的延迟(由于缓冲机制)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Component
public class AdvancedSequentialProcessor {

private Map<String, BlockingQueue<SequentialMessage>> messageBuffers = new ConcurrentHashMap<>();
private Map<String, Boolean> processingFlags = new ConcurrentHashMap<>();
private ExecutorService processorExecutor = Executors.newCachedThreadPool();

@RabbitListener(queues = "advanced.sequential.queue")
public void receiveMessage(SequentialMessage message) {
String groupId = message.getGroupId();

// 初始化消息缓冲区
messageBuffers.putIfAbsent(groupId, new LinkedBlockingQueue<>());
BlockingQueue<SequentialMessage> buffer = messageBuffers.get(groupId);

try {
buffer.put(message);

// 如果该组消息没有正在处理,启动处理器
if (processingFlags.putIfAbsent(groupId, true) == null) {
processorExecutor.submit(() -> processMessageGroup(groupId));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void processMessageGroup(String groupId) {
BlockingQueue<SequentialMessage> buffer = messageBuffers.get(groupId);
int expectedSequence = 0;

try {
while (true) {
// 查看队首元素但不移除
SequentialMessage message = buffer.peek();

if (message == null) {
// 缓冲区为空,停止处理
break;
}

if (message.getSequence() == expectedSequence) {
// 移除并处理消息
buffer.poll();
processSingleMessage(message);
expectedSequence++;

// 如果是最后一条消息,结束处理
if (message.getSequence() == message.getTotal() - 1) {
break;
}
} else if (message.getSequence() > expectedSequence) {
// 等待前面的消息
Thread.sleep(100);
} else {
// 重复消息,直接丢弃
buffer.poll();
}
}
} catch (Exception e) {
System.err.println("处理消息组 " + groupId + " 时发生错误: " + e.getMessage());
} finally {
// 清理状态
processingFlags.remove(groupId);
if (buffer.isEmpty()) {
messageBuffers.remove(groupId);
}
}
}

private void processSingleMessage(SequentialMessage message) {
System.out.println("高级处理器 - 组 " + message.getGroupId() +
" 序列 " + message.getSequence() + ": " +
message.getContent());
}
}