当前位置: 首页 > 编程日记 > 正文

一起学Hadoop——实现两张表之间的连接操作

---恢复内容开始---

之前我们都是学习使用MapReduce处理一张表的数据(一个文件可视为一张表,hive和关系型数据库Mysql、Oracle等都是将数据存储在文件中)。但是我们经常会遇到处理多张表的场景,不同的数据存储在不同的文件中,因此Hadoop也提供了类似传统关系型数据库的join操作。Hadoop生态组件的高级框架Hive、Pig等也都实现了join连接操作,编写类似SQL的语句,就可以在MapReduce中运行,底层的实现也是基于MapReduce。本文介绍如何使用MapReduce实现join操作,为以后学习hive打下基础。

1、Map端连。
数据在进入到map函数之前就进行连接操作。适用场景:一个文件比较大,一个文件比较小,小到可以加载到内存中。如果两个都是大文件,就会出现OOM内存溢出的异常。实现Map端连接操作需要用到Job类的addCacheFile()方法将小文件分发到各个计算节点,然后加载到节点的内存中。

下面通过一个例子来实现Map端join连接操作:
1、雇员employee表数据如下:
name gender age dept_no
Tom male 30 1
Tony male 35 2
Lily female 28 1
Lucy female 32 3

2、部门表dept数据如下:
dept_no dept_name
1 TSD
2 MCD
3 PPD

代码实现如下:

  1 package join;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Reducer;
  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 import org.apache.hadoop.util.Tool;
 12 import org.apache.hadoop.io.*;
 13 import org.apache.hadoop.util.ToolRunner;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 
 16 import java.io.BufferedReader;
 17 import java.io.FileReader;
 18 import java.io.IOException;
 19 import java.net.URI;
 20 import java.util.HashMap;
 21 import java.util.Map;
 22 import org.apache.hadoop.fs.Path;
 23 
 24 public class MapJoin extends Configured implements Tool {
 25 
 26     public static class MapJoinMapper extends Mapper<LongWritable, Text, Text,NullWritable> {
 27         private Map<Integer, String> deptData = new HashMap<Integer, String>();
 28 
 29         @Override
 30         protected void setup(Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException, InterruptedException {
 31             super.setup(context);
 32             //从缓存的中读取文件。
 33             Path[] files = context.getLocalCacheFiles();
 34 //            Path file1path = new Path(files[0]);
 35             BufferedReader reader = new BufferedReader(new FileReader(files[0].toString()));
 36             String str = null;
 37             try {
 38                 // 一行一行读取
 39                 while ((str = reader.readLine()) != null) {
 40                     // 对缓存中的数据以" "分隔符进行分隔。
 41                     String[] splits = str.split(" ");
 42                     // 把需要的数据放在Map中。注意不能操作Map的大小,否则会出现OOM的异常
 43                     deptData.put(Integer.parseInt(splits[0]), splits[1]);
 44                 }
 45             } catch (Exception e) {
 46                 e.printStackTrace();
 47             } finally{
 48                 reader.close();
 49             }
 50         }
 51 
 52         @Override
 53         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException,
 54                 InterruptedException {
 55             // 获取从HDFS中加载的表
 56             String[] values = value.toString().split(" ");
 57             // 获取关联字段depNo,这个字段是关键
 58             int depNo = Integer.parseInt(values[3]);
 59             // 根据deptNo从内存中的关联表中获取要关联的属性depName
 60             String depName = deptData.get(depNo);
 61             String resultData = value.toString() + " " + depName;
 62             // 将数据通过context写入到Reduce中。
 63             context.write(new Text(resultData),NullWritable.get());
 64         }
 65     }
 66 
 67     public static class MapJoinReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
 68         public void reduce(Text key, Iterable<NullWritable> values,Context context)throws IOException,InterruptedException{
 69             context.write(key,NullWritable.get());
 70         }
 71     }
 72 
 73     @Override
 74     public int run(String[] args) throws Exception {
 75         Configuration conf = new Configuration();
 76         Job job = Job.getInstance(conf, "Total Sort app");
 77         //将小表加载到缓存中。
 78         job.addCacheFile(new URI(args[0]));
 79         job.setJarByClass(MapJoinMapper.class);
 80         //1.1 设置输入目录和设置输入数据格式化的类
 81         FileInputFormat.setInputPaths(job,new Path(args[1]));
 82         job.setInputFormatClass(TextInputFormat.class);
 83 
 84         //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
 85         job.setMapperClass(MapJoinMapper.class);
 86         job.setMapOutputKeyClass(Text.class);
 87         job.setMapOutputValueClass(NullWritable.class);
 88 
 89         //1.3 设置reduce数量
 90         job.setNumReduceTasks(1);
 91         //设置实现了reduce函数的类
 92         job.setReducerClass(MapJoinReducer.class);
 93 
 94         //设置reduce函数的key值
 95         job.setOutputKeyClass(Text.class);
 96         //设置reduce函数的value值
 97         job.setOutputValueClass(NullWritable.class);
 98 
 99         // 判断输出路径是否存在,如果存在,则删除
100         Path mypath = new Path(args[2]);
101         FileSystem hdfs = mypath.getFileSystem(conf);
102         if (hdfs.isDirectory(mypath)) {
103             hdfs.delete(mypath, true);
104         }
105 
106         FileOutputFormat.setOutputPath(job, new Path(args[2]));
107 
108         return job.waitForCompletion(true) ? 0 : 1;
109     }
110 
111     public static void main(String[] args)throws Exception{
112 
113         int exitCode = ToolRunner.run(new MapJoin(), args);
114         System.exit(exitCode);
115     }
116 }

执行脚本文件如下::

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar MapJoin.jar \
2 hdfs://hadoop-master:8020/data/dept.txt \
3 hdfs://hadoop-master:8020/data/employee.txt \
4 hdfs://hadoop-master:8020/mapjoin_output

运行结果:

Lily female 28 1 TSD
Lucy female 32 3 PPD
Tom male 30 1 TSD
Tony male 35 2 MCD

2、Reduce端连接(Reduce side join)。
数据在Reduce进程中执行连接操作。实现思路:在Map进程中对来自不同表的数据打上标签,例如来自表employee的数据打上a标签,来自文件dept表的数据打上b标签。然后在Reduce进程,对同一个key,来自不同表的数据进行笛卡尔积操作。请看下图,我们对表employee和表dept的dept_no字段进行关联,将dept_no字段当做key。

在MapReduce中,key相同的数据会放在一起,因此我们只需在reduce函数中判断数据是来自哪张表,来自相同表的数据不进行join。

代码如下:

  1 public class ReduceJoin extends Configured implements Tool {
  2     public static class JoinMapper extends
  3             Mapper<LongWritable,Text,Text,Text> {
  4         String employeeValue = "";
  5         protected void map(LongWritable key, Text value, Context context)
  6                 throws IOException,InterruptedException {
  7             /*
  8              * 根据命令行传入的文件名,判断数据来自哪个文件,来自employee的数据打上a标签,来自dept的数据打上b标签
  9              */
 10             String filepath = ((FileSplit)context.getInputSplit()).getPath().toString();
 11             String line = value.toString();
 12             if (line == null || line.equals("")) return;
 13 
 14             if (filepath.indexOf("employee") != -1) {
 15                 String[] lines = line.split(" ");
 16                 if(lines.length < 4) return;
 17 
 18                 String deptNo = lines[3];
 19                 employeeValue = line + " a";
 20                 context.write(new Text(deptNo),new Text(employeeValue));
 21             }
 22 
 23             else if(filepath.indexOf("dept") != -1) {
 24                 String[] lines = line.split(" ");
 25                 if(lines.length < 2) return;
 26                 String deptNo = lines[0];
 27                 context.write(new Text(deptNo), new Text(line + " b"));
 28             }
 29         }
 30     }
 31 
 32     public static class JoinReducer extends
 33             Reducer<Text, Text, Text, NullWritable> {
 34         protected void reduce(Text key, Iterable<Text> values,
 35                               Context context) throws IOException, InterruptedException{
 36             List<String[]> lista = new ArrayList<String[]>();
 37             List<String[]> listb = new ArrayList<String[]>();
 38 
 39             for(Text val:values) {
 40                 String[] str = val.toString().split(" ");
 41                 //最后一位是标签位,因此根据最后一位判断数据来自哪个文件,标签为a的数据放在lista中,标签为b的数据放在listb中
 42                 String flag = str[str.length -1];
 43                 if("a".equals(flag)) {
 44                     //String valueA = str[0] + " " + str[1] + " " + str[2];
 45                     lista.add(str);
 46                 } else if("b".equals(flag)) {
 47                     //String valueB = str[0] + " " + str[1];
 48                     listb.add(str);
 49                 }
 50             }
 51 
 52             for (int i = 0; i < lista.size(); i++) {
 53                 if (listb.size() == 0) {
 54                     continue;
 55                 } else {
 56                     String[] stra = lista.get(i);
 57                     for (int j = 0; j < listb.size(); j++) {
 58                         String[] strb = listb.get(j);
 59                         String keyValue = stra[0] + " " + stra[1] + " " + stra[2] + " " + stra[3] + " " + strb[1];
 60                         context.write(new Text(keyValue), NullWritable.get());
 61                     }
 62                 }
 63             }
 64         }
 65     }
 66 
 67     @Override
 68     public int run(String[] args) throws Exception {
 69         Configuration conf = getConf();
 70         GenericOptionsParser optionparser = new GenericOptionsParser(conf, args);
 71         conf = optionparser.getConfiguration();
 72         Job job = Job.getInstance(conf, "Reduce side join");
 73         job.setJarByClass(ReduceJoin.class);
 74         //1.1 设置输入目录和设置输入数据格式化的类
 75         //FileInputFormat.setInputPaths(job,new Path(args[0]));
 76         FileInputFormat.addInputPaths(job, conf.get("input_data"));
 77 
 78         job.setInputFormatClass(TextInputFormat.class);
 79 
 80         //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
 81         job.setMapperClass(JoinMapper.class);
 82         job.setMapOutputKeyClass(Text.class);
 83         job.setMapOutputValueClass(Text.class);
 84 
 85         //1.3 设置reduce数量
 86         job.setNumReduceTasks(1);
 87         //设置实现了reduce函数的类
 88         job.setReducerClass(JoinReducer.class);
 89 
 90         //设置reduce函数的key值
 91         job.setOutputKeyClass(Text.class);
 92         //设置reduce函数的value值
 93         job.setOutputValueClass(NullWritable.class);
 94 
 95         // 判断输出路径是否存在,如果存在,则删除
 96         Path output_dir = new Path(conf.get("output_dir"));
 97         FileSystem hdfs = output_dir.getFileSystem(conf);
 98         if (hdfs.isDirectory(output_dir)) {
 99             hdfs.delete(output_dir, true);
100         }
101 
102         FileOutputFormat.setOutputPath(job, output_dir);
103 
104         return job.waitForCompletion(true) ? 0 : 1;
105     }
106 
107     public static void main(String[] args)throws Exception{
108         int exitCode = ToolRunner.run(new ReduceJoin(), args);
109         System.exit(exitCode);
110     }
111 }

执行MapReduce的shell脚本如下:

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar ReduceJoin.jar \
2 -Dinput_data=hdfs://hadoop-master:8020/data/dept.txt,hdfs://hadoop-master:8020/data/employee.txt \
3 -Doutput_dir=hdfs://hadoop-master:8020/reducejoin_output

总结:
1、Map side join的运行速度比Reduce side join快,因为Reduce side join在shuffle阶段会消耗大量的资源。Map side join由于把小表放在内存中,所以执行效率很高。
2、当有一张表的数据很小时,小到可以加载到内存中,那么建议使用Map side join。

欢迎关注本人公众号了解更多关于大数据方面的知识:

转载于:https://www.cnblogs.com/airnew/p/9689809.html

相关文章:

scala学习笔记-过程、lazy值和异常(6)

过程 在Scala中&#xff0c;定义函数时&#xff0c;如果函数体直接包裹在了花括号里面&#xff0c;而没有使用连接&#xff0c;则函数的返回值类型就是Unit。这样的函数就被称之为过程。过程通常用于不需要返回值的函数。 过程还有一种写法&#xff0c;就是将函数的返回值类型定…

为计算机编程序英语作文,计算机编程员英文简历范文

以下是无忧考网为大家整理的关于计算机编程员英文简历范文的文章&#xff0c;希望大家能够喜欢&#xff01;Basic information:Name: Yjb ys Gender: MaleAge: 21 Nationality: ChineseResidence: Guangdong - Huizhou height and weight: 178cm 60KGMarital status: Single Ed…

android常用命令

创建模拟器&#xff1a; 进入sdk下tool文件夹&#xff1a; cd ...../tools 检查target ID: ./android list targets 创建名为hello的AVD: ./android create avd -n hello -t 1 创建的模拟器默认在~/.android/avc/ 启动模拟器&#xff1a; ./emulator -avc hello 安装apk: 进入p…

Python基础05-数据类型:列表list

目录 列表 列表的一般用法 列表的方法 列表 列表的一般用法 列表用[]括起来&#xff0c;用逗号分隔元素。元素可以是任意的类型。 可以用len获取列表的长度&#xff0c;也就是元素的个数。 # 列表是个大杂烩&#xff0c;什么类型都可以往里面装 li [2019, 12, "存储…

h5打开App的方法。

在浏览器中&#xff1a;法1&#xff1a;location.href ${scheme};//location跳转App是几乎所以情况都支持的。法2&#xff1a;var ifr document.createElement(iframe);//iframe跳转有的地方不支持// ifr.src ${scheme};// ifr.style.display none;// document.body.append…

html作业三,3.15作业

html>科比简介div{width: 480px;height:230px;background-image:url(images/kobe.jpg);background-color:#567854;background-size:500px 250px;padding:10px;box-shaow:10px 10px 10px #666;}.text{color:#556644;text-indent: 3em;line-height: 1.4em;}科比布莱恩特 (美国…

HTML4 和 HTML5 的10个关键区别

http://www.oschina.net/news/22219/html4-html5-differences/转载于:https://www.cnblogs.com/antis/p/6708303.html

smartgit 授权文件 Free Trial License to Non-Commercial

Windows&#xff1a; %APPDATA%/syntevo/SmartGit/ OS X&#xff1a; ~/Library/Preferences/SmartGit/ Unix/Linux&#xff1a; ~/.smartgit/ and remove the file settings.xml 例如windows: 开始 运行 %APPDATA%/syntevo/SmartGit/ 就会自动找到路径删除settings.xml…

Python基础06-数据类型:元组tuple

目录 元组 元组的一般用法 元组的方法 元组 元组的一般用法 元组用()括起来&#xff0c;用逗号分隔元素&#xff0c;元素类型是任意的。因为函数、方法的调用也是用()括起来&#xff0c;为了避免混淆&#xff0c;写元组的时候&#xff0c;在最后一个元素后面多写一个逗号。…

html无规律卡片布局,如何实现同等间隙的卡片布局

在列表展示中&#xff0c;经常会使用卡片的内容展示形式&#xff0c;为了美观&#xff0c;常常要求各卡片间的间隙是一致的。卡片内容不一样可能高度不等&#xff0c;但一般来说为了整体的一致性&#xff0c;会限制每个卡片的宽高都相等。本文就基于宽高一致的多个卡片&#xf…

关于tail, head ,less ,more ,cat 之小介------Linux命令

前言&#xff1a;Linux命令在项目中是经常用&#xff0c;查看日志信息是一个不可缺少的指令。一般都是用Less&#xff0c;more,tail,head&#xff0c;cat 这些命令 目前是足够的。很久不用就会忘记。现在简单写一写。①tail 用于显示指定文件末尾内容&#xff0c;不指定文件时…

Python基础07-数据类型:字典dict

目录 字典 字典的一般用法 字典的方法 字典 字典的一般用法 字典是一组用{}括起来的键值对&#xff0c;每个键值对用逗号分隔。 # 字典 info {"Robin": "Baidu","Jack": ["Alibaba",20,{"B2C": "taobao.com&quo…

cordova 环境配制和创建插件

环境配制 英文网站&#xff1a;http://cordova.apache.org/ 中文网站&#xff1a;http://cordova.axuer.com/ 安装Cordova Cordova的命令行运行在Node.js 上面并且可以通过NPM安装。 根据 平台具体指导 安装相应平台的依赖。打开命令提示符或终端&#xff0c;然后键入npm insta…

HTML动画 request animation frame

在网页中&#xff0c;实现动画无外乎两种方式。1. CSS3 方式&#xff0c;也就是利用浏览器对CSS3 的原生支持实现动画&#xff1b;2. 脚本方式&#xff0c;通过间隔一段时间用JavaScript 来修改页面元素样式来实现动画。接下来我们就分别介绍这两种方式的原理&#xff0c;让大家…

express给html设置缓存,webpack + express 实现文件精确缓存

由于最近开发的个人博客(Vue node)在使用过程中&#xff0c;发现网络加载有点慢&#xff0c;所以打算对它进行一次优化。本次优化的目标如下&#xff1a;index.html 设置成 no-cache&#xff0c;这样每次请求的时候都会比对一下 index.html 文件有没变化&#xff0c;如果没变化…

2017年50道Java线程面试题

下面是Java线程相关的热门面试题&#xff0c;你可以用它来好好准备面试。 1) 什么是线程&#xff1f; 线程是操作系统能够进行运算调度的最小单位&#xff0c;它被包含在进程之中&#xff0c;是进程中的实际运作单位。程序员可以通过它进行多处理器编程&#xff0c;你可以使用多…

Python基础08-数据类型:集合set

目录 集合的概念 集合的方法 集合可变吗&#xff1f; 集合的概念 先理解一些概念。 数据类型按照是否可变分为可变类型、不可变类型。按照访问方式可以分为顺序访问、映射访问。 如何区分可变类型、不可变类型&#xff1f;就看在内存中存储内容是否可以被修改。如果内存地…

主元素问题 Majority Element

2018-09-23 13:25:40 主元素问题是一个非常经典的问题&#xff0c;一般来说&#xff0c;主元素问题指的是数组中元素个数大于一半的数字&#xff0c;显然这个问题可以通过遍历计数解决&#xff0c;时间复杂度为O(n)&#xff0c;空间复杂度为O(n)。这样的算法有两个弊端&#xf…

js判断是iOS还是Android

platform.js: var browser{versions:function(){ var u navigator.userAgent, app navigator.appVersion; return { trident: u.indexOf(Trident) > -1, //IE内核 presto: u.indexOf(Presto) > -1, //opera内核 webKit: u.indexOf(AppleWebKit) >…

计算机二级函数知识,2017年全国计算机二级考试MS Office高级应用知识点:INDIRECT函数...

INDIRECT函数知识点适用考试&#xff1a;全国计算机二级考试考试科目&#xff1a;MS Office高级应用科目知识点&#xff1a;INDIRECT函数INDIRECT函数立即对引用进行计算&#xff0c;并显示其内容。当需要更改公式中单元格的引用&#xff0c;而不更改公式本身&#xff0c;请使用…

Python基础09-字符串格式化

字符串格式化。主要是%格式&#xff0c;format格式化方法&#xff0c;具体写在代码例子的注释里。 msg list() # %s 接收字符串 msg.append("i am %s, which is a database." % "mysql") msg.append("i am %s, which is a %s." % ("db2&q…

dbcp 连接池参数说明

dbcp 连接池参数说明 参考&#xff1a;http://commons.apache.org/proper/commons-dbcp/configuration.htmlhttps://www.cnblogs.com/happySmily/p/5941813.html posted on 2018-09-24 10:31 姜小嫌 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/jiangxiaoxi…

Fastlane- app自动编译、打包多个版本、上传到app store

Fastlane是一套使用Ruby写的自动化工具集&#xff0c;用于iOS和Android的自动化打包、发布等工作&#xff0c;可以节省大量的时间。 Github&#xff1a;https://github.com/fastlane/fastlane 官网&#xff1a;https://fastlane.tools/ 文档&#xff1a;https://docs.fastlane.…

计算机基础知识综合试卷一,计算机基础知识试题及答案a

培训选拔试题(A卷)姓名&#xff1a;部门&#xff1a;得分&#xff1a;注意事项&#xff1a;I.A考试时间为90分钟&#xff0c;闭卷考试。I.B应考人员在答题前&#xff0c;请将姓名部门等信息认真准确地填写在答题纸上。I.C应考人员应严格遵守考场纪律&#xff0c;服从监考人员的…

Python基础10-函数基础

目录 函数的定义 函数的返回值 函数的参数 参数的传递 参数的默认值 可变长参数 全局变量与局部变量 函数嵌套定义 风湿理论——函数即变量 函数的定义 定义函数的关键字def。函数名&#xff0c;本例函数名是test。小括号里面是参数。冒号后缩进的代码块是函数内容。…

Milking Cows 挤牛奶

1.2.1 Milking Cows 挤牛奶 Time Limit: 1 Sec Memory Limit: 64 MBSubmit: 554 Solved: 108[Submit][Status][Forum]Description 三个农民每天清晨5点起床&#xff0c;然后去牛棚给3头牛挤奶。第一个农民在300时刻(从5点开始计时&#xff0c;秒为单位)给他的牛挤奶&#xff…

用eclipse玩转Python,让习惯java开发的童鞋拥有一个更爽的开发体验

#0>>>>>>>预准备工作&#xff1a;(a标签貌似不能用&#xff0c;&#xff0c;只好比较lowbi的直接放地址&#xff09; IDEeclipse下载地址:http://ftp.yz.yamagata-u.ac.jp/pub/eclipse/technology/epp/downloads/release/neon/3/eclipse-jee-neon-3-win32…

适合计算机应用的班群名称,班级同学群名字大全

很多人现在都是一个班级建一个群&#xff0c;以便大家沟通交流&#xff0c;有什么事大家群里一说很方便&#xff0c;没事还可以吹吹牛B策策谈&#xff0c;那么同学班级群用什么样的名字好呢&#xff0c;在此起名网为大家收集整理了班级同学群名字大全。来看看吧。最新班级同学群…

Mac 安装多个版本jdk

JDK默认安装路径为/Library/Java/JavaVirtualMachines 多版本安装后效果为: 设置 1.执行以下命令 cd ~open -e .bash_profile #打开.bash_profile文件注:假如.bash_profile文件不存在执行下面命令新建文件 cd ~ touch .bash_profile #新建.bash_profile文件 ls -a #查看文件是…

Python基础11-函数式编程与内置函数

目录 函数即变量 lambda关键字定义匿名函数 高阶函数 内置函数map 内置函数filter 内置函数reduce 内置函数看文档 函数即变量 书接上回&#xff0c;Python里面&#xff0c;函数就是变量&#xff0c;可以被当成普通变量一样作为返回值&#xff0c;调用。 def foo():pr…