300 行代码实现一个消息队列
在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。
这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel 很适合在不同协程之间传递消息,于是使用 channel 封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。
消息定义
消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:
type Msg struct {
ID uint64
Data any
}消息 ID 用一个 uint64 类型的自增序列足矣。
队列定义
有了消息,接下来就可以定义队列了。
最简单的队列用一个 slice 用于保存消息,然再加上 Enqueue 和 Dequeue 即可:
type Queue struct {
msgs []*Msg
}
func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock 来加锁,可行,但可以利用 chan 来实现无锁队列,具体原理是:新增 in 和 out 两个 chan,然后在同一个协程里将 in 接收到的消息放入 msgs,用 out 来发送出队消息。
Queue 的定义如下:
