300 行代码实现一个消息队列
在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。
这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel
很适合在不同协程之间传递消息,于是使用 channel
封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。
消息定义
消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:
type Msg struct {
ID uint64
Data any
}
消息 ID 用一个 uint64
类型的自增序列足矣。
队列定义
有了消息,接下来就可以定义队列了。
最简单的队列用一个 slice
用于保存消息,然再加上 Enqueue
和 Dequeue
即可:
type Queue struct {
msgs []*Msg
}
func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg
但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock
来加锁,可行,但可以利用 chan
来实现无锁队列,具体原理是:新增 in
和 out
两个 chan
,然后在同一个协程里将 in
接收到的消息放入 msgs
,用 out
来发送出队消息。
Queue
的定义如下: