Rocksdb 的 MergeOperator 简单使用记录
本篇仅仅是一个记录 MergeOperator 的使用方式。
Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。
而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划算,所以Merge 操作横空出世,用户只需要实现自己的 Merge 操作,通过option 传入,接下来有update 的场景时只需要调用一个Merge
就可以完成了,后续针对当前key的 real update 都会在后台Compaction 以及 用户调研 Get
或者 迭代器操作 时会进行合并。当然,Merge本身也存在问题,就是如果kMergeType得不到及时得compaction 调度,那可能读得负载就重了,因为读需要将之前的未Merge 都进行Merge 才能返回。
因为MergeOperator
虚基类 的函数太多了,会区分 full merge 和 partial merge,但是对于很多用户来说就是一个计数累加或者 string-append 操作,并没有过于复杂的操作,所以rocksdb 提供了 更为通用的虚基类AssociativeMergeOperator
来屏蔽复杂的Full merge 和 partial merge,继承这个类则只需要主体实现一个Merge
和 Name
函数即可。
如下代码使用了Rocksdb 已经封装好的两个 Merge操作,一个是StringAppendOperator
,另一个是UInt64AddOperator
,Merge
本身就是写一个key/value,只不过key的type是kMergeType
,value 也是实际存在的。
StringAppendOperator
StringAppend
的简单测试代码如下,我们使用同一个key进行两次Merge
操作,相当于写入了两条kMergeType
的key到db中,然后调用一次Flush,会生成一个sst文件(进行Merge),再Get会发现这个key的value 按照StringAppend中的行为完成了Merge。
#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));// 默认会用 逗号分隔 不同的mergeoption.merge_operator = MergeOperators::CreateStringAppendOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;EncodeFixed64(buf, 2);s = db->Merge(rocksdb::WriteOptions(),key, "2");s = db->Merge(rocksdb::WriteOptions(),key, "3");db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << value << endl;
}int main() {OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value len: 3 data: 2,3
可以看到Get到的value 已经进行合并了。
UInt64AddOperator
这个是一个自增计数的Merge 案例。
需要主要的是如果自己实现 MergeOperator
底层有编解码,那上层用户侧请求的写入也需要 编码方式写入 以及 按照底层的解码方式读取。
Rocksdb实现的案例代码在拿到用户传入的value的时候会进行编解码:
// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator {public:bool Merge(const Slice& /*key*/, const Slice* existing_value,const Slice& value, std::string* new_value,Logger* logger) const override {uint64_t orig_value = 0;if (existing_value){// 解码以存在的value,则我们上层调用Merge 写入的时候需要按照Fixed64进行编码orig_value = DecodeInteger(*existing_value, logger);}uint64_t operand = DecodeInteger(value, logger);assert(new_value);new_value->clear();PutFixed64(new_value, orig_value + operand);return true; // Return true always since corruption will be treated as 0}const char* Name() const override { return "UInt64AddOperator"; }private:// Takes the string and decodes it into a uint64_t// On error, prints a message and returns 0uint64_t DecodeInteger(const Slice& value, Logger* logger) const {uint64_t result = 0;if (value.size() == sizeof(uint64_t)) {result = DecodeFixed64(value.data());} else if (logger != nullptr) {// If value is corrupted, treat it as 0ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt" > %" ROCKSDB_PRIszt,value.size(), sizeof(uint64_t));}return result;}};
案例代码:
#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;static bool LittleEndian() {int i = 1;return *((char*)(&i));
}inline uint32_t DecodeFixed32(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint32_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else {return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));}
}inline uint64_t DecodeFixed64(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint64_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else {uint64_t lo = DecodeFixed32(ptr);uint64_t hi = DecodeFixed32(ptr + 4);return (hi << 32) | lo;}
}inline void EncodeFixed64(char* buf, uint64_t value) {if (LittleEndian()) {memcpy(buf, &value, sizeof(value));} else {buf[0] = value & 0xff;buf[1] = (value >> 8) & 0xff;buf[2] = (value >> 16) & 0xff;buf[3] = (value >> 24) & 0xff;buf[4] = (value >> 32) & 0xff;buf[5] = (value >> 40) & 0xff;buf[6] = (value >> 48) & 0xff;buf[7] = (value >> 56) & 0xff;}
}void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));option.merge_operator = MergeOperators::CreateUInt64AddOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;// 因为底层实现的Uint64AddOperator 会进行编码 以及 解码EncodeFixed64(buf, 2);// 对同一个key ,merge 两个2,则最后Get的时候会变成4s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}cout << "Finish merge !" << endl;s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << DecodeFixed64(value.data()) << endl;
}int main() {OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value 8 4
相关文章:

Java项目:考试系统Java基础Gui(java+Gui)
源码获取:博客首页 "资源" 里下载! 功能简介: 所属课程、题目内容、题目选项、题目答案、题目等级、学生管理、试卷管理、题目管理、时间控制 服务页面: public class ServerClient extends javax.swing.JFrame {/** …

软件工程需求设计说明书
Java即时通聊天程序 设计需求说明书 专业班级: 计本班1202班 项目组成员: 杨宗坤 刘瑞 满亚洲 指导教师: 张利峰 开始日期: 完成日期: 编写目的: 本说明书是在充分理解系统需求分析…

Nagios 安装文档
安装前的装备工作(1)解决安装Nagios的依赖关系:Nagios基本组件的运行依赖于httpd、gcc和gd。可以通过以下命令来检查nagios所依赖的rpm包是否已经安装完成:#yum -y install httpd gcc glibc glibc-common *gd* php php-mysql mysql mysql-server --skip-…

Comprehensive Guide to build a Recommendation Engine from scratch (in Python) / 从0开始搭建推荐系统...
https://www.analyticsvidhya.com/blog/2018/06/comprehensive-guide-recommendation-engine-python/, 一篇详细的入门级的推荐系统的文章,这篇文章内容详实,格式漂亮,推荐给大家. 下面是翻译,翻译关注的是意思&#x…

关于std::string 在 并发场景下 __grow_by_and_replace free was not allocated 的异常问题
使用string时发现了一些坑。 我们知道stl 容器并不是线程安全的,所以在使用它们的过程中往往需要一些同步机制来保证并发场景下的同步更新。 应该踩的坑还是一个不拉的踩了进去,所以还是记录一下吧。 string作为一个容器,随着我们的append 或…

Java项目:银行管理系统+文档Java基础Gui(java+Gui)
源码获取:博客首页 "资源" 里下载! 功能介绍: 登录、打印、取款、改密、转账、查询、挂失、存款、退卡 服务模块: public class atmFrame extends JFrame {private JPanel contentPane;private user user; // private…

ie旋转滤镜Matrix
旋转一个元素算是一个比较常见的需求了吧,在支持CSS3的浏览器中可以使用transform很容易地实现,这里有介绍:http://www.css88.com/archives/2168,这里有演示http://www.css88.com/tool/css3Preview/Transform.html,就不…

音频(3):iPod Library Access Programming Guide:Introduction
NextIntroduction介绍iPod库访问(iPod Library Access)让应用程序可以播放用户的歌曲、有声书、和播客。这个API设计使得基本播放变得非常简单,同时也支持高级的搜索和播放控制功能。iPod library access 通过打开iOS允许的音乐相关的广阔范围…

【2019/4/30】周进度报告
冲刺可以推迟了,但这不妨碍知识储备(另外这周看了看梦断代码,感觉还是很有意思的一本书)。 第七周所花时间约9个小时代码量700多行,主要是阅读代码为主(框架内代码)博客量1篇了解到的知识点 1.y…

关于 智能指针 的线程安全问题
先说结论,智能指针都是非线程安全的。 多线程调度智能指针 这里案例使用的是shared_ptr,其他的unique_ptr或者weak_ptr的结果都是类似的,如下多线程调度代码: #include <memory> #include <thread> #include <v…

Java项目:无库版商品管理系统(java+Gui+文档)
源码获取:博客首页 "资源" 里下载! 功能介绍: 添加商品、修改商品、删除商品、进货出货、查看流水、注册 登录业务处理: public class LoginView extends JFrame implements ComponentListener{private JPanel center…

LTE QCI分类 QoS
http://blog.163.com/gzf_lte/blog/static/20840310620130140057204/ http://blog.163.com/gzf_lte/blog/static/208403106201301403652527/ http://blog.sina.com.cn/u/1731932381 lte2010 QCI (QoS Class Identifier)同时应用于GBR和Non-GBR承载。一个QCI是一个值࿰…

CSS 单行溢出文本只显示部分内容
.cc-item div { width:175px; text-overflow:clip; //该属性适用于IE6,IE7 max-width:175px; //该属性适用于IE8,FF,谷歌}

Audio声音
转载于:https://www.cnblogs.com/kubll/p/10799187.html

Rocksdb Ribbon Filter : 结合 XOR-filter 以及 高斯消元算法 实现的 高效filter
文章目录前言XOR-filter 实现原理xor filter 的构造原理xor filter 构造总结XOR-filter 和 ADD-filter对比XOR-filter 在计算上的优化Ribbon filter高斯消元法总结参考前言 还是起源于前几天的Rocksdb meetup,其中Peter C. Dillinger 这位大佬分享了自己为rocksdb实…

Java项目:无库版银行管理系统(java+Gui+文档)
源码获取:博客首页 "资源" 里下载! 功能介绍: 注册用户、编辑用户、删除用户、存取款、查看流水 存入业务处理: public class depositFrame extends JFrame {private JPanel contentPane;private JTextField inputFiel…

iptables-save和iptables-restore
iptables-save用来把当前的规则存入一个文件里以备iptables-restore使用。它的使用很简单,只有两个参数:iptables-save [-c] [-t table]参数-c的作用是保存包和字节计数器的值。这可以使我们在重启防火墙后不丢失对包和字节的统计。带-c参数的iptables-s…

代码之美——Doom3源代码赏析2
http://www.csdn.net/article/2013-01-17/2813778-the-beauty-of-doom3-source-code/2 摘要:Dyad作者、资深C工程师Shawn McGrathz在空闲时翻看了Doom3的源代码,发出了这样的惊叹:“这是我见过的最整洁、最优美的代码!”“Doom 3的…

什么是JavaBean
按着Sun公司的定义,JavaBean是一个可重复使用的软件组件。实际上JavaBean是一种Java类,通过封装属性和方法成为具有某种功能或者处理某个业务的对象,简称bean。由于javabean是基于java语言的,因此javabean不依赖平台,具…

关于 linux io_uring 性能测试 及其 实现原理的一些探索
文章目录先看看性能AIO 的基本实现io_ring 使用io_uring 基本接口liburing 的使用io_uring 非poll 模式下 的实现io_uring poll模式下的实现io_uring 在 rocksdb 中的应用总结参考先看看性能 io_uring 需要内核版本在5.1 及以上才支持,liburing的编译安装 很简单&am…

添加引用方式抛出和捕获干净的WebService异常
转载:http://www.cnblogs.com/ahdung/p/3953431.html 说明:【干净】指的是客户端在捕获WebService(下称WS)抛出的异常时,得到的ex.Message就是WS方法中抛出的异常消息,不含任何“杂质”。 前提:…

Java项目:车租赁管理系统(java+Gui+文档)
源码获取:博客首页 "资源" 里下载! 功能介绍: 登陆界面、管理员界面、用户界面、汽车租赁文档 系统主页: SuppressWarnings("serial") public class SystemMainView extends JFrame implements ActionListe…

TFS中的测试计划(十)
现在有一个测试用例,用来测试登录,并且有两组测试数据。打开团队项目的web门户的测试。新建一个测试计划。命名为测试计划1添加完测试计划后,就可以向这个计划里添加测试用例了,选择登录测试。运行测试,就会生成下图左…

跟着Rocskdb 学 存储引擎:读写链路的代码极致优化
文章目录1. 读链路1.1 FileIndexer1.1.1 LevelDB sst查找实现1.1.2 Rocksdb FileIndexer实现1.2 PinnableSlice 减少内存拷贝1.3 Cache1.3.1 LRU Cache1.3.2 Clock Cache1.4 ThreadLocalPtr 线程私有存储1.4.1 version系统1.4.2 C thread_local vs ThreadLocalPtr1.4.3 ThreadL…

Java项目:人力管理系统(java+Gui+文档)
源码获取:博客首页 "资源" 里下载! 功能介绍: 角色员工、管理员,员工信息表,查询、更新,修改,移除、添加 用户管理控制层: /*** author yy*/Controller RequestMapping(…

senfile函数实例的运行过程截图
//要传输的文件内容如下所示: 启动服务器,等待客户端连接(在同一台主机上模拟的) 客户端远程登录,这里是在本地登录 这个要注意一点就是远程登陆的时候一定要带上端口号不然连接失败!!转载于:ht…

马年计划2014-2-21
新的一年到来了! 刚刚过去的一年里,我已浪费很多时光! 新年新气象,为避免重蹈覆辙,此时我必须要立个新年计划,马年计划! (1)一天必须做两道ACM题。 (2&#…

java jsp页面如何添加C标签
在https://mvnrepository.com/找两个jar包分别是: <dependency> <groupId>javax.servlet.jsp.jstl</groupId> <artifactId>jstl-api</artifactId> <version>1.2</version> </dependency> <dependency> <g…

如何用 ndctl/ipmctl 管理工具 配置不同访问模式的pmem设备
文章目录1 PMEM 底层架构2 PMEM 逻辑架构3 ipmctl 创建 不同模式的 region3.1 安装3.2 创建AppDirect mode的region3.3 创建 Memory Mode模式3.4 创建 混合模式3.5 查看创建的结果4 ndctl 创建不同类型的 namespaces4.1 安装4.2 创建/删除 一个任意类型的namespace4.3 指定类型…

[PHP]php基础练习题学习随笔
1、解释一下PHP中常量、变量、可变变量并举例说明;超级全局变量有哪些? 常量是单个值的标识符(名称),通过define()设置,在脚本中无法改变该值,常量自动全局。<?php #对大小写不敏感为true&a…