当前位置: 首页 > 编程日记 > 正文

Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享

文章目录

    • 前言
    • 1. Rocksdb线程池概览
    • 2. Rocksdb 线程池实现
      • 2.1 基本数据结构
      • 2.2 线程池创建
      • 2.3 线程池 调度线程执行
      • 2.4 线程池销毁线程
      • 2.5 线程池优先级调度
      • 2.6 动态调整线程池 线程数目上限
    • 3. 总结

前言

Rocksdb 作为一个第三方库的形态嵌入到各个存储系统之中存储元数据,当rocksdb被使用的时候其内部会自启动一些线程,随着需要处理的用户数据越来越多,为了保证性能,rocksdb会让这一些线程也会不断增加。而在分布式存储场景,往往一个机器节点会有很多rocksdb实例(64个实例,每一个实例都会有compaction/flush线程),这个时候在Rocksdb内部使用合理的线程管理方式会节省系统CPU调度资源。

所以Rocksdb自实现的Thread Pool就是为了更好得管理Rocksdb内部线程,除了一些基本的线程调度之外,还会有可控制的线程优先级的调度,因为大多数场景Rocksdb让Flush线程的优先级高于Compaction线程,而有的场景则需要Compaction的优先级高于Flush,为了更快速的compaction清理掉旧数据。

接下来简单看一下Rocksdb 线程池的基本实现,本人已经将该线程池代码摘出来单独维护,可作为一个独立线程池去调度。

https://github.com/BaronStack/ThreadPool

线程池存在的目的 正如上面Rocksdb使用线程池的目的一样, 能够更加方便得管理我们应用中的线程,包括但不限于:线程创建,线程资源约束,线程优先级调度,线程销毁 等。

1. Rocksdb线程池概览

Rocksdb 实现的线程池支持的特性:

  • 创建/销毁线程
  • 动态增加、减少线程池线程数目上限(线程池数目需要设置上限,因为Compaction/Flush占用的资源也不能无限增加,需根据实际的Rocksdb 写入量来动态增加)
  • 支持动态调整 线程CPU 和 I/O优先级(为了暴露足够的接口给用户,来让用户选择两个功能调度的优先顺序)

2. Rocksdb 线程池实现

2.1 基本数据结构

// 线程池核心的数据结构
struct Impl {private:bool low_io_priority_;  // I/O 优先级bool low_cpu_priority_; // CPU 优先级Env::Priority priority_; // 线程优先级Env*         env_;       // 获取当前线程池的环境变量int total_threads_limit_; // 线程池线程总数std::atomic_uint queue_len_;  // 当前线程池中执行线程的排队长度bool exit_all_threads_; // 清理线程池时会调度所有未执行的线程bool wait_for_jobs_to_complete_; // 等待所有线程池的线程执行完毕// Entry per Schedule()/Submit() callstruct BGItem {void* tag = nullptr;std::function<void()> function; // 执行函数std::function<void()> unschedFunction; // 不执行函数};using BGQueue = std::deque<BGItem>;BGQueue       queue_; // deque 保存线程池中调度的线程相关的信息:线程函数、函数参数std::mutex               mu_;std::condition_variable  bgsignal_; // 条件变量,唤醒正在睡眠的线程std::vector<port::Thread> bgthreads_; // 保存需要调度的线程
}

线程池类:

class ThreadPoolImpl : public ThreadPool {private:std::unique_ptr<Impl>   impl_;// 线程池核心数据结构
};

2.2 线程池创建

Rocksdb维护了一个Env 类,这个类再同一个进程中的多个rocksdb实例之间是能够共享的。所以Rocksdb将这个类作为线程池的入口,从而让Flush/Compaction 这样的线程调度过程中,多个db可以只使用同一个线程池。

Rocksdb实现了多个环境变量:HdfsEnvPosixEnv等,方便Rocksdb的文件操作/线程操作 接口在不同的环境平台下进行扩展,当然如果用户变更了新的平台,只需要支持Env基类的接口,就能扩展到用户的新平台。

Env默认实例是PosixEnv,为了保证多db实例间共享同一个环境变量,PosixEnv仅维护一个单例。

// 创建Env,初始化几个类的单例
// 这里注意调用的顺序,先调用ThreadLocalPtr实例的初始化,再调用PosixEnv的
// 这样在Env析构的时候能够反方向析构,从而保证ThreadLocal的信息最后一个被清理
Env* Env::Default() {ThreadLocalPtr::InitSingletons(); // Threadlocal 实例数据,用来访问当前db实例运行的线程状态信息CompressionContextCache::InitSingleton();INIT_SYNC_POINT_SINGLETONS();static PosixEnv default_env; // 创建posix env	return &default_env;
}

紧接着通过 PosixEnv的构造函数创建线程池

// 根据Env设置的线程优先级,为每一个优先级创建一个线程池(方便优先级线程池的调度)
// 创建多个线程池: enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL };
std::vector<ThreadPoolImpl> thread_pools_;PosixEnv::PosixEnv(): checkedDiskForMmap_(false),forceMmapOff_(false),page_size_(getpagesize()),thread_pools_(Priority::TOTAL),allow_non_owner_access_(true) {ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));// 根据优先级创建线程池,默认创建四个线程池,但一般只会用到两个(LOW,HIGH)for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {thread_pools_[pool_id].SetThreadPriority(static_cast<Env::Priority>(pool_id));// This allows later initializing the thread-local-env of each thread.thread_pools_[pool_id].SetHostEnv(this);}thread_status_updater_ = CreateThreadStatusUpdater();
}

2.3 线程池 调度线程执行

线程池调度栈如下:从入口到具体的线程函数的执行

Env::Schedule() // Env对外接口PosixEnv::Schedule()ThreadPoolImpl::Schedule() // 线程池的调度入口ThreadPoolImpl::Impl::Submit() // 将线程函数、参数、线程回收函数封装,添加到待调度队列queue_ThreadPoolImpl::Impl::StartBGThreads() ThreadPoolImpl::Impl::BGThreadWrapper() // 更新当前执行的线程状态并启动一个调度队列中的线程ThreadPoolImpl::Impl::BGThread()// 从待调度队列queue_中调度线程func() // 执行线程函数

Env的实例调用Schedule接口,接收待调度的线程执行函数,参数,所属优先级线程池,以及线程销毁函数及其参数。

  virtual void Schedule(void (*function)(void* arg), void* arg,Priority pri = LOW, void* tag = nullptr,void (*unschedFunction)(void* arg) = nullptr) = 0;

后续会执行到ThreadPoolImpl::Impl::Submit()

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,std::function<void()>&& unschedule, void* tag) {// 后续需要更新当前线程池的线程调度队列,需要保证更新过程的原子性std::lock_guard<std::mutex> lock(mu_);// 需要销毁线程池了,不接受新的线程加入if (exit_all_threads_) {return;}// 启动线程StartBGThreads();// 更新线程函数相关的信息 到线程调度队列尾部(双端队列)queue_.push_back(BGItem());// 更新auto& item = queue_.back();item.tag = tag;item.function = std::move(schedule);item.unschedFunction = std::move(unschedule);queue_len_.store(static_cast<unsigned int>(queue_.size()),std::memory_order_relaxed);// 如果正在执行的线程没有超过线程池线程数限制,则唤醒一个正在休眠的线程if (!HasExcessiveThread()) {// Wake up at least one waiting thread.bgsignal_.notify_one();} else { // 。。。这个逻辑不太懂,超过限制之后 不应该就不唤醒了吗?// Need to wake up all threads to make sure the one woken// up is not the one to terminate.WakeUpAllThreads();}
}

后续的执行就是按照以上调用栈进行的,从线程调度队列头部取线程函执行。

2.4 线程池销毁线程

线程池的销毁也就是Env变量的析构函数,db被destory或者close,则会进入该逻辑,Env的默认环境变量是PosixEnv,即Env的子类。则会先调用PosixEnv 的析构函数,其中线程池相关的清理逻辑:

整体的调用栈如下:

~PosixEnv()ThreadPoolImpl::JoinAllThreads() ThreadPoolImpl::Impl::JoinThreads()

在析构函数中调用相关的线程清理工作:

~PosixEnv() override {// 通过Posix startthread 的接口调度的线程函数并发执行完毕for (const auto tid : threads_to_join_) {pthread_join(tid, nullptr);}// 让不同优先级线程池中待执行线程执行完for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {thread_pools_[pool_id].JoinAllThreads();}// 放置Posix析构过程中不应该thread_status_updater_ ,防止一些子线程更新线程状态出错// Delete the thread_status_updater_ only when the current Env is not// Env::Default().  This is to avoid the free-after-use error when// Env::Default() is destructed while some other child threads are// still trying to update thread status.if (this != Env::Default()) {delete thread_status_updater_;}
}

其中JoinAllThreads函数用来唤醒所有子线程的执行,并设置标记防止接收新的线程

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {std::unique_lock<std::mutex> lock(mu_);assert(!exit_all_threads_);wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;// 原子(加锁)方式更新如下变量,用作在submit函数中屏蔽接收新的线程exit_all_threads_ = true;// prevent threads from being recreated right after they're joined, in case// the user is concurrently submitting jobs.// 重置线程池的线程上限,防止用户并发调用submit添加待调度线程total_threads_limit_ = 0;lock.unlock();bgsignal_.notify_all(); //唤醒所有等待在bgsignal_的线程for (auto& th : bgthreads_) {// join 执行,直到执行完。th.join();}bgthreads_.clear();exit_all_threads_ = false;wait_for_jobs_to_complete_ = false;
}

2.5 线程池优先级调度

之前说过Rocksdb线程池支持 用户针对不同LOW/HIGH 线程池的I/O或者CPU的优先级设置。

比如 设置LOW线程池具有更低的I/O优先级和CPU优先级

target_->LowerThreadPoolIOPriority(Env::Priority::LOW);
target_->LowerThreadPoolCPUPriority(Env::Priority::LOW);

具体底层的设置方式是针对之前提到的线程数据结构中的两个参数Impl::low_io_priority_Impl::low_c pu_priority_进行置位true。在ThreadPoolImpl::Impl::BGThread调度函数执行之前,会通过系统调用setprioritysyscall(SYS_ioprio_set,,,)设置当前线程的I/O和CPU优先级。

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {bool low_io_priority = false;bool low_cpu_priority = false;while (true) {// Wait until there is an item that is ready to runstd::unique_lock<std::mutex> lock(mu_);...bool decrease_io_priority = (low_io_priority != low_io_priority_);bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);lock.unlock();#ifdef OS_LINUX// Linux 系统支持 设置CPU优先级if (decrease_cpu_priority) {setpriority(PRIO_PROCESS,// Current thread.0,// Lowest priority possible.19);low_cpu_priority = true;}if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)// Put schedule into IOPRIO_CLASS_IDLE class (lowest)// These system calls only have an effect when used in conjunction// with an I/O scheduler that supports I/O priorities. As at// kernel 2.6.17 the only such scheduler is the Completely// Fair Queuing (CFQ) I/O scheduler.// To change scheduler://  echo cfq > /sys/block/<device_name>/queue/schedule// Tunables to consider://  /sys/block/<device_name>/queue/slice_idle//  /sys/block/<device_name>/queue/slice_sync// 设置I/O优先级syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS0,                  // current threadIOPRIO_PRIO_VALUE(3, 0));low_io_priority = true;}
#else// 非Linux系统的话就不做任何处理了,仅仅保证变量被使用而已,防止编译warning (void)decrease_io_priority;  // avoid 'unused variable' error(void)decrease_cpu_priority;
#endiffunc();}
}

2.6 动态调整线程池 线程数目上限

支持动态调整线程池可调度的线程数目上限,这个能够限制线程池资源的占用,主要用作Rocksdb 中调整Flush和Compaction的各自所处的HIGH和LOW线程池中的线程数目上限。能够根据db的工作负载,动态增加或者减少线程池中可调度的线程数目。

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {impl_->SetBackgroundThreadsInternal(num, false);
}void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,bool allow_reduce) {std::unique_lock<std::mutex> lock(mu_);// 如果线程池已经要被销毁了,就不用增加线程池的调度线程数目上限了if (exit_all_threads_) {lock.unlock();return;}// 增加线程数目或者减少线程数目// 唤醒休眠的线程并调度后台线程继续执行。if (num > total_threads_limit_ ||(num < total_threads_limit_ && allow_reduce)) {total_threads_limit_ = std::max(0, num);WakeUpAllThreads();StartBGThreads();}
}

3. 总结

到此整个线程池的基本实现就描述完成了,这是一个非常成熟的线程池(经历过接近十年的工业级考验,2012年facebook开始开发rocksdb),规模虽小,但五脏俱全。其能够支撑引擎级别的线程调度压力,保证引擎的核心逻辑flush和compaction的高效调度。

目前该线程池的独立实现已经放在了https://github.com/BaronStack/ThreadPool 中,拥有完备的线程池调度/销毁,优先级配置,欢迎star。

相关文章:

Java项目:网上电商项目(前后端分离+java+vue+Springboot+ssm+mysql+maven+redis)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 本系统功能包括&#xff1a; 一款基于SpringbootVue的电商项目&#xff0c;前后端分离项目&#xff0c;前台后台都有&#xff0c;前台商品展示购买&#xff0c;购物车分类&#xff0c;订 单查…

3月7日 ArrayList集合

ArrayList与数组的区别&#xff1a; 数组是连续的、同一类型数据的一块区域&#xff0c;而集合可以是不连续的、多种数据类型的。 1.ArrayList ArrayList al new ArrayList(); al.Add(3); al.Add(5.09); al.Add("gfdg"); al.Inse…

什么时候出生好?

从年龄来说&#xff0c;女人头一胎的怀孕时间最好在35岁以前&#xff0c;因为过了35岁后不孕和头胎生育缺陷的比例会大幅度升高。那么&#xff0c;从孩子的角度&#xff0c;什么时候出生好&#xff1f;很多人不考虑这个问题&#xff0c;能不能怀上还难说那。迷信的人则追求出生…

数据库搜索与索引

索引是对数据库表中一列或多列的值进行排序的一种结构&#xff0c;使用索引可快速访问数据库表中的特定信息。如果想按特定职员的姓来查找他或她&#xff0c;则与在表中搜索所有的行相比&#xff0c;索引有助于更快地获取信息。 索引的一个主要目的就是加快检索表中数据&#x…

Clion 远程开发 配置

文章目录1. 增加远端服务工具2. 配置远端服务器3. 配置编译选项4. 设置远端开发路径Clion作为C/C语言友好的IDE&#xff0c;除了高效的代码索引 以及 基本的本地开发 能力之外还需要有远程开发能力&#xff0c;即我们工作中的代码处于远端linux服务器之上&#xff0c;通过在本地…

Java项目:朴素风个人博客系统(前后端分离+java+vue+Springboot+ssm+mysql+maven+redis)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 本系统功能包括&#xff1a; 基于vue Springboo痼J后端分离项目个人博客系统&#xff0c;注册 登录&#xff0c;首页展示&#xff0c;喜爰图书展示&#xff0c;后台图书维护&#xff0c;个人…

NodeJS+Mongodb+Express做CMS博客系统

楼主正在用业余时间开发中…… &#xff0c;目前的版本仅支持会员系统&#xff0c;尝鲜一下吧~ hi-blog 一个 nodejsexpressmongodb 的 cms 系统怎么启动 默认你已经安装了 mongodb &#xff1b;那么你得这样操作&#xff1a;安装项目 -> 初始化管理员 -> 运行项目 1、请…

Piranha实验总结

操作系统&#xff1a;rhel5.8分别在DirectorMaster和DirectorBackup上部署浮动资源(VIP IPVS策略)测试2个Director在DR模式下是否都可以正常工作。测试完成后都撤掉浮动资源。DirectorMaster[rootlocalhost ~]#yum install piranha[rootlocalhost ~]#piranha-passwdNew Passwor…

虚IP切换原理

高可用性HA&#xff08;High Availability&#xff09;指的是通过尽量缩短因日常维护操作&#xff08;计划&#xff09;和突发的系统崩溃&#xff08;非计划&#xff09;所导致的停机时间&#xff0c;以提高系统和应用的可用性。HA系统是目前企业防止核心计算机系统因故障停机的…

vim 键盘宏操作 -- 大道至简

最近利用vim做一些文本处理时 发现vim 支持的键盘宏是一个好东西啊&#xff0c;高效优雅得处理大量需要重复性操作的文本&#xff0c;让人爱不释手&#xff01;&#xff01;&#xff01; 希望接下来对键盘宏的分享能够实际帮助到大家。 后文中描述的一些vim操作会汇集成指令字…

Java项目:家居购物商城系统(java+html+jdbc+mysql)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 功能&#xff1a; Java Web精品项目源码&#xff0c;家居商城分类展示&#xff0c;商品展示&#xff0c; 商品下单&#xff0c;购物车&#xff0c;个人中心&#xff0c;后台管理&#xff0c;用…

leetcode:Search in Rotated Sorted Array

题目要求&#xff1a; Suppose a sorted array is rotated at some pivot unknown to you beforehand. (i.e., 0 1 2 4 5 6 7 might become 4 5 6 7 0 1 2). You are given a target value to search. If found in the array return its index, otherwise return -1. You may a…

解决Debian-7.1下Chrome浏览器字体难看的问题

首先在 Advance Setting 的 font 标签页下做如下配置&#xff1a; 然后在用户目录下创建 .fonts.conf 文件&#xff0c;内容如下&#xff1a; <?xml version1.0?> <!DOCTYPE fontconfig SYSTEM fonts.dtd> <fontconfig><match target"font"&g…

HDU.4903.The only survival(组合 计数)

题目链接 惊了 \(Description\) 给定\(n,k,L\)&#xff0c;表示&#xff0c;有一张\(n\)个点的无向完全图&#xff0c;每条边的边权在\([1,L]\)之间。求有多少张无向完全图满足&#xff0c;\(1\)到\(n\)的最短路为\(k\)。\(n,k\leq 12,\ L\leq10^9\)。 \(Solution\) 考虑暴力&a…

Rocksdb 写入数据后 GetApproximateSizes 获取的大小竟然为0?

项目开发中需要从引擎 获取一定范围的数据大小&#xff0c;用作打点上报&#xff0c;测试过程中竟然发现写入了一部分数据之后通过GetApproximateSizes 获取写入的key的范围时取出来的数据大小竟然为0。。。难道发现了一个bug?&#xff08;欣喜&#xff09; 因为写入的数据是…

Java项目:在线婚纱摄影预定系统(java+javaweb+SSM+springboot+mysql)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 功能&#xff1a; 前后用户的登录注册&#xff0c;婚纱照片分类&#xff0c;查看&#xff0c;摄影师预 订&#xff0c;后台订单管理&#xff0c;图片管理等等。 二、项目运行 环境配置&am…

Linux Terminal 控制终端的使用

1. Open new Terminal&#xff1a;Ctrl Alt T 或者 Ctrl Shift N 2. Open Tab&#xff1a;Ctrl Shift T 3. Close Tab&#xff1a;Ctrl Shift W 4. Close Window&#xff1a;Ctrl Shift Q 5. Copy : Ctrl Shift C 6. Paste: Ctrl Shift V 7. Full Screen: F11 8.…

如何防止代码腐烂

http://blog.jobbole.com/5643/ 很多团队都有这个问题&#xff0c;一个项目的代码本来开始设计得好好的&#xff0c;一段时间以后&#xff0c;代码就会变得难以理解&#xff0c;难以维护&#xff0c;难以修改。为什么&#xff1f;我一直在思考这个问题。 让我们先看一个人的情况…

CORS在Spring中的实现

CORS: 通常情况下浏览器禁止AJAX从外部获取资源&#xff0c;因此就衍生了CORS这一标准体系&#xff0c;来实现跨域请求。 CORS是一个W3C标准&#xff0c;全称是"跨域资源共享"&#xff08;Cross-origin resource sharing&#xff09;。它允许浏览器向跨源(协议 域名…

从BloomFilter到Counter BloomFilter

文章目录前言1. Traditional BloomFilter2. Counter BloomFilter本文traditional bloomfilter 和 counter bloomfilter的C实现 均已上传至&#xff1a; https://github.com/BaronStack/BloomFilter 前言 Bloomfilter 是一个老生常谈的数据结构&#xff0c;应用在存储领域的各…

进程、线程、多线程相关总结

进程、线程、多线程相关总结 一、说说概念 1、进程&#xff08;process&#xff09; 狭义定义&#xff1a;进程就是一段程序的执行过程。 广义定义&#xff1a;进程是一个程序关于某个数据集合的一次运行。它是操作系统动态执行的基本单元&#xff0c;在传统的操作系统中&#…

Java项目:在线蛋糕商城系统(java+jsp+jdbc+mysql)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 功能&#xff1a; 主页显示热销商品&#xff1b;所有蛋糕商品展示&#xff0c;可进行商品搜 索&#xff1b;点击商品进入商品详情页&#xff0c;具有立即购买和加入购物 车功能&#xff0c;可…

业界对生成图片缩略图的做法归纳

网站如果有很多用户上传图片(相册&#xff0c;商品图片)&#xff0c;一般的做法是将用户图片保存在磁盘上面(数据库中记录图片的地址)。用户上传的时候按照原图、中图、小图等各个尺寸都生成一份保存在磁盘上。比如php的网店系统echsop就是这么做的&#xff0c;而shopex之类也大…

thinkPHP5.0 URL路由优化

在tp中访问页面的时候URL地址是 域名/模块/控制器/方法&#xff0c;在点击首页的时候URL是 域名/index/index/index 而不是只显示域名&#xff0c;这样不利于SEO&#xff0c;而且强迫症的我看着很不爽&#xff0c;这个时候我们需要优化路由 Route::rule(路由表达式,路由地址,请…

Rocksdb 获取当前db内部的有效key个数 (估值)

文章目录1. 基本接口2. Memtable key个数统计3. Immutable Memtable key个数统计4. Sstables key个数统计5. 疑问Rocksdb因为是AppendOnly 方式写入&#xff0c;所以没有办法提供db内部唯一key个数的接口&#xff08;可能存在多版本的key&#xff0c;对用户来说只有一个userkey…

Java项目:网上花店商城系统(java+jsp+servlert+mysql+ajax)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 一、项目简述 功能&#xff1a; 一套完整的网上花店商场系统&#xff0c;系统支持前台会员的注册 登陆系统留言&#xff0c;花朵的品种选择&#xff0c;详情浏览&#xff0c;加入购物 车&#xff0c;购买花…

使用Uboot启动内核并挂载NFS根文件系统

配置编译好内核之后&#xff0c;将生成的内核文件uImage拷贝到/tftpboot/下&#xff0c;通过tftp服务器将内核下载到开发板&#xff0c;使用命令&#xff1a;tftp 31000000 uImage.下载完成之后配置bootargs环境变量&#xff1a;setenv bootargs noinitrd consolettySAC0,11520…

Centos系统上安装php遇到的错误解决方法集锦

Centos系统上安装php遇到的错误解决方法集锦1.configure: error: xml2-config not found. Please check your libxml2 installationyum install libxml2 libxml2-devel2.configure: error: Cannot find OpenSSL’s yum install openssl openssl-devel3.configure: error: Pleas…

2.27 MapReduce Shuffle过程如何在Job中进行设置

一、shuffle过程 总的来说&#xff1a; *分区 partitioner*排序 sort*copy (用户无法干涉) 拷贝*分组 group可设置 *压缩 compress*combiner map task端的Reduce二、示例 package com.ibeifeng.hadoop.senior.mapreduce;import java.io.IOException; import java.util.StringTo…

Rocksdb Slice使用中的一个小坑

本文记录一下使用Rocksdb Slice过程中的一个小小坑&#xff0c;差点没一口老血吐出来。 rocksdb的Slice 数据结构是一个小型得不可变类string数据结构&#xff0c;设计出来的目的是为了保证rocksdb内部处理用户输入的key在从内存到持久化磁盘的整个处理链路是不会被修改的&…