flume源码学习8-hdfs sink的具体写入流程
上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:
线上hdfs sink的几个重要设置
1 2 3 4 5 6 7 8 | hdfs.path = hdfs: //xxxxx/%{logtypename}/%Y%m%d/%H: hdfs.rollInterval = 60 hdfs.rollSize = 0 //想让文件只根据实际来roll hdfs.rollCount = 0 hdfs.batchSize = 2000 hdfs.txnEventMax = 2000 hdfs.fileType = DataStream hdfs.writeFormat = Text |
这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.
1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式
上面的设置,其数据写入流程大概如下:
1 | SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write |
简单说下:
在HDFSEventSink中会实例化BucketWriter和HDFSWriter:
1 2 3 4 5 6 7 | if (bucketWriter == null ) { HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //获取HDFSWriter 对象 .... bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount , batchSize, context , realPath, realName, inUsePrefix, inUseSuffix, suffix, codeC, compType, hdfsWriter, timedRollerPool, proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //根据HDFSWriter 对象获取BucketWriter对象 |
这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象
目前只支持3种
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static final String SequenceFileType = "SequenceFile" ; static final String DataStreamType = "DataStream" ; static final String CompStreamType = "CompressedStream" ; .... public HDFSWriter getWriter(String fileType) throws IOException { if (fileType.equalsIgnoreCase( SequenceFileType)) { //SequenceFile,sequencefile return new HDFSSequenceFile(); } else if (fileType.equalsIgnoreCase(DataStreamType)) { //DataStream return new HDFSDataStream(); } else if (fileType.equalsIgnoreCase(CompStreamType)) { //CompressedStream return new HDFSCompressedDataStream(); } else { throw new IOException( "File type " + fileType + " not supported" ); } |
BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:
1)首先判断文件是否打开:
1 2 3 4 5 6 7 | if (! isOpen) { if (idleClosed) { throw new IOException( "This bucket writer was closed due to idling and this handle " + "is thus no longer valid" ); } open(); //如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录) } |
doOpen的主要步骤
a.设置两个文件名:
1 2 3 | bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix + fullFileName + inUseSuffix; targetPath = filePath + DIRECTORY_DELIMITER + fullFileName; |
b.调用HDFSWriter.open方法打开bucketPath
1 2 3 4 5 6 7 8 9 10 11 12 | if (codeC == null ) { // Need to get reference to FS using above config before underlying // writer does in order to avoid shutdown hook & IllegalStateExceptions fileSystem = new Path(bucketPath ).getFileSystem(config); LOG.info( "Creating " + bucketPath ); writer.open( bucketPath); } else { // need to get reference to FS before writer does to avoid shutdown hook fileSystem = new Path(bucketPath ).getFileSystem(config); LOG.info( "Creating " + bucketPath ); writer.open( bucketPath, codeC , compType ); } |
c.如果设置了rollInterval ,则执行计划任务调用close方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | // if time-based rolling is enabled, schedule the roll if (rollInterval > 0 ) { Callable<Void> action = new Callable<Void>() { public Void call() throws Exception { LOG.debug( "Rolling file ({}): Roll scheduled after {} sec elapsed." , bucketPath, rollInterval ); try { close(); } catch (Throwable t) { LOG.error( "Unexpected error" , t); } return null ; } }; timedRollFuture = timedRollerPool.schedule(action, rollInterval , TimeUnit. SECONDS); } |
2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):
1 2 3 4 5 | // check if it's time to rotate the file if (shouldRotate()) { close(); //close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中 open(); } |
其中shouldRotate(基于数量和大小的roll方式):
1 2 3 4 5 6 7 8 9 10 11 12 | private boolean shouldRotate() { boolean doRotate = false ; if (( rollCount > 0 ) && (rollCount <= eventCounter )) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true LOG.debug( "rolling: rollCount: {}, events: {}" , rollCount , eventCounter ); doRotate = true ; } if (( rollSize > 0 ) && ( rollSize <= processSize)) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true LOG.debug( "rolling: rollSize: {}, bytes: {}" , rollSize , processSize ); doRotate = true ; } return doRotate; } |
其中doClose主要的步骤
a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:
1 2 3 4 | if (bucketPath != null && fileSystem != null ) { renameBucket(); // could block or throw IOException fileSystem = null ; } |
其中renameBucket:
1 | fileSystem.rename(srcPath, dstPath) |
3)调用HDFSWriter.append方法写入Event
1 | writer.append(event); |
4) 更新计数器
1 2 3 4 | // update statistics processSize += event.getBody(). length; eventCounter++; batchCounter++; |
5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs
1 2 3 | if (batchCounter == batchSize) { flush(); } |
Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:
configure用于设置serializer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public void configure(Context context) { serializerType = context.getString( "serializer" , "TEXT" ); //默认序列化方式为TEXT useRawLocalFileSystem = context.getBoolean( "hdfs.useRawLocalFileSystem" , false ); serializerContext = new Context(context.getSubProperties(EventSerializer.CTX_PREFIX)); logger.info( "Serializer = " + serializerType + ", UseRawLocalFileSystem = " + useRawLocalFileSystem); } append方法用于Event的写入,调用EventSerializer.write方法: public void append(Event e) throws IOException { // shun flumeformatter... serializer.write(e); //调用EventSerializer.write方法写入Event } |
open方法主要步骤:
1)根据hdfs.append.support的设置(默认为false)打开或者新建文件
1 2 3 4 5 6 7 8 | boolean appending = false ; if (conf.getBoolean( "hdfs.append.support" , false ) == true && hdfs.isFile (dstPath)) { //默认hdfs.append.support为false outStream = hdfs.append(dstPath); appending = true ; } else { outStream = hdfs.create(dstPath); //如果不支持append,则创建文件 } |
2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象
1 2 | serializer = EventSerializerFactory.getInstance( serializerType, serializerContext , outStream ); //实例化EventSerializer对象 |
3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常
1 2 3 4 5 6 | if (appending && ! serializer.supportsReopen()) { outStream.close(); serializer = null ; throw new IOException( "serializer (" + serializerType + ") does not support append" ); } |
4)调用文件打开或者reopen之后的操作
1 2 3 4 5 6 | if (appending) { serializer.afterReopen(); } else { serializer.afterCreate(); } } |
这里hdfs.writeFormat的3种设置和对应的类:
1 2 3 | TEXT(BodyTextEventSerializer.Builder. class ), //支持reopen HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder. class ), //支持reopen AVRO_EVENT(FlumeEventAvroEventSerializer.Builder. class ), // 不支持reopen |
默认设置为TEXT,即BodyTextEventSerializer类:
1 2 3 4 5 6 7 8 9 10 | private BodyTextEventSerializer(OutputStream out, Context ctx) { //构造方法 this . appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT ); //默认为true this . out = out; } .... public void write(Event e) throws IOException { //write方法 out.write(e.getBody()); //java.io.OutputStream.write,只写Event的body if (appendNewline) { //每一行之后增加一个回车 out.write( '\n' ); } |
相关文章:

详解zabbix中文版安装部署
一、zabbix简介(摘自百度百科)zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。zabbix能监视各种网络参数,保证服务器系统的安全运营;并提供柔软的通知机制以让系统管理员快速定位/解决存在…

赠书 | 图解机器学习算法,看这文就够了!
机器学习指的是计算机根据给定的问题、课题或环境进行学习,并利用学习结果解决问题或课题等的一整套机制(图 1-1)。 图 1-1 分类的示意图与机器学习共同成为热门话题的还有人工智能和深度学习。这里梳理一下它们之间的关系(图 1-…

C#获得文件版本信息及只读文件的删除
获取文件的版本信息: FileVersionInfo myFileVersionInfo1 FileVersionInfo.GetVersionInfo("D://TEST.DLL"); textBox1.Text"版本号: " myFileVersionInfo1.FileVersion; 更改文件属性,删除只读文件: 下例欲将E:/test.txt文件…

组策略 从入门到精通(二) 如何区别跨越WAN网的计算机对组策略的套用
如果客户机与DC中间跨越了网络,造成传输速率慢的情况,我们希望通过策略中的一些元素,达成对这些计算机的另类处理。但我们并不知道这些计算机哪些与我们DC之间属于低速连接,哪些属于高速连接,那么我们要如何通过组策略…

插入记录时单引号的处理
由于Content, Title中可能包含单引号,直接使用sql的insert命令会报错,对此有两种处理方法,一种将单引号替换成两个单引号,第2种方法是使用存储过程。 表myBBS的格式定义如下: CREATE TABLE [dbo].[myBBS] ( [ID] [bi…

仅用 480 块 GPU 跑出万亿参数!全球首个“低碳版”巨模型 M6 来了
继今年 3 月阿里达摩院发布国内首个千亿参数多模态大模型 M6(MultiModality-to-MultiModality MultitaskMega-transformer,以下简称 M6) 之后,6 月 25 日,达摩院宣布对 M6 进行全新升级,带来“低碳版”巨模…

怎样将jpg转换成pdf软件
为什么80%的码农都做不了架构师?>>> 怎样将jpg转换成pdf软件 序言: 企业或个人通常会遇到设备终端软件的兼容性和支持性问题,比如,JPG转PDF文本,这给等于给用户设置了一个门槛,遇到需要将JPG转换…

二叉树的层次遍历 II
给出一棵二叉树,返回其节点值从底向上的层次序遍历(按从叶节点所在层到根节点所在的层遍历,然后逐层从左往右遍历) 样例 给出一棵二叉树 {3,9,20,#,#,15,7}, 3/ \9 20/ \15 7 按照从下往上的层次遍历为: [[15,7],[…

jquery autocomplete实现solr查询字段自动填充并执行查询
2019独角兽企业重金招聘Python工程师标准>>> 页面引入三个JS: <script type"text/javascript" src"js/jquery-1.7.2.js"></script> <script type"text/javascript" src"js/jquery-ui.js">&l…

C#使用CDO发送邮件
可以引用的COM组件列表,发现里面有一个名为Microsoft CDO For Exchange 2000 Library的COM组件,就是这个,我们可以用它来连接SMTP Server,使用用户名/密码验证发送邮件。 下面是实现的一个例子: Smtp Server使用的Smtp…

干货 | 当 YOLOv5 遇见 OpenVINO,实现自动检测佩戴口罩
YOLOv5网络YOLOv5代码链接:https://github.com/ultralytics/yolov5YOLOv5 于2020年6月横空出世!一经推出,便得到CV圈的瞩目,目前在各大目标检测竞赛、落地实战项目中得到广泛应用。 YOLOv5在COCO上的性能表现:YOLOv5一…

Ubuntu 16.04安装双显卡驱动方法收集
说明:不一定有效,要不断尝试。 http://www.linuxwang.com/html/2150.html http://blog.csdn.net/feishicheng/article/details/70662094>如有问题,请联系我:easonjim#163.com,或者下方发表评论。<

C#中的类型转换
C# 出来也有些日子了,最近由于编程的需要,对 C# 的类型转换做了一些研究,其内容涉及 C# 的装箱/拆箱/别名、数值类型间相互转换、字符的 ASCII 码和 Unicode 码、数值字符串和数值之间的转换、字符串和字符数组/字节数组之间的转换、各种数值…

解构 StyleCLIP:文本驱动、按需设计,媲美人类 P 图师
来源 | HyperAI超神经(ID:HyperAI)作者 | 神经三羊StyleCLIP 是一种新型「P 图法」,它结合了 StyleGAN 和 CLIP,可以仅依据文本描述,对图像进行修改和处理。提起 StyleGAN 大家都不陌生。这个由 NVIDIA 发布的新型生成…

nexus 4 下 DualBootInstallation 安装 ubuntu touch
最近折腾ubuntu for phone ubuntu也算是雷声大雨点小,从edge手机开始,到说兼容一大部分谷歌机,到现在缩水说只适配nexus 4 节操掉了一地啊,对付这种情况,ubuntu touch也就可以只装着玩玩了,还好ubuntu 官方…

我的家庭私有云计划-13
嗯,昨天算由感而发啊,大家看看就好了。 嗯,接着说咱们的云。 先说啊,我没打算在这个领域里面完全自研,我还没那么疯,这个呢属于一体化解决方案,我认为还是社会分工合作的结果,不强调…

C语言return函数
return函数 说到return,有必要提及主函数的定义。很多人甚至市面上的一些书籍,都使用了void main( )这一形式 ,其实这是错误的。 C/C 中从来没有定义过void main( ) 。C 之父 Bjarne Stroustrup 在他的主页上的 FAQ 中明确地写着: The defi…

怎样写出一个较好的高速排序程序
写出一个较好的高速排序程序 高速排序是经常使用的排序算法之中的一个,但要想写出一个又快又准的使用程序,就不是那么简单了须要注意的事项 首先要写正确。通常使用递归实现。其递归相当于二叉树展开,因此假设要用迭代实现的话须要使用一个队…

写代码时发现......还得是 SpringBoot !一篇拿下
关注了很多技术类公众号的读者肯定有这样一个感受,SpringBoot相关的文章铺天盖地,并且SpringBoot相关的文章阅读量、收藏量都很高,这也从侧面反映了SpringBoot技术的火爆。一切都在证明,SpringBoot已经成为了Java程序员必备的技能…

Python的 if .else.elif语句详解
If 语句 是用来判断的 Python 编程中 if 语句用于控制程序执行 用来检测一个条件:如果条件为 (真)true,就会运行这个语法块,如果为Fales 就跳过不执行。 elif是依附于if存在的,两者之间的运算逻辑相同&…

C#中string与byte[]的转换帮助类
在写C#程序时,string和byte[]之间的转换比较烦,在移植一些老程序时感觉很不好。我在C#中使用DES和TripleDES时移植一块老代码时也遇到了同样的情况。为了下次不为同样的事情烦恼,就写了下面的帮助类。 主要实现了以下…

鲲鹏入晋 万里腾飞,鲲鹏应用创新大赛2021山西赛区邀你来战!
2021 年 6 月 29 日,由山西省工业和信息化厅、山西转型综合改革示范区管理委员会为指导单位,华为技术有限公司主办,山西鲲鹏生态创新中心暨华为(山西综改区)DevCloud 创新中心承办,山西长河科技股份有限公司…

tcpdump-根据IP查看程序与服务都用了哪些端口
tcpdump -i em1 -tttt src 116.3.248.157 and port ! 6869 -nn -i 指定端口 -tttt 附带时间戳 -nn 解析域名与端口信息 ############################################# windows下可以使用netstat -nb |find “18999” 与 netstat -ao 结合使用,在通过pid号 查看进程…

快速构建Windows 8风格应用27-漫游应用数据
本篇博文主要介绍漫游应用数据概览、如何构建漫游应用数据、构建漫游应用数据最佳实践。 漫游应用数据概览 1.若应用当中使用了漫游应用数据,用户可以很轻松的在不同的设备间保持应用数据的同步。 2.Windows会将更新的漫游数据同步到云端,并将数据更新到…

jquery和css3打造超梦幻的三维动画背景
今天为大家带来的是一款由jquery和css3实现的超级梦幻的背景效果。绿色的小原点由远到近,由近到远一种飞跃效果。效果非常好看,我们一起看下效果图: 在线预览 源码下载 我们一起看下实现的代码。这是一款由jquey和css3实现的效果。这里引用…

C#时间函数扩展
//本周是本年第几周 private int DatePart(System.DateTime dt) { int weeknow Convert.ToInt32(dt.DayOfWeek);//今天星期几 int daydiff (-1) * (weeknow1);//今日与上周末的天数差 int days System.DateTime.Now.AddDays(daydiff).DayOfYear;//上周末是本年第几天 i…

“我被机器解雇了!”Amazon 63岁员工因算法评分太低被自动开除
整理 | Carol出品 | CSDN(ID:CSDNnews)“我被一个机器解雇了。”63岁“老司机”因跟踪算法被开除一觉醒来,63岁的斯蒂芬 诺曼丁(Stephen Normandin)发现自己居然被莫名其妙解雇了。斯蒂芬是Amazon Flex的一…

微信开放平台手机APP支付
PHP对接APP微信支付 微信开放平台手机APP支付总结 1. 微信开放平台手机APP支付总结 支付功能链接: https://pay.weixin.qq.com/wiki/doc/api/index.html APP支付功能文档: https://pay.weixin.qq.com/wiki/doc/api/app/app.php?chapter8_3 Demo下载地址: https://pay.weixin.q…

VS2005创建CLR自定义触发器
第一步:在Visual Studio 2005中编写代码 using System; using System.Data; using System.Data.Sql; using System.Data.SqlServer; using System.Data.SqlTypes; public partial class Triggers { // Enter existing table or view for the target and unc…

Adobe推出HTML5动画设计工具Edge
2019独角兽企业重金招聘Python工程师标准>>> HTML5和Flash,是敌对?是共存? 尽管Flash现在依然牢牢占据着网络动画的大半江山,但这种状况终将会被改变。 那么,Edge的推出是否意味着Adobe将放弃和屈服于Flash…