当前位置: 首页 > 编程日记 > 正文

[转]自定义hadoop map/reduce输入文件切割InputFormat

本文转载自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html

hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。

那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。

但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。

又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相 同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相 对较为复杂了,但并不是不能实现。

1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符
     1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {

@Override

public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
        return new SearchRecordReader("\b");

}

@Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
         // 输入文件不分片
        return false;
     }
}

2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。

public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
 private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
 
 private CompressionCodecFactory compressionCodecs = null;
 private long start;
 private long pos;
 private long end;
 private LineReader in;
 private int maxLineLength;
 private LongWritable key = null;
 private Text value = null;
 //行分隔符,即一条记录的分隔符
 private byte[] separator = {'\b'};
 private int sepLength = 1;

‍ public IsearchRecordReader(){
 }
 public IsearchRecordReader(String seps){
  this.separator = seps.getBytes(); 
  sepLength = separator.length;
 }

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

this.start = split.getStart();
  this.end = (this.start + split.getLength());
  Path file = split.getPath();
  this.compressionCodecs = new CompressionCodecFactory(job);
  CompressionCodec codec = this.compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  boolean skipFirstLine = false;
  if (codec != null) {
   this.in = new LineReader(codec.createInputStream(fileIn), job);
   this.end = Long.MAX_VALUE;
  } else {
   if (this.start != 0L) {
    skipFirstLine = true;
    this.start -= sepLength;
    fileIn.seek(this.start);
   }
   this.in = new LineReader(fileIn, job);
  }
  if (skipFirstLine) { // skip first line and re-establish "start".
   int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
   
   if(newSize > 0){
    start += newSize;
   }
  }

this.pos = this.start;
 }

public boolean nextKeyValue() throws IOException {
  if (this.key == null) {
   this.key = new LongWritable();
  }
  this.key.set(this.pos);
  if (this.value == null) {
   this.value = new Text();
  }
  int newSize = 0;
  while (this.pos < this.end) {
   newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
 (int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

if (newSize == 0) {
    break;
   }
   this.pos += newSize;
   if (newSize < this.maxLineLength) {
    break;
   }

LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
  }

if (newSize == 0) {
   //读下一个buffer
   this.key = null;
   this.value = null;
   return false;
  }
  //读同一个buffer的下一个记录
  return true;
 }

public LongWritable getCurrentKey() {
  return this.key;
 }

public Text getCurrentValue() {
  return this.value;
 }

public float getProgress() {
  if (this.start == this.end) {
   return 0.0F;
  }
  return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
 }

public synchronized void close() throws IOException {
  if (this.in != null)
   this.in.close();
 }

}

3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就 是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况, 这种情况一定要加以拼接处理。

public class LineReader {
  //回车键(hadoop默认)
  //private static final byte CR = 13;
  //换行符(hadoop默认)
  //private static final byte LF = 10;
    
  //按buffer进行文件读取
  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  private InputStream in;
  private byte[] buffer;
  private int bufferLength = 0;
  private int bufferPosn = 0;
  
  LineReader(InputStream in, int bufferSize) {
   this.bufferLength = 0;
    this.bufferPosn = 0;
     
   this.in = in;
   this.bufferSize = bufferSize;
   this.buffer = new byte[this.bufferSize];
  }

public LineReader(InputStream in, Configuration conf) throws IOException {
   this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  }

public void close() throws IOException {
   in.close();
  }

public int readLine(Text str, int maxLineLength) throws IOException {
   return readLine(str, maxLineLength, Integer.MAX_VALUE);
  }

public int readLine(Text str) throws IOException {
   return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  }

//以下是需要改写的部分_start,核心代码

public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
   str.clear();
   Text record = new Text();
   int txtLength = 0;
   long bytesConsumed = 0L;
   boolean newline = false;
   int sepPosn = 0;
   
   do {
    //已经读到buffer的末尾了,读下一个buffer
    if (this.bufferPosn >= this.bufferLength) {
     bufferPosn = 0;
     bufferLength = in.read(buffer);
     
     //读到文件末尾了,则跳出,进行下一个文件的读取
     if (bufferLength <= 0) {
      break;
     }
    }
    
    int startPosn = this.bufferPosn;
    for (; bufferPosn < bufferLength; bufferPosn ++) {
     //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
     if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
      sepPosn = 0;
     }
     
     //遇到行分隔符的第一个字符
     if (buffer[bufferPosn] == separator[sepPosn]) {
      bufferPosn ++;
      int i = 0;
      
      //判断接下来的字符是否也是行分隔符中的字符
      for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
       
       //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
       if(bufferPosn + i >= bufferLength){
        bufferPosn += i - 1;
        break;
       }
       
       //一旦其中有一个字符不相同,就判定为不是分隔符
       if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
        sepPosn = 0;
        break;
       }
      }
      
      //的确遇到了行分隔符
      if(sepPosn == sepLength){
       bufferPosn += i;
       newline = true;
       sepPosn = 0;
       break;
      }
     }
    }

    
    int readLength = this.bufferPosn - startPosn;

bytesConsumed += readLength;
    //行分隔符不放入块中
    //int appendLength = readLength - newlineLength;
    if (readLength > maxLineLength - txtLength) {
     readLength = maxLineLength - txtLength;
    }
    if (readLength > 0) {
     record.append(this.buffer, startPosn, readLength);
     txtLength += readLength;
     
     //去掉记录的分隔符
     if(newline){
      str.set(record.getBytes(), 0, record.getLength() - sepLength);
     }
    }

} while (!newline && (bytesConsumed < maxBytesToConsume));

if (bytesConsumed > (long)Integer.MAX_VALUE) {
    throw new IOException("Too many bytes before newline: " + bytesConsumed);
   }
   
   return (int) bytesConsumed;
  }

//以下是需要改写的部分_end

//以下是hadoop-core中LineReader的源码_start

public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
    str.clear();
    int txtLength = 0;
    int newlineLength = 0;
    boolean prevCharCR = false;
    long bytesConsumed = 0L;
    do {
      int startPosn = this.bufferPosn;
      if (this.bufferPosn >= this.bufferLength) {
        startPosn = this.bufferPosn = 0;
        if (prevCharCR)  bytesConsumed ++;
        this.bufferLength = this.in.read(this.buffer);
        if (this.bufferLength <= 0)  break;
      }
      for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
        if (this.buffer[this.bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          this.bufferPosn ++;
          break;
        }
        if (prevCharCR) {
          newlineLength = 1;
          break;
        }
        prevCharCR = this.buffer[this.bufferPosn] == CR;
      }
      int readLength = this.bufferPosn - startPosn;
      if ((prevCharCR) && (newlineLength == 0))
        --readLength;
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(this.buffer, startPosn, appendLength);
        txtLength += appendLength; }
    }
    while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));

if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);
    return (int)bytesConsumed;
  }

//以下是hadoop-core中LineReader的源码_end

}

2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如 果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继 续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。

这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。

转载于:https://www.cnblogs.com/Dreama/archive/2011/09/19/2181523.html

相关文章:

使用深度学习检测DGA(域名生成算法)——LSTM的输入数据本质上还是词袋模型...

from:http://www.freebuf.com/articles/network/139697.html DGA&#xff08;域名生成算法&#xff09;是一种利用随机字符来生成C&C域名&#xff0c;从而逃避域名黑名单检测的技术手段。例如&#xff0c;一个由Cryptolocker创建的DGA生成域xeogrhxquuubt.com&#xff0c;如…

学习Python开发培训有用吗

学习Python开发培训有用吗?这是目前很多人都比较关注的一个问题&#xff0c;Python语言在最近几年是广受IT互联网行业关注的&#xff0c; 下面我们就针对这问题来详细的分析一下。 学习Python开发培训有用吗?Python是被广泛使用的高级编程语言&#xff0c;Python解释器本身几…

Web性能优化实践——应用层性能优化

随着公司项目的进一步推广&#xff0c;用户数量的增加&#xff0c;已经面临着单台服务器不能负载的问题。 这次的优化由于时间关系主要分两步走&#xff0c;首先优化应用层代码以提高单台服务器的负载和吞吐率。之后再进行分表&#xff0c;引入队列、MemCached等分布式应用。 项…

技术图文:Python魔法方法之属性访问详解

背景 今天在B站学习“零基础入门学习 Python”中的第45节“魔法方法&#xff1a;属性访问”&#xff0c;这也是我们组织的 Python基础刻意练习活动 的学习任务&#xff0c;其中有这样的一个题目。 练习要求&#xff1a; 写一个矩形类&#xff0c;默认有宽和高两个属性。如果…

chmod权限设置

drwxr-xr-x. 7 root root 4096 Sep 26 20:16 sysconfig-rw-r--r--. 1 root root 1150 Aug 31 18:46 sysctl.conflrwxrwxrwx. 1 root root 14 Aug 31 17:21 system-release -> centos-release例如&#xff1a;-rw-r--r--第一个代表文件类型:-普通文件&#xff1a;…

【Python培训基础】一篇文件教你py文件打包成exe

场景: 如果要将我们编写好的代码给别人使用,如果要他们直接使用我们的代码,就需要安装各种编译软件以及第三方模块,还要对软件操作,编程有一定的了解,这对使用者的要求比较高,不是很方便,为了解决这一问题,我们可以选择将我们编写的代码,编译成一个可执行文件,这样,就可以实现跨…

刻意练习:Python基础 -- Task06. 字典与集合

背景 我们准备利用17天时间&#xff0c;将 “Python基础的刻意练习” 分为如下任务&#xff1a; Task01&#xff1a;变量、运算符与数据类型&#xff08;1day&#xff09;Task02&#xff1a;条件与循环&#xff08;1day&#xff09;Task03&#xff1a;列表与元组&#xff08;…

WCF - Session 剖析

WCF中的Session WCF是MS基于SOA建立的一套在分布式环境中各个相对独立系统进行通信的构架&#xff0c;实现了最新的基于WS-*规范。按照SOA的原则&#xff0c;相对独自的业务逻辑以service的形式封装&#xff0c;调用者通过Messaging的方式调用Service。对于承载着某个业务功能的…

mui 微信支付 与springMVC服务器交互

昨天搞定了微信支付,没有想象中的难,主要是官方的demo不全好多东西要自己琢磨,mui端的就不写了支付宝的有了一模一样.上java端的首先是jar包 一个是用来解析xml文件 一个是用来解析json 当然可以替代 然后是工具类当然并不是全都用的到. public class ConfigUtil { /** * 服务…

Python零基础自学会有哪些弊端

Python在人工智能领域的发展前景非常好&#xff0c;很多人都想要学习Python技术&#xff0c;有一些小伙伴会选择通过自学来学习&#xff0c;但是如果是零基础&#xff0c;自学的话一定要注意这些弊端&#xff0c;下面就为大家详细的介绍一下Python零基础自学会有哪些弊端? Pyt…

技术图文:如何利用 Turtle 绘制一棵漂亮的樱花树

背景 最近看到很多机构在推动“青少年编程能力等级标准”的制定以及相关考试的测评&#xff0c;看样子今年年底这个事情就能够确定&#xff0c;明天上半年在一些大中城市就会全面铺开。 《青少年编程能力等级》标准发布&#xff0c;年底前将在部分地区落地青少年编程能力等级…

Python 是否是下一个 PHP?为什么?

前几天和一个看好 Python 的 Rails 开发者聊天&#xff0c;他看好 Python 的原因就是 PHP 统治今天的网络应用开发。而 Python 很像下一个 PHP 。 『下一个 PHP』如何定义&#xff1f;是指流行程度么&#xff1f;如果是的话&#xff0c;我觉得 Python 不会像 PHP 那样流行。根本…

正确使用STL-MAP中Erase函数

一切尽在代码中。 #include <iostream> #include <map> #include <string> using namespace std ;int main(void) { map<int, string> m ;m.insert(pair<int, string>(1, "abc")) ;m.insert(pair<int, string>(2, "def&qu…

学完UI设计可以从事哪些工作

最近有很多同学都会问到一个问题&#xff0c;就是学完UI设计可以从事哪些工作?对于正在学习UI设计的同学和已经学完UI设计的同学们&#xff0c;可以来看看下面文章的详细介绍就知道了。 学完UI设计可以从事哪些工作? 一、交互设计师。 学习UI设计之后就可以做交互设计师了&am…

刻意练习:Python基础 -- Task08. 异常处理

背景 我们准备利用17天时间&#xff0c;将 “Python基础的刻意练习” 分为如下任务&#xff1a; Task01&#xff1a;变量、运算符与数据类型&#xff08;1day&#xff09;Task02&#xff1a;条件与循环&#xff08;1day&#xff09;Task03&#xff1a;列表与元组&#xff08;…

Winform 控件自适应 JSP 入门登录案例

明儿在放&#xff0c;先睡转载于:https://www.cnblogs.com/javabin/archive/2011/09/26/2192402.html

MyEclipse对Struts2配置文件较检异常 Invalid result location value/parameter

有时在编写struts.xml时会报错&#xff0c;但是找不出有什么她方有问题。也能正常运行 MyEclipse有地方去struts的xml进行了验证&#xff0c;经查找把这里 的build去掉就可以了 本文转自lpxxn博客园博客&#xff0c;原文链接&#xff1a;http://www.cnblogs.com/li-peng/p/3791…

学Python有哪些优势

Python在人工智能领域应用是比较广泛的&#xff0c;近几年&#xff0c;越来越多的人对Python技术比较感兴趣&#xff0c;想要学习&#xff0c;那么具体学Python有哪些优势呢?我们来看看下面的详细介绍就知道了。 学Python有哪些优势? 1.Python很受欢迎 流行程度似乎不是衡量价…

MongoDB 正则表达式

阅读目录 示例不区分大小写数组使用正则表达式正则中包含变量回到顶部示例 MongoDB 使用 $regex 操作符来设置匹配字符串的正则表达式。 > db.col.find() { "_id" : ObjectId("56c6bbef64799370c0ef358a"), "x" : "hello world", &…

刻意练习:Python基础 -- Task09. else 与 with 语句

背景 我们准备利用17天时间&#xff0c;将 “Python基础的刻意练习” 分为如下任务&#xff1a; Task01&#xff1a;变量、运算符与数据类型&#xff08;1day&#xff09;Task02&#xff1a;条件与循环&#xff08;1day&#xff09;Task03&#xff1a;列表与元组&#xff08;…

Java学习必不可少的网站,快收藏起来

java技术在IT互联网行业的发展前景一直在提升&#xff0c;越来越多的人都在学习java技术&#xff0c;今天小编来给大家提供一些学习Java的网站集合&#xff0c;希望能够帮助到正在学习java技术的同学。 Java学习必不可少的网站&#xff0c;快收藏起来! 1. Stackoverflow Stacko…

刻意练习:Python基础 -- Task11. 魔法方法

背景 我们准备利用17天时间&#xff0c;将 “Python基础的刻意练习” 分为如下任务&#xff1a; Task01&#xff1a;变量、运算符与数据类型&#xff08;1day&#xff09;Task02&#xff1a;条件与循环&#xff08;1day&#xff09;Task03&#xff1a;列表与元组&#xff08;…

Oracle中的MERGE语句

转自http://blog.chinaunix.net/space.php?uid16981447&doblog&cuid430716做了简单的格式整理&#xff0c;加入了一点点原创的东西。Oracle9i引入了MERGE命令,你能够在一个SQL语句中对一个表同时执行inserts和updates操作. MERGE命令从一个或多个数据源中选择行来upda…

C#从数据库导出数据[excel]

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Data;using MySql.Data.MySqlClient;using Microsoft.Office.Interop.Excel;using Excel Microsoft.Office.Interop.Excel; //使用命名空间别名using System.Reflection; …

UI设计培训中的扁平化理念

本文是为正在学习UI设计的同学们整理的一份资料&#xff0c;主要讲的是UI设计培训中的扁平化理念&#xff0c;扁平化的设计是抛弃一切装饰的设计&#xff0c;扁平化设计使得用户操作起来更加简洁、高效和舒适。简洁大方的交互界面设计自然能够引导用户&#xff0c;并且在短时间…

刻意练习:Python基础 -- Task12. 模块

背景 我们准备利用17天时间&#xff0c;将 “Python基础的刻意练习” 分为如下任务&#xff1a; Task01&#xff1a;变量、运算符与数据类型&#xff08;1day&#xff09;Task02&#xff1a;条件与循环&#xff08;1day&#xff09;Task03&#xff1a;列表与元组&#xff08;…

Linux JSP连接MySQL数据库

Linux&#xff08;Ubuntu平台&#xff09;JSP通过JDBC连接MySQL数据库&#xff0c;与Windows平台类似&#xff0c;步骤如下&#xff1a; 下载 jdbc&#xff1a; mysql-connector-java-5.1.18.tar.gz 解压 jdbc&#xff1a; tar -zxvf mysql-connector-java-5.1.18.tar.gz 配置 …

h5 getUserMedia error PermissionDeniedError

HTML5 在使用非 localhost 地址访问时打开摄像头失败 。报getUserMedia error PermissionDeniedError&#xff0c;火狐下是可以正常调取的。 需要https&#xff1a; 火狐&#xff1a; 转载于:https://www.cnblogs.com/cosyer/p/7646672.html

女生零基础学软件测试难不难

软件测试属于一门IT技术编程语言&#xff0c;很多人都觉得IT技术都是男性比较多&#xff0c;按照目前的行业数据来看&#xff0c;确实是男性居多&#xff0c;但最近几年&#xff0c;女性程序猿也越来越多&#xff0c;其中就有软件测试这个岗位&#xff0c;下面具体来看看女生零…

技术图文:NumPy 的简单入门教程

背景 这段时间&#xff0c;LSGO软件技术团队正在组织 “机器学习实战刻意练习”活动&#xff0c;这个活动是“Python基础刻意练习”活动的升级&#xff0c;是对学员们技术的更深层次的打磨。在用 Python 写各类机器学习算法时&#xff0c;我们经常会用到 NumPy库&#xff0c;故…