告别 Redis/MySQL:用一百行 Go 代码实现持久化 Set
问题出现
在做词焙词库更新的时候遇到一个问题:如果某一个单词是一个非法的单词,那就需要进行标记,之后再次遇到的时候可以直接跳过。
这个方案要实现的话,可能第一时间会想到用 Redis 的 Set;或者数据库里加一张表,一行一个非法单词。
但是词焙本身是没有用到 Redis 的,如果要用还得配置下内存淘汰策略;这么简单的需求放数据库的话又有点杀鸡用牛刀了。
所以我选择了直接使用内存 + 定期持久化到文件,整个技术方案不难,加起来就一百行左右的代码。
基础功能
虽然整个技术方案不复杂,但我们也拆解需求,逐步完善。
业务侧关心的方法:Add
(新增)和 Exist
(判断是否存在),至于怎么持久的细节是业务侧不太关心的,所以我们先实现最简单的集合:
type PersistentList struct {
records map[string]struct{} // 内存中的数据集合
mu sync.RWMutex // 用于并发控制的读写锁
}
// Add 将 value 添加到缓存中。
func (c *PersistentList) Add(value string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, found := c.records[value]; !found {
c.records[value] = struct{}{}
}
}
// Exist 检查 value 是否存在于缓存中。
func (c *PersistentList) Exist(value string) bool {
c.mu.RLock()
_, found := c.records[value]
c.mu.RUnlock()
return found
}
到这里,最简单的内存集合就实现了,接下来我们增加持久化功能!
持久化
我们可以将集合逐行保存到文本文件,这么做有两个好处:一是进程启动时加载集合的逻辑很简单,二是持久化的时候即使每 Add
一个元素就直接写文件,因为是追加写,性能也还是可以的。
修改结构体定义,新增 pedding
和 file
,以及再增加一个 chan
用于发送退出信号:
type PersistentList struct {
records map[string]struct{} // 内存中的数据集合
pending []string // 待持久化的新数据
mu sync.RWMutex // 用于并发控制的读写锁
file *os.File // 持久化文件句柄
close chan struct{} // 用于关闭后台任务的信号
log contract.Logger
}
我们将读取数据和写入数据这两个逻辑分开实现:
从文件加载数据
如前面所说,加载数据是逐行读取然后放进 records
里就行,我们把这个逻辑放在实例化时进行:
// contract.Logger 的定义见文末
func NewPersistentList(dataFile string, log contract.Logger) (*PersistentList, error) {
file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
cache := &PersistentList{
records: make(map[string]struct{}),
file: file,
close: make(chan struct{}),
log: log,
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
word := scanner.Text()
if word != "" {
cache.records[word] = struct{}{}
}
}
if err := scanner.Err(); err != nil && err != io.EOF {
file.Close()
return nil, err
}
log.Infof("Loaded %d data into persistent cache from %s", len(cache.records), dataFile)
return cache, nil
}
持久化数据到文件
我们可以每 Add
一个数据就追加写文件,但这里做一个小小的过度设计,批量刷盘,具体做法就是每次 Add
时追加到 pending
数组,后台每分钟写一次磁盘:
// 修改 Add 方法:
func (c *PersistentList) Add(value string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, found := c.records[value]; !found {
c.records[value] = struct{}{}
c.pending = append(c.pending, value) // 新增了这行
}
}
// 新增刷盘方法:
func (c *PersistentList) syncToFile() error {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.pending) == 0 {
return nil
}
count := len(c.pending)
writer := bufio.NewWriter(c.file)
for _, word := range c.pending {
if _, err := writer.WriteString(word + "\n"); err != nil {
return err
}
}
if err := writer.Flush(); err != nil {
return err
}
// 清空待刷盘数据,准备下一次写入
c.pending = nil
c.log.Infof("Successfully synced %d new keys to file.", count)
return nil
}
// 新增后台任务方法(给进程启动时以协程异步执行):
func (c *PersistentList) Serve(ctx context.Context) error {
const syncInterval = 1 * time.Minute
ticker := time.NewTicker(syncInterval)
defer ticker.Stop()
defer c.file.Close()
for {
select {
case <-ctx.Done():
c.log.Warn("Context canceled. Performing final sync...")
return c.syncToFile()
case <-ticker.C:
if err := c.syncToFile(); err != nil {
return err
}
case <-c.close:
c.log.Warn("Received close signal. Performing final sync...")
return c.syncToFile()
}
}
}
到这里,一个带有持久化功能的 Set
就已经实现了,可以这样使用:
func main() {
myset, err := NewPersistentList("/path/to/setfile")
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.TODO())
go myset.Serve(ctx)
myset.Add("asdfgh")
fmt.Println(myset.Exist("asdfgh"))
fmt.Println(myset.Exist("qwerty"))
cancel() // 退出进程时通知刷盘
}
但是上面这个 context.WithCancel
的方法还是略麻烦了,还记得我们刚刚定义的 close
这个 chan
吗,我们可以增加一个 Shutdown
方法:
func (c *PersistentList) Shutdown() {
c.close <- struct{}{}
}
这样就可以直接使用 myset.Shutdown()
来关闭了
定义
contract.Logger
定义如下:
package contract
type Logger interface {
Debug(msg string)
Debugf(msg string, args ...interface{})
Info(msg string)
Infof(msg string, args ...interface{})
Warn(msg string)
Warnf(msg string, args ...interface{})
Error(msg string)
Errorf(msg string, args ...interface{})
Close() error
}