当前位置: 首页 > 编程日记 > 正文

Spark Streaming实践和优化

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

Spark Streaming实践和优化 博客分类: spark

在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎。其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎。如图1所示,Spark Streaming支持的数据源有很多,如Kafka、Flume、TCP等。Spark Streaming的内部数据表示形式为DStream(Discretized Stream,离散的数据流),其接口设计与RDD非常相似,这使得它对Spark用户非常友好。Spark Streaming的核心思想是把流式处理转化为“微批处理”,即以时间为单位切分数据流,每个切片内的数据对应一个RDD,进而可以采用Spark引擎进行快速计算。由于Spark Streaming采用了微批处理方式,因此严格来说只是一个近实时的处理系统,而不是真正的流式处理系统。

图片描述

图1 Spark Streaming数据流

Storm是这个领域另一个著名的开源流式计算引擎,这是一个真正的流式处理系统,它每次从数据源读一条数据,然后单独处理。相比于Spark Streaming,Storm有更快速的响应时间(小于一秒),更适合低延迟的应用场景,比如信用卡欺诈系统,广告系统等。但是对比Storm,Spark Streaming的优势是吞吐量大,响应时间也可以接受(秒级),并且兼容Spark系统中的其他工具库,如MLlib和GraphX。从而,对于时间不敏感且流量很大的系统,Spark Streaming是更优的选择。

Spark Streaming在Hulu应用

Hulu是美国的专业在线视频网站,每天会有大量用户在线观看视频,进而产生大量用户观看的行为数据。这些数据通过收集系统进入Hulu的大数据平台,存储并做进一步处理。在大数据平台之上,各个团队会根据需要设计相应的算法对数据进行分析和挖掘以便产生商业价值:推荐团队从这些数据里挖掘出用户感兴趣的内容并做精准推荐,广告团队根据用户的历史行为推送最合适的广告,数据团队从数据的各个维度进行分析从而为公司的策略制定提供可靠依据。

Hulu大数据平台的实现依循Lambda架构。Lambda架构是一个通用的大数据处理框架,包含离线的批处理层、在线的加速层和服务层三部分,具体如图2所示。服务层一般使用HTTP服务或自定制的客户端对外提供数据访问,离线的批处理层一般使用批处理计算框架Spark和MapReduce进行数据分析,在线的加速层一般使用流式实时计算框架Spark Streaming和Storm进行数据分析。

图片描述

图2 lambda架构原理图

图片描述

图3 Hulu数据收集流程

对于实时计算部分,Hulu内部使用了Kafka、Codis和Spark Streaming。下面按照数据流的过程,介绍我们的项目。

收集数据

从服务器日志中收集数据,主要包括两个部分:

  • 来自网页、手机App、机顶盒等设备用户产生的视频观看、广告点击等行为,这些行为数据记录在各自的Nginx服务的日志中。
  • 使用Flume将用户行为数据同时导入HDFS和Kafka,其中HDFS中的数据用于离线分析,而Kafka中数据则用于流式实时分析。

存储标签数据

Hulu使用Hbase存储用户标签数据,包括基本信息如性别、年龄、是否付费,以及其他模型推测出来的偏好属性。这些属性需要作为计算模型的输入,同时HBase随机读取的速度比较慢,需要将数据同步到缓存服务器中以加快数据读取速度。Redis是一个应用广泛的开源缓存服务器,但其本身是个单机系统,不能很好地支持大量数据的缓存。为解决Redis扩展性差的问题,豌豆荚开源了Codis,一个分布式Redis解决方案。Hulu将Codis打成Docker镜像,并实现一键式构建缓存系统,附带自动监控和修复功能。为了更精细的监控,Hulu构建了多个Codis缓存,分别是:

  • codis-profile,同步HBase中的用户属性;
  • codis-action,缓存来自Kafka的用户行为;
  • codis-result,记录计算结果。

实时处理数据

在一切准备就绪,启动Spark Streaming程序:

  • Spark Streaming启动Kafka Receiver,持续地从Kafka服务器拉取数据;
  • 每隔两秒,Kafka的数据被整理成一个RDD,交给Spark引擎处理;
  • 对一条用户行为,Spark会从codis-action缓存中拿到该用户的行为记录,然后把新的行为追加进去;
  • Spark从codis-action和codis-profile中获得该用户的所有相关属性,然后执行广告和推荐的计算模型,最后把结果写入codis-result,进而供服务层实时读取这些结果。

Spark Streaming优化经验

实践中,业务逻辑首先保证完成,使得在Kafka输入数据量较小的情况下系统稳定运行,且输入输出满足项目需求。然后开始调优,修改Spark Streaming的参数,比如Executor的数量,Core的数量,Receiver的流量等。最后发现仅调参数无法完全满足本项目的业务场景,所以有更进一步的优化方案,总结如下。

Executor初始化

很多机器学习的模型在第一次运行时,需要执行初始化方法,还会连接外部的数据库,常常需要5-10分钟,这会成为潜在的不稳定因素。在Spark Streaming应用中,当Receiver完成初始化,它就开始源源不断地接收数据,并且由Driver定期调度任务消耗这些数据。如果刚启动时Executor需要几分钟做准备,会导致第一个作业一直没有完成,这段时间内 Driver不会调度新的作业。这时候在Kafka Receiver端会有数据积压,随着积压的数据量越来越大,大部分数据会撑过新生代进入老年代,进而给Java GC带来严重的压力,容易引发应用程序崩溃。

本项目的解决方案是,修改Spark内核,在每个Executor接收任务之前先执行一个用户自定义的初始化函数,初始化函数中可以执行一些独立的用户逻辑。示例代码如下:

[plain]  view plain copy
  1. // sc:是SparkContext, setupEnvironment是Hulu扩展的API  
  2. sc.setupEnvironment(() => {  
  3.   application.initialize() // 用户应用程序初始化,需执行几分钟  
  4. })  

代码1

该方案需要更改Spark的任务调度器,首先将每个Executor设置为未初始化状态。此时,调度器只会给未初始化状态的Executor分配初始化任务(执行前面提到的初始化函数)。等初始化任务完毕,调度器更新Executor的状态为已初始化,这样的Executor才可以分配正常的计算任务。

异步处理Task中的业务逻辑

本项目中,模型的输入参数均来自Codis,甚至模型内部也可能访问外部存储,直接导致模型计算时长不稳定,很多时间消耗在网络等待上。

为提高系统吞吐量,增大并行度是常用的优化方案,但在本项目的场景中并不适用。Spark作业的调度策略是,等待上一个作业的所有Task执行完毕,然后调度下一个作业。如果单个Task的运行时间不稳定,易发生个别Task拖慢整个作业的情况,以至于资源利用率不高;甚至并行度越大问题越严重。一种常用解决Task不稳定的方案是增大Spark Streaming的micro batch的时间间隔,该方案会使整个实时系统的延迟变长,并不推荐。

因此这里通过异步处理Task中的业务逻辑来解决。如下文的代码所示,同步方案中,Task内执行业务逻辑,处理时间不定;异步方案中,Task把业务逻辑嵌入线程,交给线程池执行,Task立刻结束, Executor向Driver报告执行完毕,异步处理的时间非常短,在100ms以内。另外,当线程池中积压的线程数量太大时(代码中qsize>100的情况),会暂时使用同步处理,配合反压机制(见下文的参数spark.streaming.backpressure.enabled),可以保证不会因为数据积压过多而导致系统崩溃。经实验验证,该方案大大提高了系统的吞吐量。

[plain]  view plain copy
  1. // 同步处理  
  2. // 函数 runBusinessLogic是 Task 中的业务逻辑,执行时间不定  
  3. rdd.foreachPartition(partition => runBusinessLogic (partition))  
  4.   
  5. // 异步处理,threadPool是线程池  
  6. rdd.foreachPartition(partition => {  
  7.   val qsize = threadPool.getQueue.size // 线程池中积压的线程数  
  8.   if (qsize > 100) {  
  9.     runBusinessLogic(partition) // 暂时同步处理  
  10.   }  
  11.   threadPool.execute(new Runnable {  
  12.     override def run() = runBusinessLogic(partition)  
  13.   })  
  14. })  

代码2

异步化Task也存在缺点:如果Executor发生异常,存放在线程池中的业务逻辑无法重新计算,会导致部分数据丢失。经实验验证,仅当Executor异常崩溃时有数据丢失,且不常见,在本项目的场景中可以接受。

Kafka Receiver的稳定性

本项目使用了Spark Streaming中的Kafka Receiver,本质上调用Kafka官方的客户端ZookeeperConsumerConnector。其策略是每个客户端在Zookeeper的固定路径下把自己注册为临时节点,于是所有客户端都知道其他客户端的存在,然后自动协调和分配Kafka的数据资源。该策略存在一个弊端,当一个客户端与Zookeeper的连接状态发生改变(断开或者连上),所有的客户端都会通过Zookeeper协调, 重新分配Kafka的数据资源;在此期间所有客户端都断开与Kafka的连接,系统接收不到Kafka的数据,直到重新分配成功。如果网络质量不佳,并且Receiver的个数较多,这种策略会造成数据输入不稳定,很多Spark Streaming用户遇到这样的问题。在我们的系统中,该策略并没有产生明显的负面影响。值得注意的是,Kafka 客户端与Zookeeper有个默认的参数zookeeper.session.timeout.ms=6000,表示客户端与Zookeeper连接的session有效时间为6秒,我们的客户端多次出现因为Full GC超过6秒而与Zookeeper断开连接,之后再次连接上,期间所有客户端都受到影响,系统表现不稳定。所以项目中设置参数zookeeper.session.timeout.ms=30000。

YARN资源抢占问题

在Hulu内部,Spark Streaming这样的长时服务与MapRedue、Spark、Hive等批处理应用共享YARN集群资源。在共享环境中,经常因一个批处理应用占用大量网络资源或者CPU资源导致Spark Streaming服务不稳定(尽管我们采用了CGroup进行资源隔离,但效果不佳)。更严重的问题是,如果个别Container崩溃Driver需要向YARN申请新的Container,或者如果整个应用崩溃需要重启,Spark Streaming不能保证很快申请到足够的资源,也就无法保证线上服务的质量。为解决该问题,Hulu使用label-based scheduling的调度策略,从YARN集群中隔离出若干节点专门运行Spark Streaming和其他长时服务,避免与批处理程序竞争资源。

完善监控信息

监控反映系统运行的性能状态,也是一切优化的基础。Hulu使用Graphite和Grafana作为第三方监控系统,本项目把系统中关键的性能参数(如计算时长和次数)发送给Graphite服务器,就能够在Grafana网页上看到直观的统计图。
图4是统计Kafka中日志的剩余数量,一条线对应于一个partition的历史余量,大部分情况下余量接近零,符合预期。图中09:55左右日志余量开始出现很明显的尖峰,之后又迅速逼近零。事后经过多种数据核对,证实Kafka的数据一直稳定,而当时Spark Streaming执行作业突然变慢,反压机制生效,于是Kafka Receiver减小读取日志的速率,造成Kafka数据积压;一段时间之后Spark Streaming又恢复正常,快速消耗了Kafka中的数据余量。

直观的监控系统能有效地暴露问题,进而理解和强化系统。在实践中,主要的监控指标有:

  • Kafka的剩余数据量
  • Spark的作业运行时间和调度时间
  • 每个Task的计算时间
  • Codis的访问次数、时间、命中率

另外,有脚本定期分析这些统计数据,出现异常则发邮件报警。比如图4中 Kafka 的日志余量过大时,会有连续的报警邮件。我们的经验是,监控越细致,之后的优化工作越轻松。同时,优秀的监控也需要对系统深刻的理解。

图片描述

图4 Graphite监控信息,展示了Kafka中日志的剩余数量,一条线对应于一个partition的历史余量

参数优化

下表列出本项目中比较关键的几个参数: 
图片描述

总结

Spark Streaming的产品上线运行一年多,期间进行了多次Spark版本升级,从最早期的0.8版本到最近的 1.5.x版本。总体上Spark Streaming是一款优秀的实时计算框架,可以在线上使用 。但仍然存在一些不足,包括:Spark同时使用堆内和堆外的内存,缺乏一些有效的监控信息,遇到OOM时分析和调试比较困难;缺少Executor初始化接口;Spark采用函数式编程方式,抽象层次高,好处是使用方便,坏处是理解和优化困难;新版本的Spark有一些异常,如Shuffle过程中Block丢失、内存溢出。

原文链接:http://geek.csdn.NET/news/detail/78416?utm_source=tuicool&utm_medium=referral

http://blog.csdn.net/guohecang/article/details/51583214

转载于:https://my.oschina.net/xiaominmin/blog/1598462

相关文章:

Python | 一万多条拼车数据,看春运的迁徙图

作者 | 白苏,医疗健康领域产品经理一枚,Python&R爱好者来源 | InThirty编辑 | Jane今天是腊月二十八,你们都到家了吗?这篇文章,作者对北京、上海、广州、深圳、杭州等地 1万多条出行数据进行分析,得出了…

[转载] sql server 2000系统表解释

sql server 2000系统表解释汇总了几个比较有用的系统表,内容摘自联机帮助sysobjects---------------在数据库内创建的每个对象(约束、默认值、日志、规则、存储过程等)在表中占一行。只有在 tempdb 内,每个临时对象才在该表中占一…

【驱动】uboot环境变量分析

0、bootcmd 0.1 飞凌原设置 bootcmdif mmc rescan; then if run loadbootscript; then run bootscript; else if test ${bootdev} sd1; then echo update firmware.........;run update_from_sd;else echo mmc boot..........;if run loadimage; then run mmcboot; else run n…

python--属性魔法方法

转载于:https://www.cnblogs.com/Purp1e/p/8149773.html

利用三层交换机实现VLAN的通信实验报告

利用三层交换机实现VLAN的通信实验报告<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />背景&#xff1a;要想实现VLAN之间的通讯,我们可以采用通过路由器实现VLAN间的通信 使用路由器实现VLAN间通信时&#xff0c;路由器与交换机…

【Qt】Qt Creator中文输入设置

#【Qt】Qt Creator中文输入设置 一、ubuntu中文输入法的设置 1、在终端中输入&#xff1a; $ ibus-setup 弹出界面如图&#xff1a; 2、选择中文输入法 3、点击右上角设置–》选择系统设置–》选择语言支持 4、语言支持选择&#xff1a; 汉语&#xff08;中国&#xff09…

为何Google将几十亿行源代码放在一个仓库?

作者 | Rachel Potvin&#xff0c;Josh Levenberg 译者 | 张建军 编辑 | apddd 【AI科技大本营导读】与大多数开发者的想象不同&#xff0c;Google只有一个代码仓库——全公司使用不同语言编写的超过10亿文件&#xff0c;近百TB源代码都存放在自行开发的版本管理系统Piper中&…

Java反射得到属性的值和设置属性的值

package com.whbs.bean; public class UserBean { private Integer id; private int age; private String name; private String address; public UserBean(){ System.out.println("实例化"); } public Integer getId() { return id; } public void setI…

ASP.NET 中的正则表达式

引言 Microsoft.NET Framework 对正则表达式的支持是一流的&#xff0c;甚至在 Microsoft ASP.NET 中也有依赖正则表达式语言的控件。本文介绍了深入学习正则表达式的基础知识和推荐内容。 本文主要面向对正则表达式知之甚少或没有使用经验&#xff0c;但却熟悉 ASP.NET、可借助…

如何用最强模型BERT做NLP迁移学习?

作者 | 台湾大学网红教授李宏毅的三名爱徒来源 | 井森堡&#xff0c;不定期更新机器学习技术文并附上质量佳且可读性高的代码。编辑 | Jane谷歌此前发布的NLP模型BERT&#xff0c;在知乎、Reddit上都引起了轰动。其模型效果极好&#xff0c;BERT论文的作者在论文里做的几个实验…

【驱动】GPIO寄存器配置总结

#【驱动】GPIO寄存器配置总结 0、设置复用功能为GPIO 1、设置引脚特性&#xff0c;与硬件匹配 2、配置寄存器举例 字段解释&#xff1a; 2.0、SRE 数据位&#xff1a;0 SRE(Slew Rate Field)&#xff1a;转换速度字段&#xff1f;&#xff1f;&#xff1f;这是一个可以调…

android Tabhost部件

本文结合源代码和实例来说明TabHost的用法。 使用TabHost 可以在一个屏幕间进行不同版面的切换&#xff0c;例如android自带的拨号应用&#xff0c;截图&#xff1a; 查看tabhost的源代码&#xff0c;主要实例变量有&#xff1a; private TabWidget mTabWidget; private Fr…

网易开源支持图像识别的自动化UI测试工具,零基础亲测好评!

编辑 | Jane出品 | AI科技大本营AI科技大本营给大家推荐了很多有意思、适合开发者们的工具&#xff0c;比如代码修复神器、帮小白快速分析 Error、PDF 翻译工具、变量命名神器等等。今天&#xff0c;营长要专门给测试人员&#xff0c;或者想做测试的小伙伴们推荐一款工具&#…

【驱动】GPIO 作为按键时的 设备树 配置

#【驱动】GPIO作为按键时的 设备树 配置 0、设备树 0.0 别名 imx6ul.dtsi 什么作用&#xff1f;&#xff1f;&#xff1f; /*************开始/ / { aliases {… gpio0 &gpio1; gpio1 &gpio2; gpio2 &gpio3; gpio3 &gpio4; gpio4 &gpio5; /**********…

最小树形图及其生产方法

诸位看官&#xff0c;这是我第一次在整篇文章的所有图片里面加水印。小弟写博客的时间不长&#xff0c;就有两篇博客被盗用并未注明原文网址。这一方面使我痛心不已&#xff0c;另一方面迫使我不得不重新考虑一下版权保护问题。小弟不是吝啬鬼&#xff0c;如果影响阅读或者是确…

【数据库】MySQL的C语言接口学习

0、【初始化】 MYSQL* mysql_init(MYSQL *mysql); 1、【设置连接选项】 int mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg); 2、【连接】 MYSQL* mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, cons…

程序员单身比例有多高?【2019开发者图鉴】告诉你

编辑 | Jane 出品 | AI科技大本营 本次调查共 8 个问题&#xff0c;根据这些数字我们整理了《2019开发者图鉴》&#xff0c;下面营长将发现的一些有意思的数字分享给大家&#xff1a; 性别与年龄 本次参与调查的男女比例约为 8&#xff1a;2&#xff08;男8女2&#xff09;。 …

26.2. Web UI

http://localhost:3000/ 原文出处&#xff1a;Netkiller 系列 手札 本文作者&#xff1a;陈景峯 转载请与作者联系&#xff0c;同时请务必标明文章原始出处和作者信息及本声明。

VC++ 6.0的小花招

Visual Studio系列中产品中&#xff0c;Visual Studio 6.0是最经典的一个版本&#xff0c;虽然后来有Visual Studio .NET 2003&#xff0c;以及2005&#xff0c;也确实添加了很多让我觉得激动的特性&#xff0c;但是从使用细节的细腻程度上来看&#xff0c;VS 6.0无疑是最棒的。…

【linux】嵌入式中 crontab的使用

0、编辑 执行&#xff1a;crontab -e 执行命令后&#xff0c;将出现一个编辑界面&#xff0c;内容格式如下 Minute Hour Day Month Dayofweek command 分钟 小时 天 月 天每星期 命令 每个字段代表的含义如下&#xff1a; Minute 每个小时的第几分钟执行该任务 Hour 每天的第几…

程序员该怎么做,才能成为coding王者?

每当做编程题目时&#xff0c;大多数人都会靠基本的直觉&#xff0c;遵循一些固定的步骤来有效地解题。不管是有意还是无意&#xff0c;在做编程题目的时你会下意识地遵循一些步骤&#xff0c;在阅读完这篇文章后你就可以将这些步骤和这篇文章联系起来&#xff0c;从而就可以更…

27.3. source code

tar zxvf bandwidthd-2.0.1.tgz cd bandwidthd-2.0.1 ./configure --prefix/srv/bandwidthd-2.0.1 make make install 原文出处&#xff1a;Netkiller 系列 手札 本文作者&#xff1a;陈景峯 转载请与作者联系&#xff0c;同时请务必标明文章原始出处和作者信息及本声明。

WF4.0实战(一):文件审批流程

http://www.cnblogs.com/zhuqil/archive/2010/04/13/DocumentApprovalProcess.html转载于:https://www.cnblogs.com/Little-Li/archive/2010/06/01/1749392.html

AI科技大本营在线公开课大放送(附演讲PPT)

新年新思&#xff01;新一年&#xff0c;每个人的梦想都闪耀着多彩光芒&#xff0c;对于AI领域的每一位学习者和从业者&#xff0c;我们充满渴望&#xff0c;怀揣梦想&#xff0c;心系对技术的不懈追求。AI科技大本营同样也有自己的新年梦想和脚踏实地的规划&#xff0c;比如今…

《微信跳一跳》安卓手机刷分软件搭建及攻略

2019独角兽企业重金招聘Python工程师标准>>> 元旦期间被微信小程序的游戏刷屏幕了。手笨脚笨的我也尝试了下这新出的小玩意&#xff0c;实在话手脚不协调最高仅仅90分&#xff0c;处于做技术的角度&#xff0c;直觉上可以技术上模拟解决&#xff0c;凑好朋友在微信群…

【libevent】libevent库学习总结(一)——基础

libevent库学习总结&#xff08;一&#xff09;——基础 一、基础 1.1、 介绍 Libevent是一个用于开发可伸缩网络服务器的事件通知库。Libevent API提供了一种机制来执行回调函数&#xff0c;当某个特定事件发生在文件描述符上或超时到达之后。此外&#xff0c;Libevent还支…

AS1.0(2.0)中的XML示例

虽然Flash早就升级为AS3.0&#xff0c;但是FMS的服务端编程依然仅支持AS1.0(2.0)&#xff0c;服务端与.net通讯的最简单方式莫过于请求一个RESTful的webService或wcf&#xff0c;通过它们返回的xml来获取数据。 var _xml:XML new XML("<ArrayOfstring xmlns\"htt…

【Qt】Qt发布可执行程序(打包依赖库)

Qt发布可执行程序&#xff08;打包依赖库&#xff09; 0、编译出可执行文件 如&#xff1a;xxx.exe 1、将xxx.exe拷贝到一个目录下面 2、启动Qt终端交互界面程序 如&#xff1a;Qt 5.6 for Desktop&#xff08;MinGW&#xff09; 3、进入xxx.exe所在的目录 4、执行命令…

小编说之“常见问题答疑”

2019独角兽企业重金招聘Python工程师标准>>> 关于前嗅Forespider爬虫的常见问题答疑 奋战在一线为客户答疑的狗蛋儿给小编提供了很多客户经常会问到的问题的素材&#xff0c;小编帮大家整理了一些&#xff0c;快来看看是不是都用的上吧&#xff01; 一、采集预览没有…