从零实现一个消息队列:当一条消息需要被多个不同消费者消费
半年多以前写过一篇《300 行代码实现一个消息队列》,实现了一个最简单的消息队列,能够把消息事件放到异步去处理。但随着业务的发展,简单的异步处理已经不能满足需求了:一条消息需要被多个不同的消费者消费。
按照原先的消息队列设计,要实现这个需求,要么是多注册几个队列,生产者往多个队列发消息;要么是在消费者那多开几个协程去分别处理。但这两种方案都高耦合,很不优雅。
优雅的方案应该是引入一个负责转发消息的中间层,生产者只需要将消息发给这个中间层,再由中间层转发给不同的队列(消费者),这个中间层我们称为 Exchange
队列 Queue 调整
在开始介绍 Exchange 之前,我们需要对 Queue 进行调整。
先前我们定义了一个 type Q struct,它的作用是将 Queue、ComsumerFunc、并发数和消息序号封装在一起,现在我们可以将消费者函数和消费者并发数直接作为 Queue 的属性,并将消息序号交由 Exchange 管理。因此 Q 没有存在的必要了,同时 Queue 修改为:
type Queue struct {
wg sync.WaitGroup
in chan *Msg
out chan *Msg
msgs []*Msg
// 消费者函数
consumer ConsumerFunc
// 消费者并发数
concurrency int
}
// 同时更新实例化函数
func NewQueue(inBufSize, outBufSize, concurrency int, consumer ConsumerFunc) *Queue {
if inBufSize <= 0 {
inBufSize = 1
}
if outBufSize <= 0 {
outBufSize = 1
}
return &Queue{
in: make(chan *Msg, inBufSize),
out: make(chan *Msg, outBufSize),
msgs: make([]*Msg, 0, 1024),
consumer: consumer,
concurrency: concurrency,
}
}然后再新加两个方法,以便做消费者调度:
func (q *Queue) GetConsumer() ConsumerFunc {
return q.consumer
}
func (q *Queue) GetConcurrency() int {
return q.concurrency
}至此,Queue 的改造就完成了。
Exchange
Exchange 应该实现以下接口:
type ExchangeInterface interface {
Publish(any) error // 发布消息
Bind(*Queue) // 注册队列
GetQueues() []*Queue // 获取全部队列(供 MQ 调度)
Start() // 启动 Exchange 的消息转发
Close() // 关闭 Exchange
}具体实现如下:
import (
"sync"
)
type Exchange struct {
mu sync.Mutex
// 绑定的队列列表
queues []*Queue
// 当前消息序号
seq uint64
}
func NewExchange() Exchange {
return &Exchange{}
}
func (e *Exchange) Publish(data any) error {
e.mu.Lock()
defer e.mu.Unlock()
if len(e.queues) == 0 {
return nil // 没有绑定的队列,直接丢弃消息
}
msg := &Msg{
ID: e.seq,
Data: data,
}
e.seq++
for _, q := range e.queues {
cloned := *msg
q.Enqueue(&cloned) // TODO: 可按需增加错误处理逻辑,例如记录失败的消息等
}
return nil
}
func (e *Exchange) Bind(q *Queue) {
e.mu.Lock()
defer e.mu.Unlock()
e.queues = append(e.queues, q)
}
func (e *Exchange) Type() ExchangeType {
return Fanout
}
func (e *Exchange) GetQueues() []*Queue {
return e.queues
}
func (e *Exchange) Start() {
for _, q := range e.queues {
q.Start()
}
}
func (e *Exchange) Close() {
for _, q := range e.queues {
q.Close()
}
}MQ 调整
MQ 最主要的调整就是 Q 废弃后需要将直接调度 queues 改为 exchanges
type MQ struct {
wg sync.WaitGroup
mu sync.Mutex
running bool
- queues map[string]*Q
+ exchanges map[string]*Exchange
log contract.Logger
}
func NewMQ(log contract.Logger) *MQ {
return &MQ{
- queues: make(map[string]*Q),
- log: log,
+ exchanges: make(map[string]Exchange),
+ log: log,
}
}然后修改注册队列的方法:
// 注册一个队列,自动创建对应的 Exchange
func (m *MQ) RegisterQueue(name string, typ ExchangeType, consumer ConsumerFunc, concurrency int) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
return errors.New("mq: cannot register queue while MQ is running")
}
if _, exists := m.exchanges[name]; !exists {
m.exchanges[name] = NewExchange()
}
m.exchanges[name].Bind(NewQueue(concurrency, concurrency, concurrency, consumer))
return nil
}发布消息的方法也需要移除消息序号的逻辑,同时调整为通过 Exchange 去发布消息:
func (m *MQ) Publish(queueName string, data any) error {
m.mu.Lock()
e, exists := m.exchanges[queueName]
m.mu.Unlock()
if !exists {
return errors.New("mq: queue not found")
}
return e.Publish(data)
}相应的,启动消息队列和消费调度的方法也需要调整:
func (m *MQ) Start(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
return errors.New("mq: MQ is already running")
}
m.running = true
for exchangeName, exchange := range m.exchanges {
exchange.Start()
for idx, queue := range exchange.GetQueues() {
for workerID := 0; workerID < queue.GetConcurrency(); workerID++ {
m.wg.Add(1)
go m.consume(ctx, exchangeName, workerID, queue)
}
m.log.Infof("Started Exchange[%s].queue[%d] with concurrency %d", exchangeName, idx, queue.GetConcurrency())
}
}
return nil
}
func (m *MQ) consume(ctx context.Context, queueName string, workerID int, q *Queue) {
defer m.wg.Done()
for {
defer func() {
if r := recover(); r != nil {
m.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
}
}()
select {
case <-ctx.Done():
m.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
q.Close()
case msg, ok := <-q.Dequeue():
if !ok {
// 队列已关闭,退出
m.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
return
}
if err := q.GetConsumer()(ctx, msg); err != nil {
// 处理消息失败,记录日志或进行其他处理
m.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
}
}
}
}最后将停止 MQ 的方法也做相应调整:
func (m *MQ) Stop() {
m.mu.Lock()
if !m.running {
m.mu.Unlock()
return
}
m.running = false
for _, e := range m.exchanges {
e.Close()
}
m.mu.Unlock()
m.wg.Wait()
}至此,我们的消息队列支持同一条消息给多个不同的消费者消费了。没错,细心的你可能发现了,现在这个队列跟 RabbitMQ 的 FanoutExchange 非常像😉