在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。

这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel 很适合在不同协程之间传递消息,于是使用 channel 封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。

消息定义

消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:

type Msg struct {
    ID   uint64
    Data any
}

消息 ID 用一个 uint64 类型的自增序列足矣。

队列定义

有了消息,接下来就可以定义队列了。

最简单的队列用一个 slice 用于保存消息,然再加上 EnqueueDequeue 即可:

type Queue struct {
    msgs []*Msg
}

func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg

但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock 来加锁,可行,但可以利用 chan 来实现无锁队列,具体原理是:新增 inout 两个 chan,然后在同一个协程里将 in 接收到的消息放入 msgs,用 out 来发送出队消息。

Queue 的定义如下:

type Queue struct {
    wg sync.WaitGroup

    in  chan *Msg
    out chan *Msg

    msgs []*Msg
}

// 消息入队
func (q *Queue) Enqueue(msg *Msg) error {
    if q.in == nil {
        return errors.New("queue: queue is closed")
    }
    q.in <- msg
    return nil
}

// 消息出队
func (q *Queue) Dequeue() <-chan *Msg {
    return q.out
}

// 启动队列处理
func (q *Queue) Start() {
    q.wg.Add(1)
    go q.run()
}

// 等待队列消费
func (q *Queue) Wait() {
    q.wg.Wait()
}

// 关闭队列(只能出队,不能入队)
func (q *Queue) Close() {
    if q.in != nil {
        close(q.in)
    }
}

// 消息转发
func (q *Queue) run() {
    defer q.wg.Done()

    var nextMsg *Msg
    var out chan *Msg

    for {
        if len(q.msgs) > 0 {
            nextMsg = q.msgs[0]
            out = q.out
        } else {
            nextMsg = nil
            out = nil
        }

        select {
        case msg, ok := <-q.in:
            if !ok {
                // channel is closed
                q.in = nil
            } else {
                q.msgs = append(q.msgs, msg)
            }
        case out <- nextMsg:
            q.msgs = q.msgs[1:]
        }

        if q.in == nil && len(q.msgs) == 0 {
            close(q.out)
            return
        }
    }
}

然后再增加一个 New 函数:

func NewQueue(inBufSize, outBufSize int) *Queue {
    if inBufSize <= 0 {
        inBufSize = 1
    }
    if outBufSize <= 0 {
        outBufSize = 1
    }

    return &Queue{
        in:   make(chan *Msg, inBufSize),
        out:  make(chan *Msg, outBufSize),
        msgs: make([]*Msg, 0, 1024),
    }
}

消息队列

有了队列,那我们就可以实现消息队列了。前面我们提到,这个消息队列要实现的两个功能:支持多队列支持多消费者。那也就是说一个 MQ 实例里面允许有多个 Queue 实例,即 map[string]*Queue,然后每个队列是支持多个消费者的,即每个 Queue 启动之后需要开启多个消费者协程。

先定义消费者函数签名:

type ConsumerFunc func(ctx context.Context, msg *Msg) error

再把 QueueConsumerFunc 绑定到同一个结构体:

type Q struct {
    mu sync.Mutex

    // 消息队列
    queue *Queue
    // 消费者函数
    consumer ConsumerFunc
    // 并发数
    concurrency int
    // 当前消息序号
    seq uint64
}

然后就可以实现我们的 MQ 了:

type MQ struct {
    wg sync.WaitGroup
    mu sync.Mutex

    running bool

    queues map[string]*Q

    // 先前的文章有介绍过 contract.Logger,这里不影响代码阅读,不再赘述,
    // 感兴趣可以看上篇文章:《告别 Redis/MySQL:用一百行 Go 代码实现持久化 Set》,链接见文末
    log contract.Logger
}

func NewMQ(log contract.Logger) *MQ {
    return &MQ{
        queues: make(map[string]*Q),
        log:    log,
    }
}

// 注册一个队列
func (m *MQ) RegisterQueue(name string, consumer ConsumerFunc, concurrency int) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    if m.running {
        return errors.New("mq: cannot register queue while MQ is running")
    }

    // 避免重复注册
    if _, exists := m.queues[name]; exists {
        return errors.New("mq: queue already registered")
    }

    m.queues[name] = &Q{
        queue:       NewQueue(concurrency, concurrency),
        consumer:    consumer,
        concurrency: concurrency,
    }
    return nil
}

// 发布消息
func (m *MQ) Publish(queueName string, data any) error {
    m.mu.Lock()
    q, exists := m.queues[queueName]
    m.mu.Unlock()

    if !exists {
        return errors.New("mq: queue not found")
    }

    q.mu.Lock()
    defer q.mu.Unlock()
    msg := &Msg{
        ID:   q.seq,
        Data: data,
    }
    q.seq++

    return q.queue.Enqueue(msg)
}

// 启动 MQ 处理
func (s *MQ) Start(ctx context.Context) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.running {
        return errors.New("mq: MQ is already running")
    }

    s.running = true

    for name, q := range s.queues {
        q.queue.Start()
        for workerID := 0; workerID < q.concurrency; workerID++ {
            s.wg.Add(1)
            go s.consume(ctx, name, workerID, q)
        }
        s.log.Infof("Started MQ queue %s with concurrency %d", name, q.concurrency)
    }
    return nil
}

func (s *MQ) Stop() {
    s.mu.Lock()

    if !s.running {
        s.mu.Unlock()
        return
    }

    s.running = false

    for _, q := range s.queues {
        q.queue.Close()
    }
    s.mu.Unlock()

    s.wg.Wait()
}

func (s *MQ) consume(ctx context.Context, queueName string, workerID int, q *Q) {
    defer s.wg.Done()

    for {
        defer func() {
            if r := recover(); r != nil {
                s.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
            }
        }()

        select {
        case <-ctx.Done():
            s.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
            q.queue.Close()

        case msg, ok := <-q.queue.Dequeue():
            if !ok {
                // 队列已关闭,退出
                s.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
                return
            }
            if err := q.consumer(ctx, msg); err != nil {
                // 处理消息失败,记录日志或进行其他处理
                s.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
            }
        }
    }
}

至此,我们这个简易消息队列就实现了。使用方法也很简单:

func main() {
    logger := NewLogger("debug")
    mq := NewMQ(logger)

    ctx := context.TODO()

    consumer1 := func(ctx context.Context, msg *Msg) error {
        logger.Infof("Consumer1 processing message ID: %d, Data: %v", msg.ID, msg.Data)
        return
    }

    err := mq.RegisterQueue("queue1", consumer1, 2)
    if err != nil {
        logger.Errorf("Failed to register queue1: %v", err)
        return
    }

    err = mq.Start(ctx)
    if err != nil {
        logger.Errorf("Failed to start MQ: %v", err)
        return
    }

    // 然后生产者发布消息:
    err = mq.Publish("queue1", "message for queue1")
    if err != nil {
        logger.Errorf("Failed to push message to queue1: %v", err)
    }
}

扩展

这个简易消息队列目前其实存在一个问题,就是当进程意外退出的时候,队列可能没有被清空,有一定概率丢数据。因为我现在的场景是允许丢数据的,所以暂时没加上这个逻辑,不过其实也不麻烦,最简单的方式只需要在 Close() 的时候把 msgs 直接 dump 到磁盘,启动 Queue 的时候也先从磁盘加载先前的数据,可以参考上一篇文章的原理;当然对可靠性要求更高的也可以用上 WAL(Write-ahead logging),总之先跑起来,然后根据问题继续迭代


标签: go, MQ

添加新评论