问题出现

在做词焙词库更新的时候遇到一个问题:如果某一个单词是一个非法的单词,那就需要进行标记,之后再次遇到的时候可以直接跳过。

这个方案要实现的话,可能第一时间会想到用 Redis 的 Set;或者数据库里加一张表,一行一个非法单词。

但是词焙本身是没有用到 Redis 的,如果要用还得配置下内存淘汰策略;这么简单的需求放数据库的话又有点杀鸡用牛刀了。

所以我选择了直接使用内存 + 定期持久化到文件,整个技术方案不难,加起来就一百行左右的代码。

基础功能

虽然整个技术方案不复杂,但我们也拆解需求,逐步完善。

业务侧关心的方法:Add(新增)和 Exist(判断是否存在),至于怎么持久的细节是业务侧不太关心的,所以我们先实现最简单的集合:

type PersistentList struct {
    records map[string]struct{} // 内存中的数据集合

    mu sync.RWMutex  // 用于并发控制的读写锁
}

// Add 将 value 添加到缓存中。
func (c *PersistentList) Add(value string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if _, found := c.records[value]; !found {
        c.records[value] = struct{}{}
    }
}

// Exist 检查 value 是否存在于缓存中。
func (c *PersistentList) Exist(value string) bool {
    c.mu.RLock()
    _, found := c.records[value]
    c.mu.RUnlock()
    return found
}

到这里,最简单的内存集合就实现了,接下来我们增加持久化功能!

持久化

我们可以将集合逐行保存到文本文件,这么做有两个好处:一是进程启动时加载集合的逻辑很简单,二是持久化的时候即使每 Add 一个元素就直接写文件,因为是追加写,性能也还是可以的。

修改结构体定义,新增 peddingfile,以及再增加一个 chan 用于发送退出信号:

type PersistentList struct {
    records map[string]struct{} // 内存中的数据集合
    pending []string            // 待持久化的新数据

    mu    sync.RWMutex  // 用于并发控制的读写锁
    file  *os.File      // 持久化文件句柄
    close chan struct{} // 用于关闭后台任务的信号

    log contract.Logger
}

我们将读取数据和写入数据这两个逻辑分开实现:

从文件加载数据

如前面所说,加载数据是逐行读取然后放进 records 里就行,我们把这个逻辑放在实例化时进行:

// contract.Logger 的定义见文末
func NewPersistentList(dataFile string, log contract.Logger) (*PersistentList, error) {
    file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    cache := &PersistentList{
        records: make(map[string]struct{}),
        file:    file,
        close:   make(chan struct{}),
        log:     log,
    }

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        word := scanner.Text()
        if word != "" {
            cache.records[word] = struct{}{}
        }
    }

    if err := scanner.Err(); err != nil && err != io.EOF {
        file.Close()
        return nil, err
    }

    log.Infof("Loaded %d data into persistent cache from %s", len(cache.records), dataFile)
    return cache, nil
}

持久化数据到文件

我们可以每 Add 一个数据就追加写文件,但这里做一个小小的过度设计,批量刷盘,具体做法就是每次 Add 时追加到 pending 数组,后台每分钟写一次磁盘:

// 修改 Add 方法:
func (c *PersistentList) Add(value string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if _, found := c.records[value]; !found {
        c.records[value] = struct{}{}
        c.pending = append(c.pending, value) // 新增了这行
    }
}

// 新增刷盘方法:
func (c *PersistentList) syncToFile() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if len(c.pending) == 0 {
        return nil
    }

    count := len(c.pending)
    writer := bufio.NewWriter(c.file)
    for _, word := range c.pending {
        if _, err := writer.WriteString(word + "\n"); err != nil {
            return err
        }
    }

    if err := writer.Flush(); err != nil {
        return err
    }

    // 清空待刷盘数据,准备下一次写入
    c.pending = nil

    c.log.Infof("Successfully synced %d new keys to file.", count)
    return nil
}

// 新增后台任务方法(给进程启动时以协程异步执行):
func (c *PersistentList) Serve(ctx context.Context) error {
    const syncInterval = 1 * time.Minute
    ticker := time.NewTicker(syncInterval)
    defer ticker.Stop()
    defer c.file.Close()

    for {
        select {
        case <-ctx.Done():
            c.log.Warn("Context canceled. Performing final sync...")
            return c.syncToFile()
        case <-ticker.C:
            if err := c.syncToFile(); err != nil {
                return err
            }
        case <-c.close:
            c.log.Warn("Received close signal. Performing final sync...")
            return c.syncToFile()
        }
    }
}

到这里,一个带有持久化功能的 Set 就已经实现了,可以这样使用:

func main() {
    myset, err := NewPersistentList("/path/to/setfile")
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.TODO())
    go myset.Serve(ctx)

    myset.Add("asdfgh")
    fmt.Println(myset.Exist("asdfgh"))
    fmt.Println(myset.Exist("qwerty"))

    cancel() // 退出进程时通知刷盘
}

但是上面这个 context.WithCancel 的方法还是略麻烦了,还记得我们刚刚定义的 close 这个 chan 吗,我们可以增加一个 Shutdown 方法:

func (c *PersistentList) Shutdown() {
    c.close <- struct{}{}
}

这样就可以直接使用 myset.Shutdown() 来关闭了

定义

contract.Logger 定义如下:

package contract

type Logger interface {
    Debug(msg string)
    Debugf(msg string, args ...interface{})

    Info(msg string)
    Infof(msg string, args ...interface{})

    Warn(msg string)
    Warnf(msg string, args ...interface{})

    Error(msg string)
    Errorf(msg string, args ...interface{})

    Close() error
}

标签: 词焙

添加新评论