pika hash 表 知识总结

本文主要对 pika 中 Hash 数据结构的使用做一个小结。

Pika 是 360 开源的一个非关系型数据库,可以兼容 Redis 系统的大部分命令。支持主从同步。主要区别是 Pika 支持的数据量不受内存的限制,仅和硬盘大小有关。底层使用了 RocksDB 做 KV 数据的存储。

本文主要对Pika 的 Hash 数据结构做个小结。

命令支持

接口状态
HDEL支持
HEXISTS支持
HGET支持
HGETALL支持
HINCRBY支持
HINCRBYFLOAT支持
HKEYS支持
HLEN支持
HMGET支持
HMSET支持
HSET暂不支持单条命令设置多个field value,如有需求请用HMSET
HSETNX支持
HVALS支持
HSCAN支持
HSTRLEN支持

存储引擎

由于 Pika 数据最终会进入RocksDB,而RocksDB仅支持K-V数据结构, 因此 需要把两层结构的 Hash 数据转换为一层的KV存储结构。例如,执行如下的命令:

1
HSET key field value

Pika首先将创建 hash 的 meta k-v 值,用来保存 hash 结构的元数据, 其数据格式如下:

为了保存 field 和 value 值,将会再创建一个k-v,格式如下:

后创建的k-v 存储了field 和 value。

命令操作

为了更好的了解hash 的操作,下面对几类命令逐个学习:

创建/更新操作

例如 Hset 操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
Status RedisHashes::HSet(const Slice& key, const Slice& field,
                         const Slice& value, int32_t* res) {
  rocksdb::WriteBatch batch;
  // 此操作需要加锁
  // 函数结束,锁解除
  ScopeRecordLock l(lock_mgr_, key);

  int32_t version = 0;
  uint32_t statistic = 0;
  std::string meta_value;
  // 获取meta 数据
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      // 如果meta 存在,但是没有用到
      // 则直接更新meta & field & value
      version = parsed_hashes_meta_value.InitialMetaValue();
      parsed_hashes_meta_value.set_count(1);
      batch.Put(handles_[0], key, meta_value);
      HashesDataKey data_key(key, version, field);
      batch.Put(handles_[1], data_key.Encode(), value);
      *res = 1;
    } else {
      // 如果存在,且时间未过期, 版本正确
      version = parsed_hashes_meta_value.version();
      std::string data_value;
      HashesDataKey hashes_data_key(key, version, field);

      // 获取field 数据
      s = db_->Get(default_read_options_,
          handles_[1], hashes_data_key.Encode(), &data_value);
      if (s.ok()) {
        // 如果当前存的field 数据正确
        *res = 0;
        if (data_value == value.ToString()) { // 值也相等,则不操作
          return Status::OK();
        } else {
          // 修改kv
          batch.Put(handles_[1], hashes_data_key.Encode(), value);
          statistic++;
        }
      } else if (s.IsNotFound()) {
          // 如果没有存在kv, 则添加,并更新meta
        parsed_hashes_meta_value.ModifyCount(1);
        batch.Put(handles_[0], key, meta_value);
        batch.Put(handles_[1], hashes_data_key.Encode(), value);
        *res = 1;
      } else {
        // 获取失败
        return s;
      }
    }
  } else if (s.IsNotFound()) {
    // 若meta 未找到, 编码,写入
    char str[4];
    EncodeFixed32(str, 1);
    HashesMetaValue meta_value(std::string(str, sizeof(int32_t)));
    version = meta_value.UpdateVersion();
    batch.Put(handles_[0], key, meta_value.Encode());
    HashesDataKey data_key(key, version, field);
    batch.Put(handles_[1], data_key.Encode(), value);
    *res = 1;
  } else {
    return s;
  }

  // 最后批量写
  s = db_->Write(default_write_options_, &batch);
  // 更新总的统计信息
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

可以看出,在做 HSet 操作时,会对 metadata 和 field value 同时操作,并需要同时更新,而且由于Pika是多线程服务,需要加锁操作。在频繁访问同一个hash中的数据时,其锁粒度是一个hash的key,可能会有大量的锁冲突出现

读操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status RedisHashes::HGet(const Slice& key, const Slice& field,
                         std::string* value) {
  std::string meta_value;
  int32_t version = 0;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;

  // 获取meta 数据
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()) {
    // 如果存在meta,且生效
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {
      return Status::NotFound();
    } else {
      // 获取key 值
      version = parsed_hashes_meta_value.version();
      HashesDataKey data_key(key, version, field);
      s = db_->Get(read_options, handles_[1], data_key.Encode(), value);
    }
  }
  return s;
}

读操作比较简单,无需加锁。先获取metadata值,再通过metadata 计算出 field 存储key,返回结果即可。

删除操作

删除操作有两种,一种是删除整个hash 表(DEL),一种是删除一个field(HDEL)。首先看下删除整个hash 表的操作。

DEL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Status RedisHashes::Del(const Slice& key) {
  std::string meta_value;
  ScopeRecordLock l(lock_mgr_, key); // 删除操作需要加锁
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  // 获取 metadata 值
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()) {
      // 如果失效了
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {
      // 无值
      return Status::NotFound();
    } else {
      // 更新统计值,更新meta_value 即可。 
      uint32_t statistic = parsed_hashes_meta_value.count();
      parsed_hashes_meta_value.InitialMetaValue();
      s = db_->Put(default_write_options_, handles_[0], key, meta_value);
      UpdateSpecificKeyStatistics(key.ToString(), statistic);
    }
  }
  return s;
}

这里有个需要注意的地方,hash 删表,并不是删除所有数据,只是把meta_value 值更新即可。(修改count值,时间戳,以及version值) 由于field的key 是通过metadata 中的版本值计算出来的,由于meta_value 版本更新,所有 field value 均失效。 这个是pika 的一个特性,叫秒删功能。顾名思义,可以做到快速删除hash值,由于其删除hash 只重置了meta值,而hash数据结构已经存在的kv在进行compact时进行

HDEL

下面是删除一个field 的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 从参数可以看出 HDEL 是支持同时删除多个field 的
Status RedisHashes::HDel(const Slice& key,
                         const std::vector<std::string>& fields,
                         int32_t* ret) {
  uint32_t statistic = 0;
  std::vector<std::string> filtered_fields;
  std::unordered_set<std::string> field_set;

  // field 去重
  for (auto iter = fields.begin(); iter != fields.end(); ++iter) {
    std::string field = *iter;
    if (field_set.find(field) == field_set.end()) {
      field_set.insert(field);
      filtered_fields.push_back(*iter);
    }
  }

  rocksdb::WriteBatch batch;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;

  std::string meta_value;
  int32_t del_cnt = 0;
  int32_t version = 0;

  // 加锁
  ScopeRecordLock l(lock_mgr_, key);
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      *ret = 0;
      return Status::OK();
    } else {
      std::string data_value;
      version = parsed_hashes_meta_value.version();
      // 遍历所有数据,并删除
      for (const auto& field : filtered_fields) {
        HashesDataKey hashes_data_key(key, version, field);
        s = db_->Get(read_options, handles_[1],
                hashes_data_key.Encode(), &data_value);
        if (s.ok()) {
          del_cnt++;
          statistic++;
          batch.Delete(handles_[1], hashes_data_key.Encode());
        } else if (s.IsNotFound()) {
          continue;
        } else {
          return s;
        }
      }
      *ret = del_cnt;
      parsed_hashes_meta_value.ModifyCount(-del_cnt);
      batch.Put(handles_[0], key, meta_value);
    }
  } else if (s.IsNotFound()) {
    *ret = 0;
    return Status::OK();
  } else {
    return s;
  }
  s = db_->Write(default_write_options_, &batch);
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

HDEL 操作支持批量操作。

数据的清理

上面提到了,Pika 对于 hash 做了秒删的功能,那秒删之后field中的数据,如何做清理工作呢? 经过研究,发现其实pika没有做主动删除的逻辑,只是通过RocksDB 在做compaction(数据压缩)时调用filter 来实现的。 RocksDB 的compaction, 主要是为了压缩内存和硬盘的使用空间,提升查找速度(LSM 树便是不断的把树结构做merge,做内存落盘和数据压缩)。在copact操作时,提供了可定制的filter 接口。在pika 中,就是通过实现该接口来做秒删功能的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 针对存储数据的k-v 结构的过滤 (还有一种meta 数据过滤的方法)
class BaseDataFilter : public rocksdb::CompactionFilter {
 public:
  BaseDataFilter(rocksdb::DB* db,
                 std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr) :
    db_(db),
    cf_handles_ptr_(cf_handles_ptr),
    cur_key_(""),
    meta_not_found_(false),
    cur_meta_version_(0),
    cur_meta_timestamp_(0) {}

  bool Filter(int level, const Slice& key,
              const rocksdb::Slice& value,
              std::string* new_value, bool* value_changed) const override {
    ParsedBaseDataKey parsed_base_data_key(key);
    Trace("==========================START==========================");
    Trace("[DataFilter], key: %s, data = %s, version = %d",
          parsed_base_data_key.key().ToString().c_str(),
          parsed_base_data_key.data().ToString().c_str(),
          parsed_base_data_key.version());

    // 如果是复杂数据结构的key, 两个值不相等,需要取meta中的版本和时间戳
    if (parsed_base_data_key.key().ToString() != cur_key_) {
      cur_key_ = parsed_base_data_key.key().ToString();
      std::string meta_value;
      // destroyed when close the database, Reserve Current key value
      if (cf_handles_ptr_->size() == 0) {
        return false;
      }
      // 基于datakey,算出metakey
      // 查看meta 的状态
      Status s = db_->Get(default_read_options_,
              (*cf_handles_ptr_)[0], cur_key_, &meta_value);
      if (s.ok()) {
        meta_not_found_ = false;
        ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
        cur_meta_version_ = parsed_base_meta_value.version();
        cur_meta_timestamp_ = parsed_base_meta_value.timestamp();
      } else if (s.IsNotFound()) {
        meta_not_found_ = true;
      } else {
        cur_key_ = "";
        Trace("Reserve[Get meta_key faild]");
        return false;
      }
    }

    if (meta_not_found_) {
      Trace("Drop[Meta key not exist]");
      return true;
    }

    //判断版本和过期时间
    int64_t unix_time;
    rocksdb::Env::Default()->GetCurrentTime(&unix_time);
    if (cur_meta_timestamp_ != 0
      && cur_meta_timestamp_ < static_cast<int32_t>(unix_time)) {
      Trace("Drop[Timeout]");
      return true;
    }

    if (cur_meta_version_ > parsed_base_data_key.version()) {
      Trace("Drop[data_key_version < cur_meta_version]");
      return true;
    } else {
      Trace("Reserve[data_key_version == cur_meta_version]");
      return false;
    }
  }

  const char* Name() const override { return "BaseDataFilter"; }

 private:
  rocksdb::DB* db_;
  std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_;
  rocksdb::ReadOptions default_read_options_;
  mutable std::string cur_key_;
  mutable bool meta_not_found_;
  mutable int32_t cur_meta_version_;
  mutable int32_t cur_meta_timestamp_;
};

从上述代码中可以看出,其实在pika中,对于大批量的数据(比如list,hash,set 等数据结构)均是具有秒删功能的,使用秒删功能比直接删除一方面可以节省执行时间,另一方面可以减少内存碎片,算是以空间换时间的典型例子了。

数据扫描

hash 结构除了需要做kv操作外,还有类似扫描key 的操作(HKEYS, HVALS, HGETALL, HSCAN)。例如 HKEYS 命令会返回所有hash 结构中的 fields. 在 pika 中是如何解决该问题的? 问题的解决需要我们从pika 依赖的RocksDB 中找到答案。

由于 RocksDB 是 基于 LSM 树实现的存储引擎,其 KEY 是有序的,因此,可以通过 RocksDB 的区间查询操作做数据查询。 对于 HKEYS, HVALS, HGETALL 操作,会扫描 HASH 中的所有值,因此其扫描的数据,是 (keySize + key + version) 为前缀做索引前缀; 对于 HSCAN 则在 (keySize + key + version) 的基础上,增加 HSCAN 提供的前缀信息做前缀搜索即可。

学习小结

  • pika 中,可以存在相同的key 不同存储类型的数据。
  • hash 存储值不超过 2^32 , 由于 hash size 存储在 4bytes 的空间中。
  • 从Hset中,可以看到,在设计数据结构时,尽量减小 hash 中key 值的数量,减少锁meta的时间。
  • pika hash 结构具有秒删功能,对于大批量数据的hash 结果,删除操作和正常命令一样会快速执行。(这个和redis有一定区别)
  • pika 中异步删除策略是依赖于RocksDB 的compaction 提供的filter 接口实现的。
  • 令人惊喜的是,也有go版本LSM树的实现。(moss)
  • 除了pika外,最近比较火的TiDB的底层存储也是使用的 RocksDB 实现的。

备注:本文源码来自 github.com/Qihoo360/blackwidow 中。

0%