半年多以前写过一篇《300 行代码实现一个消息队列》,实现了一个最简单的消息队列,能够把消息事件放到异步去处理。但随着业务的发展,简单的异步处理已经不能满足需求了:一条消息需要被多个不同的消费者消费。

按照原先的消息队列设计,要实现这个需求,要么是多注册几个队列,生产者往多个队列发消息;要么是在消费者那多开几个协程去分别处理。但这两种方案都高耦合,很不优雅。

优雅的方案应该是引入一个负责转发消息的中间层,生产者只需要将消息发给这个中间层,再由中间层转发给不同的队列(消费者),这个中间层我们称为 Exchange

队列 Queue 调整

在开始介绍 Exchange 之前,我们需要对 Queue 进行调整。

先前我们定义了一个 type Q struct,它的作用是将 QueueComsumerFunc、并发数和消息序号封装在一起,现在我们可以将消费者函数和消费者并发数直接作为 Queue 的属性,并将消息序号交由 Exchange 管理。因此 Q 没有存在的必要了,同时 Queue 修改为:

type Queue struct {
    wg sync.WaitGroup

    in  chan *Msg
    out chan *Msg

    msgs []*Msg

    // 消费者函数
    consumer ConsumerFunc
    // 消费者并发数
    concurrency int
}

// 同时更新实例化函数
func NewQueue(inBufSize, outBufSize, concurrency int, consumer ConsumerFunc) *Queue {
    if inBufSize <= 0 {
        inBufSize = 1
    }
    if outBufSize <= 0 {
        outBufSize = 1
    }

    return &Queue{
        in:          make(chan *Msg, inBufSize),
        out:         make(chan *Msg, outBufSize),
        msgs:        make([]*Msg, 0, 1024),
        consumer:    consumer,
        concurrency: concurrency,
    }
}

然后再新加两个方法,以便做消费者调度:

func (q *Queue) GetConsumer() ConsumerFunc {
    return q.consumer
}

func (q *Queue) GetConcurrency() int {
    return q.concurrency
}

至此,Queue 的改造就完成了。

Exchange

Exchange 应该实现以下接口:

type ExchangeInterface interface {
    Publish(any) error   // 发布消息
    Bind(*Queue)         // 注册队列
    GetQueues() []*Queue // 获取全部队列(供 MQ 调度)
    Start()              // 启动 Exchange 的消息转发
    Close()              // 关闭 Exchange
}

具体实现如下:

import (
    "sync"
)

type Exchange struct {
    mu sync.Mutex

    // 绑定的队列列表
    queues []*Queue
    // 当前消息序号
    seq uint64
}

func NewExchange() Exchange {
    return &Exchange{}
}

func (e *Exchange) Publish(data any) error {
    e.mu.Lock()
    defer e.mu.Unlock()

    if len(e.queues) == 0 {
        return nil // 没有绑定的队列,直接丢弃消息
    }

    msg := &Msg{
        ID:   e.seq,
        Data: data,
    }
    e.seq++

    for _, q := range e.queues {
        cloned := *msg
        q.Enqueue(&cloned) // TODO: 可按需增加错误处理逻辑,例如记录失败的消息等
    }

    return nil
}

func (e *Exchange) Bind(q *Queue) {
    e.mu.Lock()
    defer e.mu.Unlock()

    e.queues = append(e.queues, q)
}

func (e *Exchange) Type() ExchangeType {
    return Fanout
}

func (e *Exchange) GetQueues() []*Queue {
    return e.queues
}

func (e *Exchange) Start() {
    for _, q := range e.queues {
        q.Start()
    }
}

func (e *Exchange) Close() {
    for _, q := range e.queues {
        q.Close()
    }
}

MQ 调整

MQ 最主要的调整就是 Q 废弃后需要将直接调度 queues 改为 exchanges

 type MQ struct {
     wg sync.WaitGroup
     mu sync.Mutex
 
     running bool
 
-    queues map[string]*Q
+    exchanges map[string]*Exchange
 
     log contract.Logger
 }

 func NewMQ(log contract.Logger) *MQ {
     return &MQ{
-        queues: make(map[string]*Q),
-        log:    log,
+        exchanges: make(map[string]Exchange),
+        log:       log,
     }
 }

然后修改注册队列的方法:

// 注册一个队列,自动创建对应的 Exchange
func (m *MQ) RegisterQueue(name string, typ ExchangeType, consumer ConsumerFunc, concurrency int) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    if m.running {
        return errors.New("mq: cannot register queue while MQ is running")
    }

    if _, exists := m.exchanges[name]; !exists {
        m.exchanges[name] = NewExchange()
    }

    m.exchanges[name].Bind(NewQueue(concurrency, concurrency, concurrency, consumer))
    return nil
}

发布消息的方法也需要移除消息序号的逻辑,同时调整为通过 Exchange 去发布消息:

func (m *MQ) Publish(queueName string, data any) error {
    m.mu.Lock()
    e, exists := m.exchanges[queueName]
    m.mu.Unlock()

    if !exists {
        return errors.New("mq: queue not found")
    }

    return e.Publish(data)
}

相应的,启动消息队列和消费调度的方法也需要调整:

func (m *MQ) Start(ctx context.Context) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    if m.running {
        return errors.New("mq: MQ is already running")
    }

    m.running = true

    for exchangeName, exchange := range m.exchanges {
        exchange.Start()
        for idx, queue := range exchange.GetQueues() {
            for workerID := 0; workerID < queue.GetConcurrency(); workerID++ {
                m.wg.Add(1)
                go m.consume(ctx, exchangeName, workerID, queue)
            }
            m.log.Infof("Started Exchange[%s].queue[%d] with concurrency %d", exchangeName, idx, queue.GetConcurrency())
        }
    }
    return nil
}

func (m *MQ) consume(ctx context.Context, queueName string, workerID int, q *Queue) {
    defer m.wg.Done()

    for {
        defer func() {
            if r := recover(); r != nil {
                m.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
            }
        }()

        select {
        case <-ctx.Done():
            m.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
            q.Close()

        case msg, ok := <-q.Dequeue():
            if !ok {
                // 队列已关闭,退出
                m.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
                return
            }
            if err := q.GetConsumer()(ctx, msg); err != nil {
                // 处理消息失败,记录日志或进行其他处理
                m.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
            }
        }
    }
}

最后将停止 MQ 的方法也做相应调整:

func (m *MQ) Stop() {
    m.mu.Lock()

    if !m.running {
        m.mu.Unlock()
        return
    }

    m.running = false

    for _, e := range m.exchanges {
        e.Close()
    }
    m.mu.Unlock()

    m.wg.Wait()
}

至此,我们的消息队列支持同一条消息给多个不同的消费者消费了。没错,细心的你可能发现了,现在这个队列跟 RabbitMQ 的 FanoutExchange 非常像😉

标签: go, MQ

赞赏

添加新评论