概述
本文介绍一种思路,用以利用leveldb 来构建一个带索引的查询kv数据库
场景
每一条数据,有多个标签可以索引到它,每个标签是一个key->value 对
比如,名片存储:每个人有Id ,也有姓名,Id 是唯一的,姓名则不是,通过姓名,可以索引到,所有这个名字的人
实现
实现两个函数,一个Set 设置数据,以及一个Query 查询数据函数
原理
每写入一个数据,则会写入多个kv 对,tag 则写入为 tag+Id ,以tag 作为前缀。查询的时候,通过迭代所有相同前缀的数据,即可查询出所有相关数据。
同理,组合索引,也可以利用类似的方式进行,比如tag+tag1+Id作为key 写入,注意,tag 和tag1 的顺序需要按照一定规则固定,即写入和查询需要一致(如本实例,可以将tagKey 做字典序排序)
数据
type Data struct {
Id string
Tags map[string]string
}
这里定义一个数据,有一个Id(唯一),可以理解为数据库里面的primary field,以及若干个tags
写入函数实现
func (d *MyDb) Set(data *Data) error {
d.mu.Lock()
defer d.mu.Unlock()
batch := new(leveldb.Batch)
//查询Id是否已经存在,如果存在,则删除原来的tags
value, err := d.db.Get([]byte(data.Id), nil)
if err != nil {
if err != leveldb.ErrNotFound {
return err
}
data := Data{}
err := json.Unmarshal(value, &data)
if err == nil {
for k, v := range data.Tags {
batch.Delete([]byte(k + "=" + v + "," + data.Id))
}
}
}
bytes, err := json.Marshal(data)
if err != nil {
return err
}
//写入id 以及tags,tags 必须为前缀,以便后续查询
batch.Put([]byte(data.Id), bytes)
for k, v := range data.Tags {
batch.Put([]byte(k+"="+v+","+data.Id), []byte(data.Id))
}
return d.db.Write(batch, nil)
}
查询函数实现
func (d *MyDb) Query(tagKey string, tagValue string) map[string]*Data {
d.mu.Lock()
defer d.mu.Unlock()
res := map[string]*Data{}
iter := d.db.NewIterator(nil, nil)
prefix := []byte(tagKey + "=" + tagValue + ",id=")
for ok := iter.Seek(prefix); ok; iter.Next() {
key := iter.Key()
if bytes.Index(key, prefix) != 0 {
break
}
id := iter.Value()
value, err := d.db.Get([]byte(id), nil)
if err != nil {
continue
}
data := Data{}
err = json.Unmarshal(value, &data)
if err != nil {
//TODO delete key
continue
}
if data.Tags[tagKey] != tagValue {
//TODO delete key
continue
}
res[string(id)] = &data
}
return res
}
查询是通过一对 tag 来查询的。返回拥有所有相同tag 的数据
使用实例
db := Open("test")
db.Set(&Data{
Id: "10001",
Tags: map[string]string{
"name": "join",
},
})
db.Set(&Data{
Id: "10002",
Tags: map[string]string{
"name": "humboldt",
},
})
db.Set(&Data{
Id: "10003",
Tags: map[string]string{
"name": "humboldt",
},
})
res := db.Query("name", "humboldt")
fmt.Printf("%#v\n", res["10002"])
fmt.Printf("%#v\n", res["10003"])