当前位置: 首页 > 编程日记 > 正文

Hadoop学习笔记之三 数据流向

http://hadoop.apache.org/docs/r1.2.1/api/index.html

最基本的:

1. 文本文件的解析

2. 序列文件的解析

 toString会将Byte数组中的内存数据 按照字节间隔以字符的形式显示出来。

文本文件多事利用已有的字符处理类, 序列文件多事创建byte数组,然后将文件流中的数据复制到byte数组后进行解析。

LineRecordReader。。。 。。。

这里首先需要了整个文件数据 的流动方向。

MapReduce框架借助inputformat完成输入数据的规范检查,借助outputformat完成输出数据的规范性检查。

context的常用用法:

context.getConfiguration
context.getInputSplit
context.write

利用好context可以随心所欲的输出,从输入key value list中获得信息,输出可以是单个的key,value;也可以是key list

从输入中获得信息,用context随便向外写!

一个Mapper对应一个Split文件,而recorderreder需要多次调用用来解析键值对:

如下所示一个文本文件传入mapper,而map函数多次被触发

public class TestDataFlow {public static void main(String[] args)throws Exception{Configuration conf  = new Configuration();Job job = new Job(conf, "testDataFlow");job.setJarByClass(TestDataFlow.class);job.setMapperClass(myMapper.class);FileInputFormat.addInputPath(job, new Path("hdfs://MASTERPC:9000/home/Fea.txt"));FileOutputFormat.setOutputPath(job, new Path("hdfs://MASTERPC:9000/out"));job.waitForCompletion(true);}public static class myMapper extends Mapper<Object, Text, Text, Text>{private FileSplit split; public void map(Object key, Text value, Context context)throws IOException, InterruptedException{split = (FileSplit)context.getInputSplit();System.out.println("输入文件块的路径:"+split.getPath().toString());}}}
14/11/09 17:15:44 INFO mapred.MapTask: Processing split: hdfs://MASTERPC:9000/home/Fea.txt:0+7397

输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt

下面是一段自定义InputFormat的程序,功能是将零碎的小文件合并成大的sequence文件:key文件名,value文件值

//主要包括两个部分:文件划分 + 创建RecordReader; 下面的代码new了一个自己的reader返回
public
class myFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {protected boolean isSplitable(JobContext context, Path file){return false;}public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException,InterruptedException{myRecordReader recorder = new myRecordReader();recorder.initialize(split, context);return recorder;} }
//使用split获得输入文件块的大小、路径信息;使用context获得fs真正的从dfs上读入文件内容到value成员变量中。
//成员变量value用来传递给Mapper的map函数使用
public class myRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit split;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false;public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.split = (FileSplit)split;this.conf = context.getConfiguration();}public void close(){;}public NullWritable getCurrentKey()throws IOException{return null;}public BytesWritable getCurrentValue()throws IOException{return value;}public float getProgress()throws IOException, InterruptedException{return processed?1.0f:0.0f;}public boolean nextKeyValue()throws IOException, InterruptedException{if (!processed){byte[] buf = new byte[(int)split.getLength()];FileSystem fs = FileSystem.get(conf);Path path = split.getPath();InputStream in = null;try{in = fs.open(path);IOUtils.readFully(in, buf, 0, buf.length);value.set(buf,0, buf.length);}finally{IOUtils.closeStream(in);}processed = true; return true;}else{return false;}}}//RecordReader处理好的key和value自动的传递给map函数
public static class myMapper extends Mapper<Object, BytesWritable, Text, BytesWritable>{private FileSplit split; private Text outkey = new Text();public void setup(Context context){split = (FileSplit)context.getInputSplit();outkey.set(split.getPath().toString());System.out.println("输入文件块的路径:"+split.getPath().toString());}public void map(Object key, BytesWritable value, Context context)throws IOException, InterruptedException{System.out.println("key: "+ outkey.toString()+" value: "+value.toString()); }

Mapper类对应的是输入的文件块split,map对应的是文件块解析出来的一个个的<key, value>

RecordReader对应的是输入的文件块split,可能需要多次对split进行解析。

InputFormat中分为两部分,getSplit是将inputPath路径下的HDFS文件划分成块split,另一部分是CreateRecordReader

创建用来解析每个split。

其中划分成块是在终端上传数据文件时进行的,然后划分之后的文件信息提供给jobTracker进行分配,分配到任务的节点(Mapper)到相应的 位置下载自己的split文件

然后调用RecordReader不断对这个split文件进行解析,将生成的<key,value>送给map函数进行处理生成新的<key,value>,经过排序合并分组之后传递给相应的reduce。

自定义InputFormat

下面是一个将小文件聚合成大的序列文件的mapred作业,核心是利用自定义的FileInputFormt将系统分配的小文件,以文件名(text)key,

文件内容(bytes)value的键值对形式传递给最后的SeqenceFileOutputFormat,将这些键值对写入HDFS上的序列文件中。

由于文件较小所以不会再分块,一个文件作为整体输入到节点上,FileInputFormat中设置文件不可分。

Mapper类中调用FileInputFormat中的创建初始化RecordReader函数:createRecordReader(),

RecordReader将输入文件解析后将<key,value>送给map函数, 在之前的TextInputFormat的LineRecordReader是不断的对split进行处理

将每行解析成一个<key, value>送给map,但是这里的应用时RecordReader仅仅处理split文件一次,就将整个文件的内容作为<null, value>

传递给map, 最后的reduce在将所有的键值对聚合成一个。所以RecordReader类中做了处理次数的判断处理。

这样一来对于每个节点的map也只会执行一次,多以在Mapper一开始的时候setup函数中将输入的Split文件的文件名获得作为key。

文件:myRecordReader.java

[说明]RecordReader的作用就是为了解析split文件,所以createRecordReader时传入的参数是split、context,在RecordReader类中,首先需要获得

HDFS上的split的输入文件流,然后对此进行解析,并记录下处理的位置,方便下次map调用的时候,从上次的位置接着对split文件进行解析。

文件:myFileInputFormat.java

[说明]该类中设置文件不可分,创建RecordReader对象。

inputFormat和 recordReader中的键值对类型一定要和  map中的键值对类型一致。

作业上下文context可以获得作业参数conf、输入文件InputSplit、

在Mapper类的初始化函数中传入的参数就是context

public void setup(Context context)

OutputFormat:

主要的函数是:

getRcordWriter, 返回的是RecordWriter, 它负责将键值对写入到存储部件中。

checkOutputSpecs,负责检查输出目录参数是否合理,如果输出目录已经存在就报错。

getOutputCommiter,OutputCommiter负责临时文件的初始化,作业完成之后清理临时目录等等。

系统默认的是TextOutputFormat,它使用LineRecordWriter将最终获的键值对以key + \t + value的方式逐行输出到文本文件中。

在 编程时使用较多的是重载 writer、RecordWriter、getRecordWriter,

tips:在eclipse中代码的空白处右键->Source->Override/Implements Methods 然后在弹出的窗口中选择需要重载的成员方法。^.^

(生成的重载函数上面会自动生@Override灰色字体)

下面是hadoop源码SequenceFileOutputFormat中的一段:

  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext context
                         ) throws IOException, InterruptedException {
   。。。 。。。
   final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, file,context.getOutputKeyClass(),context.getOutputValueClass(),compressionType,codec,context);return new RecordWriter<K, V>() {public void write(K key, V value)throws IOException {out.append(key, value);}public void close(TaskAttemptContext context) throws IOException { out.close();}};

 可见write中传入的是OutputFormat<K,V>中的键值对,而创建的SequenceFile.Writer是根据setOutputKeyClass、setOutputValueClass设置的类型进行写文件。

public class TestFormat {public  static void main(String[] args)throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "testFormat!");job.setInputFormatClass(SequenceFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);FileInputFormat.addInputPath(job , new Path("hdfs://master:9000/hadoop"));FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/out"));job.waitForCompletion(true);}
}

java.lang.Exception: java.io.IOException:

Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.Text

        job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);

RecordReader是根据输入文件中的内容自动获得key和value的class,所以map之后的k,v类型是原始文件的kv类型,但是write函数中out(writer)的kv类型时job中设置的。


 //草稿:

 public static class myOutputFormat<Text, BytesWritable> extends  SequenceFileOutputFormat<Text, BytesWritable>{public RecordWriter<Text, BytesWritable> getRecordWriter(TaskAttemptContext arg0) throws IOException,InterruptedException {// TODO Auto-generated method stubreturn super.getRecordWriter(arg0);}}

定义自己的数据:

public class Point3D implements WritableComparable<Point3D>{private float x, y, z;public float getX(){return x;}public float getY(){return y;}public float getZ(){return z;}public void readFields(DataInput in) throws IOException{x = in.readFloat();y = in.readFloat();z = in.readFloat();}public void write(DataOutput out)throws IOException{out.writeFloat(x);out.writeFloat(y);out.writeFloat(z);}public int compareTo(Point3D p){float ret = (x*x+y*y +z*z) - (p.x*(p.x)+p.y*(p.y)+p.z*(p.z));if (ret > 0)return 1;else if (ret == 0)return 0;else return -1;}}

一些常用的流:

FSDataInputStream
DataInputStream
FileInputStream
InputStream

BufferedInputStream

链式处理:

在hadoop计算的过程中比较耗费时间的是IO操作,一些Job 在Map需要前期预处理,reduce后需要后处理,这样可以使用Job的链式处理;

通过ChainMapper.addMapp实现,但是该类不支持hadoop1.2.1版本。

---------------------------------------------------------------

参考文献:

《实战Hadoop》开启通向云计算的捷径,刘鹏主编,电子工业出版社

最短路径系列之一《从零开始学Hadoop》

http://blog.csdn.net/chaoping315/article/details/6221440

转载于:https://www.cnblogs.com/sunniflyer/p/4085758.html

相关文章:

[微信小程序]星级评分和展示(详细注释附效果图)

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 星级评分分成两种情况: 一:展示后台给的评分数据 二:用户点击第几颗星星就显示为几星评分; <!--pages/test/test.wxml--> <view> <view>一:显示后台给的评分</…

uber_这就是我本可以免费骑Uber的方式

uberby AppSecure通过AppSecure 这就是我本可以免费骑Uber的方式 (Here’s how I could’ve ridden for free with Uber) 摘要 (Summary) This post is about a critical bug on Uber which could have been used by hackers to get unlimited free Uber rides anywhere in th…

磁盘I/O 监控 iostat

iostat -cdxm 2 5 dm-4 如果没有这个命令&#xff0c;需要安装sysstat 包。 Usage: iostat [ options ] [ <interval> [ <count> ] ]Options are:[ -c ] [ -d ] [ -N ] [ -n ] [ -h ] [ -k | -m ] [ -t ] [ -V ] [ -x ] [ -z ][ <device> [...] | ALL ] [ -p…

[微信小程序]物流信息样式加动画效果(源代码附效果图)

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 效果图片:(信息仅为示例) <!--pages/order/order_wl.wxml--> <view classpage_row top><image classgoods src../../images/dsh.png></image><view cl…

在 Ubuntu 14.04 Chrome中安装Flash Player(转)

在 Ubuntu 14.04 中安装 Pepper Flash Player For Chromium 一个 Pepper Flash Player For Chromium 的安装器已经被 Ubuntu14.04 的官方源收录。Flash Player For Linux 自11.2 起已经停止更新&#xff0c;目前 Linux 平台下面的 Flash Player 只能依靠 Google Chrom 的 PPAPI…

数据结构显示树的所有结点_您需要了解的有关树数据结构的所有信息

数据结构显示树的所有结点When you first learn to code, it’s common to learn arrays as the “main data structure.”第一次学习编码时&#xff0c;通常将数组学习为“主要数据结构”。 Eventually, you will learn about hash tables too. If you are pursuing a Comput…

Unity应用架构设计(9)——构建统一的 Repository

谈到 『Repository』 仓储模式&#xff0c;第一映像就是封装了对数据的访问和持久化。Repository 模式的理念核心是定义了一个规范&#xff0c;即接口『Interface』&#xff0c;在这个规范里面定义了访问以及持久化数据的行为。开发者只要对接口进行特定的实现就可以满足对不同…

PHP连接数据库并创建一个表

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 <html> <body><form action"test.class.php" method"post"> title: <input type"text" name"title"><br> centent: <input t…

MyBatis 入门

什么是 MyBatis &#xff1f; MyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的持久层框架。MyBatis 避免了几乎所有的 JDBC 代码和手工设置参数以及抽取结果集。MyBatis 使用简单的 XML 或注解来配置和映射基本体&#xff0c;将接口和 Java 的 POJOs(Plain Old Java O…

cms基于nodejs_我如何使基于CMS的网站脱机工作

cms基于nodejsInterested in learning JavaScript? Get my ebook at jshandbook.com有兴趣学习JavaScript吗&#xff1f; 在jshandbook.com上获取我的电子书 This case study explains how I added the capability of working offline to the writesoftware.org website (whic…

how-to-cartoon-ify-an-image-programmatically

http://stackoverflow.com/questions/1357403/how-to-cartoon-ify-an-image-programmatically 转载于:https://www.cnblogs.com/guochen/p/6655333.html

Android Studio 快捷键

2015.02.05补充代码重构快捷键 Alt回车 导入包 自动修正CtrlN 查找类​CtrlShiftN 查找文件CtrlAltL 格式化代码CtrlAltO 优化导入的类和包AltInsert 生成代码(如get,set方法,构造函数等)CtrlE或者AltShiftC 最近更改的代码CtrlR 替换文本CtrlF 查找文本CtrlShiftSpace 自动补全…

【微信小程序】异步请求,权重,自适应宽度并折行,颜色渐变,绝对定位

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 写这篇博文主要是为了能够给到大家做类似功能一些启迪&#xff0c;下面效果图中就是代码实现的效果&#xff0c;其中用到的技巧点还是比较多的&#xff0c; <!--pages/demo_list/d…

服务器部署基础知识_我在生产部署期间学到的知识

服务器部署基础知识by Shruti Tanwar通过Shruti Tanwar 我在生产部署期间学到的知识 (What I learned during production deployment) Production deployment. The final stage of every project. When all the hard work you’ve put in over the course of time goes live t…

STM32 KEIL中 负数绝对值处理

使用数码管显示负温度时需要把负数转换为绝对值 #include<math.h> 使用abs 或者自己写函数 #define ABS(x) ((x)>0?(x):-(x)))转载于:https://www.cnblogs.com/yekongdexingxing/p/6657371.html

js数组按照下标对象的属性排序

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 根据数组中某个参数的值的大小进行升序 <script type"text/javascript">function compare(val) {return function (a, b) {var value1 a[val];var value2 b[val];return value1…

window 下相关命令

1. 启动window服务(各种应用启动设置的地方)命令方式&#xff1a; 1). window 按钮(输入CMD的地方)处输入&#xff1a;services.msc &#xff0c;然后执行。 // 输入命令正确&#xff0c;上面的待选框中会出现要执行的命令。msc 可以理解为Microsoft client 2). 计算机 -- …

javascript语法糖_语法糖和JavaScript糖尿病

javascript语法糖by Ryan Yurkanin瑞安尤卡宁(Ryan Yurkanin) 语法糖和JavaScript糖尿病 (Syntactic Sugar and JavaScript Diabetes) Syntactic sugar is shorthand for communicating a larger thought in a programming language.语法糖是用编程语言传达更大思想的简写。 …

《DSP using MATLAB》示例Example7.23

代码&#xff1a; wp 0.2*pi; ws 0.3*pi; Rp 0.25; As 50; [delta1, delta2] db2delta(Rp, As);[N, f, m, weights] firpmord([wp, ws]/pi, [1, 0], [delta1, delta2]);N f m weightsh firpm(N, f, m, weights); [db, mag, pha, grd, w] freqz_m(h, [1]);delta_w 2*pi…

css画图笔记

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 在网页中&#xff0c;经常会用到各种Icon&#xff0c;如果老是麻烦设计狮画出来不免有些不好意思&#xff0c;所以有时候我们也可以用CSS写出各种简单的形状&#xff0c;一来可以减轻…

Web前端开发最佳实践(8):还没有给CSS样式排序?其实你可以更专业一些

前言 CSS样式排序是指按照一定的规则排列CSS样式属性的定义&#xff0c;排序并不会影响CSS样式的功能和性能&#xff0c;只是让代码看起来更加整洁。CSS代码的逻辑性并不强&#xff0c;一般的开发者写CSS样式也很随意&#xff0c;所以如果不借助工具&#xff0c;不太容易按照既…

超越Android:Kotlin在后端的工作方式

by Adam Arold亚当阿罗德(Adam Arold) 超越Android&#xff1a;Kotlin在后端的工作方式 (Going Beyond Android: how Kotlin works on the Backend) This article is part of a series.本文是系列文章的一部分。 While most developers use Kotlin on Android, it is also a …

词汇的理解 —— 汉译英(术语)

词汇的理解 —— 英译汉 1. 名词 机制&#xff1a;mechanism&#xff0c;系统&#xff1a;system&#xff1b;2. 动词 融资&#xff1a;financing&#xff1b;制动&#xff1a;braking&#xff0c;就是“刹车”&#xff1b;3. 音乐与乐器 horn&#xff1a;喇叭&#xff0c;号角…

Swift从零开始学习_08(代理协议传值)

Swift中的代理协议的写法. 这是第一个页面有一个button和一个label, button点击跳到下一个页面. 第二个页面有一个输入框和一个按钮, 点击按钮把输入框里的内容设置为第一个页面label的内容.效果如下 接下来是代码部分.跟OC的写法还是一样的.这里不再写第一个页面的那些UI的…

[微信小程序]商城之购买商品数量实现

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 这里有三种变更数量的方式&#xff0c; 加号&#xff0c;减号&#xff0c;input输入 &#xff0c; 这里做了限制&#xff0c;数量不能小于等于0并且不能超过现有库存&#xff0c;下面是…

测试nginx网站代码_在40行以下代码中使用NGINX进行A / B测试

测试nginx网站代码by Nitish Phanse由Nitish Phanse 在40行以下代码中使用NGINX进行A / B测试 (A/B testing with NGINX in under 40 lines of code) A/B Testing, has enabled designers and product managers to get a deep insight into user behavioral patterns.A / B测试…

HttpServletResponse,HttpServletRequest详解

HttpServletResponse,HttpServletRequest详解 1、相关的接口 HttpServletRequest HttpServletRequest接口最常用的方法就是获得请求中的参数&#xff0c;这些参数一般是客户端表单中的数据。同时&#xff0c;HttpServletRequest接口可以获取由客户端传送的名称&#xff0c;也可…

[微信小程序]this.setData , that.setData , this.data.val三者之间的区别和作用

微信小程序开发交流qq群 173683895 承接微信小程序开发。扫码加微信。 正文&#xff1a; 1.this.setData({ }) <view bindtouchmove"tap_drag" bindtouchend"tap_end" bindtouchstart"tap_start" class"page-top" style"{…

jQuery(一)引入

一、jQuery简介 jQuery是一个兼容多浏览器的javascript库&#xff0c;核心理念是write less,do more(写得更少,做得更多) 二、安装 2.1、下载 下载地址&#xff1a;http://jquery.com/download/ 2.2、引入 在页面头部加入 <head> <meta http-equiv"Content-Type&…

javascript 堆栈_JavaScript调用堆栈-它是什么以及为什么它是必需的

javascript 堆栈The JavaScript engine (which is found in a hosting environment like the browser), is a single-threaded interpreter comprising of a heap and a single call stack. The browser provides web APIs like the DOM, AJAX, and Timers.JavaScript引擎(可在…