标签 MQ 下的文章

在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。

这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel 很适合在不同协程之间传递消息,于是使用 channel 封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。

消息定义

消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:

type Msg struct {
    ID   uint64
    Data any
}

消息 ID 用一个 uint64 类型的自增序列足矣。

队列定义

有了消息,接下来就可以定义队列了。

最简单的队列用一个 slice 用于保存消息,然再加上 EnqueueDequeue 即可:

type Queue struct {
    msgs []*Msg
}

func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg

但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock 来加锁,可行,但可以利用 chan 来实现无锁队列,具体原理是:新增 inout 两个 chan,然后在同一个协程里将 in 接收到的消息放入 msgs,用 out 来发送出队消息。

Queue 的定义如下:

- 阅读剩余部分 -