标签 MQ 下的文章

半年多以前写过一篇《300 行代码实现一个消息队列》,实现了一个最简单的消息队列,能够把消息事件放到异步去处理。但随着业务的发展,简单的异步处理已经不能满足需求了:一条消息需要被多个不同的消费者消费。

按照原先的消息队列设计,要实现这个需求,要么是多注册几个队列,生产者往多个队列发消息;要么是在消费者那多开几个协程去分别处理。但这两种方案都高耦合,很不优雅。

优雅的方案应该是引入一个负责转发消息的中间层,生产者只需要将消息发给这个中间层,再由中间层转发给不同的队列(消费者),这个中间层我们称为 Exchange

- 阅读剩余部分 -

在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。

这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel 很适合在不同协程之间传递消息,于是使用 channel 封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。

消息定义

消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:

type Msg struct {
    ID   uint64
    Data any
}

消息 ID 用一个 uint64 类型的自增序列足矣。

队列定义

有了消息,接下来就可以定义队列了。

最简单的队列用一个 slice 用于保存消息,然再加上 EnqueueDequeue 即可:

type Queue struct {
    msgs []*Msg
}

func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg

但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock 来加锁,可行,但可以利用 chan 来实现无锁队列,具体原理是:新增 inout 两个 chan,然后在同一个协程里将 in 接收到的消息放入 msgs,用 out 来发送出队消息。

Queue 的定义如下:

- 阅读剩余部分 -