当前位置: 首页 > 编程日记 > 正文

Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)

                Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)

                                              作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

  本篇博客只是举例了一个自定义拦截器的方法,测试字节传输速度。

1>.自定义interceptor方法

  1 /*
  2 @author :yinzhengjie
  3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
  4 EMAIL:y1053419035@qq.com
  5 */
  6 package cn.org.yinzhengjie.interceptor;
  7 
  8 import org.apache.flume.Context;
  9 import org.apache.flume.Event;
 10 import org.apache.flume.interceptor.Interceptor;
 11 
 12 import java.util.List;
 13 
 14 /**
 15  * 设置限速拦截器
 16  * <p>
 17  * 当 字节/时间,即同一时刻,如果进入的字节过多
 18  * 则休眠一会
 19  */
 20 public class MyInterceptor implements Interceptor {
 21 
 22     private int speed;
 23 
 24 
 25     //构造
 26     private MyInterceptor(int speed) {
 27         this.speed = speed;
 28     }
 29 
 30 
 31     //do nothing
 32     public void initialize() {
 33 
 34     }
 35 
 36     /**
 37      * 1、拿出上一个event的时间,和当前时间进行相减,得出上一个event的时间间隔
 38      * 2、得到上一个event的body字节数
 39      * 3、相除得到上一个event的速度,并在此event中先进行停留,再返回event
 40      *
 41      * @param event
 42      * @return
 43      */
 44 
 45     long lastTime = -1;
 46     long lastBodySize = 0;
 47 
 48     public Event intercept(Event event) {
 49 
 50 
 51         byte[] body = event.getBody();
 52         int len = body.length;
 53 
 54 
 55         long current = System.nanoTime();
 56 
 57         //第一个event
 58         if (lastTime == -1) {
 59             lastTime = current;
 60             lastBodySize = len;
 61         }
 62 
 63         //非第一个event
 64         else {
 65             //计算上一个event停留的时间
 66             long interval = current - lastTime;
 67             System.out.println("=========================" + current + "/" + lastTime + "/" + interval + "=========================");
 68             //上一个event的速度
 69             int now_speed = (int) ((double) lastBodySize / interval * 1000);
 70             if (now_speed > speed) {
 71                 System.out.println("=========================" + now_speed + "=========================");
 72                 //计算需要停留多少秒 线程休眠,时间 = shouldTime - interval
 73                 try {
 74                     Thread.sleep((lastBodySize / speed) * 1000 - interval);
 75                 } catch (InterruptedException e) {
 76                     e.printStackTrace();
 77                 }
 78             }
 79             lastBodySize = len;
 80             lastTime = System.currentTimeMillis();
 81 
 82         }
 83         return event;
 84 
 85     }
 86 
 87     //迭代List<Event>,将所有Event交给intercept(Event)进行处理
 88     public List<Event> intercept(List<Event> events) {
 89         for (Event event : events) {
 90             intercept(event);
 91         }
 92         return events;
 93     }
 94 
 95     //do nothing
 96     public void close() {
 97 
 98     }
 99 
100     public static class Builder implements Interceptor.Builder {
101 
102         private int speed;
103 
104         public void configure(Context context) {
105             speed = context.getInteger(Constants.SPEED, Constants.DEFAULT_SPEED);
106 
107         }
108 
109         public Interceptor build() {
110             return new MyInterceptor(speed);
111         }
112     }
113 
114     public static class Constants {
115         public static String SPEED = "speed";
116         public static int DEFAULT_SPEED = 1;
117 
118     }
119 }

2>.打包并将其发送到 /soft/flume/lib下

[yinzhengjie@s101 ~]$ cd /soft/flume/lib/
[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ ll | grep MyFlume
-rw-r--r--  1 yinzhengjie yinzhengjie    5231 Jun 20 18:53 MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ rm -rf MyFlume-1.0-SNAPSHOT.jar 
[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ rz[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ ll | grep MyFlume
-rw-r--r--  1 yinzhengjie yinzhengjie    8667 Jun 20 21:02 MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ 

3>.编写agent的配置文件

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_myInterceptor.conf 
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 定义源: seq
a1.sources.r1.type = seq
# 定义一次RPC产生的批次数量
a1.sources.r1.batchSize = 1024# 指定添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.org.yinzhengjie.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.speed = 1# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

4>.启动flume并测试

[yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_myInterceptor.conf -n a1

  下图是运行agent部分的输出内容 

转载于:https://www.cnblogs.com/yinzhengjie/p/9208268.html

相关文章:

Pandas 重复数据处理大全

作者 | 东哥起飞来源 | Python数据科学本次来介绍重复值处理的常用方法。重复值处理主要涉及两个部分&#xff0c;一个是找出重复值&#xff0c;第二个是删除重复值&#xff0c;也就是根据自己设定的条件进行删除操作。定位重复值对于重复值&#xff0c;我们首先需要查看这些重…

DEDECMS教程:上/下一篇文章标题长度的截取方法

对dedecms了解的朋友们&#xff0c;想必对如何获取上一篇、下一篇文章的标签也是非常熟悉。dedecms获取上一篇、下一篇文章的标签分别为&#xff1a;{dede:prenext getpre/}、{dede:prenext getnext}。 在这个标签里&#xff0c;并没有设置上一篇、下一篇文章标题字数的功能&am…

以太网帧最小字节数以及以太网碰撞

说明&#xff1a;本文源自多个网页&#xff0c;原文链接已经不可寻 以太网是无连接的&#xff0c;不可靠的服务&#xff0c;采用尽力传输的机制。以太网CSMA/CD我就不多讲了&#xff0c;我相信大家都了解这个原理。以太网是不可靠的&#xff0c;这意味着它并不知道对方有没有收…

lodash 提取前N个元素 take

_.take(array, [n1])从数组的起始元素开始提取 N 个元素。 <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><…

JavaScript 中的有限状态机

http://www.ibm.com/developerworks/cn/web/wa-finitemach/JavaScript 中的有限状态机Page navigation系列文章有限状态机很早就已用作设计和实现事件驱动的程序&#xff08;比如网络适配器和编译器&#xff09;内复杂行为的组织原则。现在&#xff0c;可编程的 Web 浏览器为新…

健身也内卷?这届网友用 Python 掌握了做标准俯卧撑的秘诀

自己在家锻炼时&#xff0c;我们很难知道自己的动作是否标准。本文作者用Python写了一个可以检测俯卧撑动作是否标准的程序&#xff0c;一起来看看他是怎么做的。 译者 | 章雨铭 出品 | CSDN 在新加坡军队中&#xff0c;有一种测试叫做IPPT&#xff08;个人身体素质测试&#x…

Linux下配置SNMP

SNMP&#xff08;Simple NetworkManagement Protocol &#xff09;&#xff1a;简单网络管理协议本案列为Linux环境搭建&#xff08;CentOS-6&#xff09;使用yum进行安装&#xff1a;yum install -y net-snmp安装完成后先查看网络配置&#xff1a;netstat -anupl确认161端口被…

重磅!MaxCompute助力阿里开源自研语音识别模型DFSMN,准确率高达96.04%

阿里开源语音识别模型DFSMN 在近期举行的云栖大会武汉峰会上&#xff0c;装有DFSMN语音识别模型的“AI收银员”在与真人店员的PK中&#xff0c;在嘈杂环境下准确识别了用户的语音点单&#xff0c;在短短49秒内点了34杯咖啡。此外&#xff0c;装备这一语音识别技术的自动售票机也…

全球网络拓扑图

原文出自海外一个论坛&#xff1a;http://board.us.ikariam.com/board16-miscellaneous/board205-discussion-board/67724-internet/InternetSo, I always take a look at these maps and also information on the workings of the internetAnd I have questions:Are there map…

Python 实现 PD 文字识别、提取并写入 CSV 文件脚本分享

作者 | 一只河马h来源 | 简说Python一、前言二、需求描述三、开始动手动脑3.1 安装相关第三方包3.2 导入需要用到的第三方库3.3 读取pdf文件&#xff0c;并识别内容3.4 对识别的数据进行处理&#xff0c;写入csv文件总结前言扫描件一直受大众青睐&#xff0c;任何纸质资料在扫描…

根据listObject中的某个字段排序

compareTo必须是两个对象之间的比较(比如Long,Integer...),以下例子是升序排序 private void businessSort(List<WxDailyBusinessInfo> wxDailyBusinessInfo) { //排序前for (int i 0; i < wxDailyBusinessInfo.size(); i) {System.out.println(new Gson(…

CC国内厂商现状

&#xff08;1&#xff09;阿里云的产品 CC攻击 攻击者攻击服务器的认证页面、登录页面、游戏论坛等。还是用饭馆的例子&#xff0c;CC攻击相当于&#xff0c;坏人霸占收银台结账、霸占服务员点菜&#xff0c;导致正常的客人无法享受到服务。 游戏盾如何防御CC攻击&#xff1f;…

网络模型 - 随机网络,无标度网络,分层网络

转自&#xff1a; http://www.flickr.com/photos/caseorganic/4510691991/in/set-72157624621620243小图大图Network Models - Random network, Scale-free network, Hierarchical network随机网络The Erds–Rnyi (ER) model of a random network14 (see figure, part A) start…

一文介绍机器学习中的三种特征选择方法

作者 | luanhz来源 | 小数志导读机器学习中的一个经典理论是&#xff1a;数据和特征决定了机器学习的上限&#xff0c;而模型和算法只是逼近这个上限。也正因如此&#xff0c;特征工程在机器学习流程中占有着重要地位。广义的特征工程一般可分为三个环节&#xff1a;特征提取、…

[转化率预估-1]引言

原文&#xff1a;hhttp://www.flickering.cn/ads/2014/06/%E8%BD%AC%E5%8C%96%E7%8E%87%E9%A2%84%E4%BC%B0%E2%80%94%E2%80%94%E5%BC%95%E8%A8%80/ 最近几年&#xff0c;“计算广告学”的概念风生水起&#xff0c;让我们这些从事在线广告匹配技术的程序猿着实荣耀了一把。这在参…

reportNG定制化之失败截图及日志

先从github上拉下 reportNg的源代码 reportng 拉下源码后我们使用IDEA进行导入 1、reportng.properties 增加部分类表项 这里我们直接在末尾添加 logLog Info screenshotScreen Shot durationDuration2、results.html.vm 修改结果的html&#xff0c;我们目前只修改fail的情况下…

基于 OpenCV 的手掌检测和手指计数

作者 | 努比 来源 | 小白学视觉 利用余弦定理使用OpenCV-Python实现手指计数与手掌检测。 手检测和手指计数 接下来让我们一起探索以下这个功能是如何实现的。 OpenCV OpenCV&#xff08;开源计算机视觉库&#xff09;是一个开源计算机视觉和机器学习软件库。OpenCV的构建旨在为…

side menu待研究

2019独角兽企业重金招聘Python工程师标准>>> http://fontawesome.bootstrapcheatsheets.com/ http://www.queness.com/post/14666/recreate-google-nexus-menu http://www.jqueryscript.net/demo/Sliding-Side-Menu-Panel-with-jQuery-Bootstrap-BootSideMenu/ &a…

Gitlab Issue Tracker and Wiki(一)

本节内容&#xff1a;创建第一个问题创建第一个合并请求接受合并请求工作里程碑在提交中引用问题创建维基百科页使用Gollum管理维基百科一. 创建问题1. 登陆Gitlab服务器2. 切换到想要创建问题的项目3. 点击Issues.4. 点击【New issue】5. 根据情况进行填写。二. 创建合并请求1…

runtime实践之Method Swizzling

利用 Objective-C 的 Runtime 特性&#xff0c;我们可以给语言做扩展&#xff0c;帮助解决项目开发中的一些设计和技术问题。这一篇&#xff0c;我们来探索一些利用 Objective-C Runtime 的黑色技巧。这些技巧中最具争议的或许就是 Method Swizzling 。 介绍一个技巧&#xff0…

网络协议关系拓扑图 很全面 很好

NETWORK ASSOCIATES GUIDE TO COMMUNICATIONS PROTOCOLS 网络协议关系拓扑图 很全面 很好 值得收藏&#xff01;

一行代码搞定 Python 逐行内存消耗分析

作者 | 费弗里来源 | Python大数据分析我们即将学习的是&#xff1a;一行代码分析Python代码行级别内存消耗。很多情况下&#xff0c;我们需要对已经写好的Python程序的内存消耗进行优化&#xff0c;但是一段代码在运行过程中的内存消耗是动态变化的&#xff0c;这种时候就可以…

崛起于Springboot2.X之Mybatis-全注解方式操作Mysql(4)

为什么80%的码农都做不了架构师&#xff1f;>>> 1、使用注解方式对mysql增删改查,它很方便&#xff0c;不像一些逆向工程工具一样生成的都是乱七八糟&#xff0c;虽然很全的方法&#xff0c;完全手写sql 基于上一篇博客&#xff0c;我们只需要新建一个目录dao层&am…

hdu 1247

Problem DescriptionA hat’s word is a word in the dictionary that is the concatenation of exactly two other words in the dictionary.You are to find all the hat’s words in a dictionary.InputStandard input consists of a number of lowercase words, one per li…

php执行URL解析

方法一&#xff1a; $url"http://www.baidu.com";file_get_contents($url);方法二&#xff1a; // CURL 方法$url"http://www.baidu.com";$ch curl_init( );curl_setopt( $ch,CURLOPT_URL,$url );curl_setopt( $ch,CURLOPT_HEADER,0 );curl_setopt( $ch,…

Python 来分析,堪比“唐探系列”!B站9.5分好评如潮!

作者 | 菜鸟哥来源 | 菜鸟学PythonHello 小伙伴们&#xff0c;最近一部非常不错的悬疑侦探喜剧 电影&#xff0c;登上B站热榜&#xff01;菜鸟哥看完之后&#xff0c;大呼过瘾&#xff0c;简直就是一本非常棒的"剧本杀"&#xff01;演员都是实力派&#xff0c;演技超…

10进制转换为二十六进制字符串A-Z

def convert10to26(num): ...: 10进制转为26进制字母 A-Z, 输入参数10进制数num, 返回26位的字母A-Z 参数type&#xff1a; num: int return: str ...: ...: digit_list [] # 列表当栈使用&#xff0c;存储每次求余的结果 ...: while num !0: ...: digit_list.append(num%26)…

从hello world 说程序运行机制

http://www.cnblogs.com/yanlingyin/archive/2012/03/05/2379199.html 开篇 学习任何一门编程语言&#xff0c;都会从hello world 开始。对于一门从未接触过的语言&#xff0c;在短时间内我们都能用这种语言写出它的hello world。然而&#xff0c;对于hello world 这个简单程序…

爱耳日腾讯天籁行动再升级 助力100位青年听障人才打破“屏障”

公益是解决社会问题的重要切入口&#xff0c;科技是提升效率的强有力工具。当产业技术走入公益场景&#xff0c;科技也在发挥更大的社会价值。 《中国听力健康报告&#xff08;2021&#xff09;》显示&#xff0c;过度的噪音曝露&#xff0c;正让全球11亿年轻人面临听力受损的风…

IOS推送详解

为什么80%的码农都做不了架构师&#xff1f;>>> IOS推送详解 一.关于推送通知 推送通知&#xff0c;也被叫做远程通知&#xff0c;是在iOS 3.0以后被引入的功能。是当程序没有启动或不在前台运行时&#xff0c;告诉用户有新消息的一种途径&#xff0c;是从外部服务…