1. The shuffle process
In general, the shuffle consists of the following steps (the template in section 2 marks where each hook is registered on the Job, and a sketch of a custom partitioner follows this list):
* partition
- partitioner
* sort
- sort comparator
* copy
- the copy (fetch) phase; the user cannot intervene in this step
* group
- grouping comparator

The following steps are optional and user-configurable:
* compress
- compression of the map output
* combiner
- a reduce that runs on the map-task side
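
A partitioner decides which reduce task each map output key is routed to. Below is a minimal sketch matching the Text/IntWritable map output types used in the template of section 2; the class name FirstCharPartitioner and its routing rule are illustrative assumptions, not part of the original notes:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative custom partitioner: routes keys by their first character,
// so all keys sharing a first character go to the same reduce task.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        // Mask to keep the result non-negative, then spread across reducers.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be wired in at the marked line in the driver with job.setPartitionerClass(FirstCharPartitioner.class). Likewise, for a sum-style job the reducer can often double as the combiner: job.setCombinerClass(ModuleReducer.class).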
2. Example
package com.ibeifeng.hadoop.senior.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job template.
 *
 * @author root
 */
public class ModuleMapReduce extends Configured implements Tool {

    // step 1: Mapper class
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // Nothing
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            // Nothing
        }
    }

    // step 2: Reducer class
    /**
     * public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class ModuleReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // Nothing
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            // Nothing
        }
    }

    // step 3: driver, assembles the job
    public int run(String[] args) throws Exception {
        // 1: get configuration
        Configuration configuration = getConf();

        // 2: create job
        Job job = Job.getInstance(configuration, this.getClass()
                .getSimpleName());
        // set the jar that contains this class
        job.setJarByClass(this.getClass());

        // 3: set job
        // input -> map -> reduce -> output
        // 3.1: input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2: map
        job.setMapperClass(ModuleMapper.class);
        // TODO
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ***************** shuffle ********************
        // 1) partitioner
        // job.setPartitionerClass(cls);
        // 2) sort
        // job.setSortComparatorClass(cls);
        // 3) optional, combiner
        // job.setCombinerClass(cls);
        // 4) group
        // job.setGroupingComparatorClass(cls);
        // ***************** shuffle ********************

        // 3.3: reduce
        job.setReducerClass(ModuleReducer.class);
        // TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4: submit the job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // step 4: run program
    public static void main(String[] args) throws Exception {
        // 1: get configuration
        Configuration configuration = new Configuration();

        // enable compression of the map output
        configuration.set("mapreduce.map.output.compress", "true");
        // compression codec
        configuration.set("mapreduce.map.output.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");

        // int status = new WordCountMapReduce().run(args);
        int status = ToolRunner.run(configuration, new ModuleMapReduce(), args);
        System.exit(status);
    }
}
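
To make the template concrete, here is one way the two TODO bodies could be filled in, following the classic word-count pattern that the template's StringTokenizer import hints at (the field names below are assumptions, not part of the original code):

// In ModuleMapper: emit (word, 1) for every token of each input line.
private Text mapOutputKey = new Text();
private static final IntWritable mapOutputValue = new IntWritable(1);

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer st = new StringTokenizer(value.toString());
    while (st.hasMoreTokens()) {
        mapOutputKey.set(st.nextToken());
        context.write(mapOutputKey, mapOutputValue);
    }
}

// In ModuleReducer: sum the 1s for each word and emit the total.
private IntWritable outputValue = new IntWritable();

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
        sum += v.get();
    }
    outputValue.set(sum);
    context.write(key, outputValue);
}

The job would then be packaged and run in the usual way, for example (the jar name and HDFS paths are placeholders):

hadoop jar module-mapreduce.jar com.ibeifeng.hadoop.senior.mapreduce.ModuleMapReduce /user/root/input /user/root/output

Note that SnappyCodec only works when the Hadoop native libraries are installed (hadoop checknative lists what is available); otherwise the map tasks will fail when compressing their output.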