当前位置: 首页 > 编程日记 > 正文

shuffle调优

目录

  • 一、概述
  • 二、shuffle的定义
  • 三、ShuffleMananger发展概述
  • 四、HashShuffleManager的运行原理
    • 4.1 未经优化的HashShuffleManager
    • 4.2 优化后的HashShuffleManager
  • 五、SortShuffleManager运行原理
    • 5.1 普通运行机制
    • 5.2 bypass运行机制
  • 六、shuffle相关参数调优
    • spark.shuffle.file.buffer
    • spark.reducer.maxSizeInFlight
    • spark.shuffle.io.retryWait
    • spark.shuffle.memoryFraction(已经弃用)
    • spark.shuffle.manager(已经弃用)
    • spark.shuffle.sort.bypassMergeThreshold
    • spark.shuffle.consolidateFiles(已经弃用)

一、概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘、序列化、网络数据传输等操作.因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优.但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已.因此大家务必把握住调优的基本原则,千万不要舍本逐末

二、shuffle的定义

Spark的运行主要分为2部分: 一部分是驱动程序,其核心是SparkContext; 另一部分是Worker节点上Task,它是运行实际任务的.程序运行的时候,Driver和Executor进程相互交互: 运行什么任务,即Driver会分配Task到Executor,Driver跟Executor进行网络传输;任务数据从哪儿获取,即Task要从Driver抓取其上游的Task的数据结果,所有有这个过程汇总就不断的产生网络结果.其中,下一个Stage向上一个Stage要数据这个过程,我们就称之为shuffle.

三、ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组建主要就是ShuffleManager,也即shuffle管理器.而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进.

Spark1.2版本以前,默认的shuffle计算引擎是HashShuffleManager.该ShuffleManager的HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能.

因此在Spark1.2以后的版本,默认的ShuffleManager改成了SortShuffleManager.SortShuffleManager相较于HashShuffleManager来说,有了一定的改进.主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件.在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可.

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理.

四、HashShuffleManager的运行原理

4.1 未经优化的HashShuffleManager

图解说明

屏幕快照 2019-05-10 11.53.35

文字说明

上面说明了未经优化的HashShuffleManager的原理.这里我们先明确一个假设前提: 每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程.

我们先从shuffle write开始说起.shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据key进行"分类".所谓"分类",就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task.在将数据写入磁盘之前,会先将数据写入内存缓冲区中,当内存缓冲区填满之后,才会溢写到磁盘磁盘文件中去.

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task由多少个,当前stage的没饿过task就要创建多少份磁盘文件.比如下一个stage共有100个task,那么当前stage的每个task都要创建100份磁盘文件.如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件.由此看见,未经优化的shuffle write操作所产生的磁盘文件的数量是及其惊人的.

接着我们来说手shuffle read. shuffle read,通常就是一个stage刚开始时要做的事情.此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作.由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程汇总,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可.

shuffle read的拉取过程时一边拉取一遍进行聚合的.每个shuffle task都会有一个自己的buffer缓冲区,每次都只能拉取于buffer缓冲区相同大小的数据,然后通过内存中的一个Map进行聚合等操作.聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲区中进行聚合操作.依次类推,知道最后将所有数据都拉取完,并得到最终的结果.

4.2 优化后的HashShuffleManager

图解说明

屏幕快照 2019-05-10 12.12.56

文字说明

上图说明了优化后的HashShuffleManager的原理.这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles.该参数默认值为false,将其设置为true即可开启优化机制.通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项.

开启consolidate机制后,在shuffle write过程汇总,task就不是为下游stage的每个task创建一个磁盘文件了.此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的.一个Executor上有多少个CPU core,就可以并行执行多少个task.而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内.

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件.也就是说,此时task会讲数据吸入已有的磁盘文件中,而不会写入新的磁盘文件中.因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能.

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task.那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的.但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为: CPU core的数量 * 下一个stage的task数量.也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件.

就是说,将第一个阶段的所有磁盘文件进行归并,归并成一个文件.然后进入第二阶段.

Property NameDefaultMeaning
spark.shuffle.consolidateFilesfalseIf set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.

spark.shuffle.consolidateFiles在1.6.0版本已移

五、SortShuffleManager运行原理

SortShuffleManager的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制.当shuffle read task的数量小于等于sprak.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制.

5.1 普通运行机制

图解说明

屏幕快照 2019-05-10 13.08.36

5.2 bypass运行机制

图解说明

屏幕快照 2019-05-10 12.56.15

文字说明

上图说明了bypass SortShuffleManager的原理.bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值.
  • 不是聚合类的shuffle算子(比如reduceByKey)

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中.当然,写入磁盘文件时也是先写入内存缓冲区,缓冲区写满之后再溢写到磁盘文件.最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件.

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一摸一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已.因此少量的最终磁盘文件,也让该机制相对为经优化的HashShuffleManager来说,shuffle read的性能会更好.

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序.也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省了这部分的性能开销.

转载于:https://www.cnblogs.com/suixingc/p/shuffle-diao-you.html

相关文章:

Java8 以后的 LocalDateTime,你真的会用吗?

本文从 LocalDateTime 类的创建、转换、格式化与解析、计算与比较以及其他操作几个方面详细介绍了 LocalDateTime 类在 Java 8 中的使用。掌握 LocalDateTime 类的使用可以大大提高日期时间处理效率和质量,希望本文对读者有所帮助。

斐波那契算法举例(iterative Fibonacci algorithm)

// count_change.cpp : Defines the entry point for the console application.// #include "stdafx.h" /*-------------------------------------------------------------实例:要想得到一个迭代的斐波那契算法需要一点点智慧。给了半美元、四分之一美…

Java项目:零食商城系统(java+SSM+jsp+MySQL+EasyUI)

源码获取:博客首页 "资源" 里下载! 系统主要实现的功能有:用户浏览商品、加入商品到购物车、登录注册、提交订单,会员中心修改个人信息、查看订单等。 后台管理员登录后可以分角色添加管理员,不同角色有不同…

skiplist 跳表(1)

最近学习中遇到一种新的数据结构,很实用,搬过来学习。 原文地址:skiplist 跳表 为什么选择跳表 目前经常使用的平衡数据结构有:B树,红黑树,AVL树,Splay Tree, Treep等。 想象一下,给…

前端学习笔记系列一:14 vue3.X中alias的配置

第一步: 第二步: // vue.config.js module.exports { configureWebpack: { resolve: { alias: { assets: /assets, components: /components, views: /views, } } }, } 并且,在没有自行配置alias的时候,就已经可以使用inquire(‘…

【转】sed 简明教程

本文转自:http://coolshell.cn/articles/9104.html awk于1977年出生,今年36岁本命年,sed比awk大2-3岁,awk就像林妹妹,sed就是宝玉哥哥了。所以 林妹妹跳了个Topless,他的哥哥sed坐不住了,也一定…

帕斯卡三角形(Pascal's triangle)

// The following code is compiled on VC2005 // #include "stdafx.h" /*-----------------------------------------------下面数值模式称为帕斯卡三角形(Pascals triangle)11 11 2 11 3 3 11 4 6 4 1 ...三角形边界上的数都是1,内部的每个数数是位…

Java项目:高校学生社团活动管理系统(java+springboot+freemark+jpa+mysql)

源码获取:博客首页 "资源" 里下载! 前台: 1、社团信息浏览搜索、社团活动风采、新闻信息浏览搜索。 2、学生注册登录。 3、登录后可自己申请创建社团,也可申请加入其他社团活动。 4、管理自己社团的申请人员。 5个…

linux nfs共享文件

linux文件共享可以有多种方式:samba,nfs,ftp等等 nfs在linux之间效率高些: function nfs(){share_folder"/data1 192.168.0.239(rw,sync,no_root_squash)"yum install nfs-utils rpcbindecho $share_folder >> /etc/exportsexportfs -rv…

我有一个朋友写出了17种触发NPE的代码!避免这些坑

在JUnit4中,使用Mockito框架时,any() 是一个参数匹配器,当与基本数据类型一起使用时,需要使用相应的类型特定的匹配器,例如使用anyInt() 而不是any()。要防范它,不在高超的编码技巧,在细。的可能性,却并不是万能的,比如开发者在使用Optional,不检查是否存在,直接调用Optional.get(),那么会得到一个NoSuchElementException。我有一个朋友,写代码的时候常常遭到NPE背刺,痛定思痛,总结了NPE出没的17个场景,哪一个你还没有遇到过?

黑色星期五Friday the Thirteenth

题目描述 13号又是一个星期五。13号在星期五比在其他日子少吗?为了回答这个问题,写一个程序,要求计算每个月的十三号落在周一到周日的次数。给出N年的一个周期,要求计算1900年1月1日至1900N-1年12月31日中十三号落在周一到周日的次数,N为正整…

帕斯卡三角形与道路问题

苏珊很为难.她步行去学校,路上老是遇到斯廷基.斯廷基:"嘿嘿,苏珊,我可以陪你一起走吗?" 苏珊:"不!请走开."苏珊心想:我有办法了.每天早上我走不同的路线去学校.这样斯廷基就不知道在哪儿找到我了.这张地图表示苏珊的住所和学校之间的所有街道.苏珊去学校…

Java项目:学生信息管理系统(java+SSM+JSP+layui+maven+mysql)

源码获取:博客首页 "资源" 里下载! 一、项目简述 功能包括: 三角色管理: 学生,教师,管理员,在线选课,成绩录入,学生管理,选课管理,教室管理等等。…

Java for LeetCode 067 Add Binary

Given two binary strings, return their sum (also a binary string). For example, a "11" b "1" Return "100". 解题思路&#xff1a; JAVA实现如下&#xff1a; static public String addBinary(String a, String b) {if (a.length() <…

ON DUPLICATE KEY UPDATE 导致mysql自增主键ID跳跃增长

具体解决方案可以根据项目来选择,如果项目不大,可以考虑1和2。如果不考虑高并发问题,可以考虑3。

一起学JDK源码 -- System类

System类是被final修饰的,不能被继承。

python csv模块心得

2019独角兽企业重金招聘Python工程师标准>>> with open(tiger.csv, wb) as csvfile:writer csv.writer(csvfile, quotingcsv.QUOTE_ALL)row [中国, 美国, 台湾, 马来西亚]writer.writerow([unicode(s).encode("utf-8") for s in row]) 转载于:https://m…

全局变量及输出语句

全局变量 是系统已经定义好的变量&#xff0c;主要反映sql数据库的操作状态。 全局变量名称以开头‘ 举例 identity:返回最后插入的标识值 error&#xff1a;返回执行的上一个T_sql语句的错误号 常用的输出语句 print&#xff1a;结果有消息中以文的形式显示 select&#xff1a…

Nested Mappings

/*hanzhiguang coded at 2009.07.30 1:20*/ // nesting_map.cpp : Defines the entry point for the console application. // /*------------------------------------------------------------------------- 给定自然数n,找出所有不同的有序对i和j,其中 1<j<i<n,使得…

Java项目:CRM客户关系管理系统(java+Springboot+maven+mysql)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; Springboot项目CRM客户关系管理系统: 系统实现了CRM客户关系系统的基本功能&#xff0c;主要有看板&#xff08;当月参与的业务机会、当月转化情况、将要结束的业务机会等&#xff09;、业务机会&#xff0…

linux下occi操作oracle数据库,中文乱码的问题

转载&#xff1a;http://www.linuxidc.com/Linux/2008-02/11238.htm 前几日调通了OCI连接数据库的问题后&#xff0c;用Oracle自带的例子测试了一下&#xff0c;能正常读取数据&#xff08;都是英文的&#xff09;&#xff0c;就放心了&#xff0c;转去开发别的模块。这几天做数…

tomcat启动时一闪而过的问题

在CMD窗口中输入 cd E:\apache-tomcat-7.0.52\bin 后再输入E:显示进入相应目录E:\apache-tomcat-7.0.52\bin后&#xff0c;再输入startup 后窗口一闪而过&#xff0c;可通过以下步骤进行调试解决&#xff1a;1.检查确认JAVA_HOME配置正确&#xff0c;可以在命令行中输入java显示…

The Long-Term Stability of Ecosystems

The Long-Term Stability of Ecosystems Plant communities assemble themselves flexibly, and their particular structure depends on the specific history of the area. Ecologists use the term “succession”to refer to the changes that happen in plant communities…

big endian little endian

大端(big-endian)和小端(little-endian)<转>2007-12-07 20:36补&#xff1a;x86机是小端(修改分区表时要注意)&#xff0c;单片机一般为大端 今天碰一个关于字节顺序的问题,虽然看起来很简单,但一直都没怎么完全明白这个东西,索性就找了下资料,把它弄清楚. 因为现行的…

Java项目:学生考勤管理系统(java+SSM+Poi导出+Easyui+JFreeChart+maven+mysql)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 这个项目适合SSM框架的初学者&#xff08;涉及大量增删改查&#xff0c;很适合初学者&#xff09;以及对Shiro安全框架和Poi技术感兴趣的同学。 项目功能&#xff1a; 用户管理功能&#xff08;登录、退出登…

【STL源码剖析读书笔记】【第5章】关联式容器之hashtable

1、hashtable在插入、删除、搜寻操作上具有“常数平均时间”的表现&#xff0c;不依赖输入元素的随机性。 2、hashtable通过hashfunction将元素映射到不同的位置&#xff0c;但当不同的元素通过hash function映射到相同的位置时&#xff0c;便产生了“碰撞”问题。解决碰撞问题…

Event自定义事件

//index.html文件<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible"…

byte endian(biglittle endian)

1. 大小端的区别 little endian:把低位字节存放在内存的低位; // big endian: 将低位字节存放在内存的高位; 比如&#xff1a;0x1234,则12 就属于高位字节&#xff1b;34 属于低位字节 假如从地址0x0000 0000开始的一个字节中保存数据0x12345678, 这2中字节序在内存当中存…

鸡啄米vc++2010系列32(标签控件Tab Control 下)

上一节中鸡啄米讲了标签控件知识的上半部分&#xff0c;本节继续讲下半部分。 标签控件的创建 MFC为标签控件的操作提供了CTabCtrl类。 与之前的控件类似&#xff0c;创建标签控件可以在对话框模板中直接拖入Tab Control&#xff0c;也可以使用CTabCtrl类的Create成员函数创建。…

Java项目:网上图书商城系统(java+SSM+Jsp+MySQL+Redis+JWT+Shiro+RabbitMQ+EasyUI)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 这个项目涉及到Shiro整合JWT、秒杀功能所具备的基本要求(限流、乐观锁、接口隐藏、JMeter高并发测试等等)、消息中间件RabbitMQ的异步邮件通知和死信队列、沙箱支付宝模拟支付等等技术亮点。 项目功能&#…