300 行代码实现一个消息队列
在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。
这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel
很适合在不同协程之间传递消息,于是使用 channel
封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。
消息定义
消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:
type Msg struct {
ID uint64
Data any
}
消息 ID 用一个 uint64
类型的自增序列足矣。
队列定义
有了消息,接下来就可以定义队列了。
最简单的队列用一个 slice
用于保存消息,然再加上 Enqueue
和 Dequeue
即可:
type Queue struct {
msgs []*Msg
}
func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg
但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock
来加锁,可行,但可以利用 chan
来实现无锁队列,具体原理是:新增 in
和 out
两个 chan
,然后在同一个协程里将 in
接收到的消息放入 msgs
,用 out
来发送出队消息。
Queue
的定义如下:
type Queue struct {
wg sync.WaitGroup
in chan *Msg
out chan *Msg
msgs []*Msg
}
// 消息入队
func (q *Queue) Enqueue(msg *Msg) error {
if q.in == nil {
return errors.New("queue: queue is closed")
}
q.in <- msg
return nil
}
// 消息出队
func (q *Queue) Dequeue() <-chan *Msg {
return q.out
}
// 启动队列处理
func (q *Queue) Start() {
q.wg.Add(1)
go q.run()
}
// 等待队列消费
func (q *Queue) Wait() {
q.wg.Wait()
}
// 关闭队列(只能出队,不能入队)
func (q *Queue) Close() {
if q.in != nil {
close(q.in)
}
}
// 消息转发
func (q *Queue) run() {
defer q.wg.Done()
var nextMsg *Msg
var out chan *Msg
for {
if len(q.msgs) > 0 {
nextMsg = q.msgs[0]
out = q.out
} else {
nextMsg = nil
out = nil
}
select {
case msg, ok := <-q.in:
if !ok {
// channel is closed
q.in = nil
} else {
q.msgs = append(q.msgs, msg)
}
case out <- nextMsg:
q.msgs = q.msgs[1:]
}
if q.in == nil && len(q.msgs) == 0 {
close(q.out)
return
}
}
}
然后再增加一个 New
函数:
func NewQueue(inBufSize, outBufSize int) *Queue {
if inBufSize <= 0 {
inBufSize = 1
}
if outBufSize <= 0 {
outBufSize = 1
}
return &Queue{
in: make(chan *Msg, inBufSize),
out: make(chan *Msg, outBufSize),
msgs: make([]*Msg, 0, 1024),
}
}
消息队列
有了队列,那我们就可以实现消息队列了。前面我们提到,这个消息队列要实现的两个功能:支持多队列、支持多消费者。那也就是说一个 MQ
实例里面允许有多个 Queue
实例,即 map[string]*Queue
,然后每个队列是支持多个消费者的,即每个 Queue
启动之后需要开启多个消费者协程。
先定义消费者函数签名:
type ConsumerFunc func(ctx context.Context, msg *Msg) error
再把 Queue
和 ConsumerFunc
绑定到同一个结构体:
type Q struct {
mu sync.Mutex
// 消息队列
queue *Queue
// 消费者函数
consumer ConsumerFunc
// 并发数
concurrency int
// 当前消息序号
seq uint64
}
然后就可以实现我们的 MQ
了:
type MQ struct {
wg sync.WaitGroup
mu sync.Mutex
running bool
queues map[string]*Q
// 先前的文章有介绍过 contract.Logger,这里不影响代码阅读,不再赘述,
// 感兴趣可以看上篇文章:《告别 Redis/MySQL:用一百行 Go 代码实现持久化 Set》,链接见文末
log contract.Logger
}
func NewMQ(log contract.Logger) *MQ {
return &MQ{
queues: make(map[string]*Q),
log: log,
}
}
// 注册一个队列
func (m *MQ) RegisterQueue(name string, consumer ConsumerFunc, concurrency int) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
return errors.New("mq: cannot register queue while MQ is running")
}
// 避免重复注册
if _, exists := m.queues[name]; exists {
return errors.New("mq: queue already registered")
}
m.queues[name] = &Q{
queue: NewQueue(concurrency, concurrency),
consumer: consumer,
concurrency: concurrency,
}
return nil
}
// 发布消息
func (m *MQ) Publish(queueName string, data any) error {
m.mu.Lock()
q, exists := m.queues[queueName]
m.mu.Unlock()
if !exists {
return errors.New("mq: queue not found")
}
q.mu.Lock()
defer q.mu.Unlock()
msg := &Msg{
ID: q.seq,
Data: data,
}
q.seq++
return q.queue.Enqueue(msg)
}
// 启动 MQ 处理
func (s *MQ) Start(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
return errors.New("mq: MQ is already running")
}
s.running = true
for name, q := range s.queues {
q.queue.Start()
for workerID := 0; workerID < q.concurrency; workerID++ {
s.wg.Add(1)
go s.consume(ctx, name, workerID, q)
}
s.log.Infof("Started MQ queue %s with concurrency %d", name, q.concurrency)
}
return nil
}
func (s *MQ) Stop() {
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return
}
s.running = false
for _, q := range s.queues {
q.queue.Close()
}
s.mu.Unlock()
s.wg.Wait()
}
func (s *MQ) consume(ctx context.Context, queueName string, workerID int, q *Q) {
defer s.wg.Done()
for {
defer func() {
if r := recover(); r != nil {
s.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
}
}()
select {
case <-ctx.Done():
s.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
q.queue.Close()
case msg, ok := <-q.queue.Dequeue():
if !ok {
// 队列已关闭,退出
s.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
return
}
if err := q.consumer(ctx, msg); err != nil {
// 处理消息失败,记录日志或进行其他处理
s.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
}
}
}
}
至此,我们这个简易消息队列就实现了。使用方法也很简单:
func main() {
logger := NewLogger("debug")
mq := NewMQ(logger)
ctx := context.TODO()
consumer1 := func(ctx context.Context, msg *Msg) error {
logger.Infof("Consumer1 processing message ID: %d, Data: %v", msg.ID, msg.Data)
return
}
err := mq.RegisterQueue("queue1", consumer1, 2)
if err != nil {
logger.Errorf("Failed to register queue1: %v", err)
return
}
err = mq.Start(ctx)
if err != nil {
logger.Errorf("Failed to start MQ: %v", err)
return
}
// 然后生产者发布消息:
err = mq.Publish("queue1", "message for queue1")
if err != nil {
logger.Errorf("Failed to push message to queue1: %v", err)
}
}
扩展
这个简易消息队列目前其实存在一个问题,就是当进程意外退出的时候,队列可能没有被清空,有一定概率丢数据。因为我现在的场景是允许丢数据的,所以暂时没加上这个逻辑,不过其实也不麻烦,最简单的方式只需要在 Close()
的时候把 msgs
直接 dump 到磁盘,启动 Queue 的时候也先从磁盘加载先前的数据,可以参考上一篇文章的原理;当然对可靠性要求更高的也可以用上 WAL
(Write-ahead logging),总之先跑起来,然后根据问题继续迭代