需求:
在实战(一)的基础上,实现自定义分区机制。例如根据手机号的不同,分成不同的省份,然后在不同的reduce上面跑,最后生成的结果分别存在不同的文件中。
对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件。
思考:
需要自定义改造两个机制:
1、改造分区的逻辑,自定义一个partitioner,主要是实现如何进行分区。 Partitioner的作用是对Mapper产生的中间结果进行分区,以便将同一个分区的数据交给同一个Reducer处理,它直接影响Reducer阶段的负载均衡。
Partitioner只提供了一个方法:
getPartition(KEY key, VALUE value, int numPartitions)
前两个参数是Map输出的Key和Value(类型由泛型参数决定),numPartitions为Reduce任务的个数。
2、自定义reducer task的并发任务数,使得多个reduce同时工作。
项目目录如下:
AreaPartitioner.java:
package cn.darrenchan.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes each map output record to a reducer according to the first three
 * digits of the phone number carried in the key, so that records from the
 * same province land in the same output file.
 *
 * @param <KEY>   map output key type; its {@code toString()} must start with
 *                the 3-digit phone-number prefix
 * @param <VALUE> map output value type (not inspected)
 */
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    /** Prefix -> partition number lookup table. */
    private static HashMap<String, Integer> areaMap = new HashMap<>();

    // Hard-coded here for the demo; in a real system this mapping could be
    // loaded from a database that resolves a prefix to its province code.
    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    /**
     * Returns the partition (province group) for the given record.
     * Unknown prefixes all fall into partition 5.
     */
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Single lookup instead of querying the map twice.
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }
}
FlowSumArea.java:
package cn.darrenchan.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

/**
 * Aggregates traffic totals from raw flow logs and writes the statistics for
 * users of different provinces into separate output files.
 *
 * Two mechanisms are customized:
 * 1. Partitioning logic, via a custom {@code AreaPartitioner}.
 * 2. The number of concurrent reduce tasks, via {@code setNumReduceTasks}.
 */
public class FlowSumArea {

    /**
     * Parses one log line and emits (phoneNumber, FlowBean) pairs.
     * Expects a tab-separated line with the phone number in field 1 and the
     * up/down flow counts in fields 7 and 8.
     */
    public static class FlowSumAreaMapper
            extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of raw log data.
            String line = value.toString();

            // Split it into its tab-separated fields.
            String[] fields = StringUtils.split(line, "\t");

            // Extract the fields we need.
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);

            // Wrap them in a key/value pair and emit.
            context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow, downFlow));
        }
    }

    /**
     * Sums the up/down flow for each phone number and emits one aggregated
     * {@link FlowBean} per key.
     */
    public static class FlowSumAreaReducer
            extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            for (FlowBean bean : values) {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }
            context.write(key, new FlowBean(key.toString(), upFlowCounter, downFlowCounter));
        }
    }

    /**
     * Job driver: wires the mapper, reducer and custom partitioner together.
     * args[0] is the HDFS input path, args[1] the output path.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic.
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The reduce parallelism should match the number of partitions (6).
        // Fewer reducers than partitions (2-5) fails because some partitions
        // have no reducer to go to; 1 works (everything collapses into one
        // file) and more than 6 works but produces empty extra files.
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
FlowBean.java:
package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable value object carrying a phone number together with its up/down
 * traffic counts; the total is derived at construction time.
 * Sorts in descending order of total flow.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNum; // phone number
    private long upFlow;     // upstream traffic
    private long downFlow;   // downstream traffic
    private long sumFlow;    // total traffic (up + down)

    /** No-arg constructor required by the Writable deserialization machinery. */
    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Deserializes the object's fields from the stream.
     * Fields must be read in exactly the order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Serializes the object's fields to the stream. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Descending order by total flow. Uses {@link Long#compare} so that equal
     * totals return 0, honoring the Comparable contract (the original
     * implementation never returned 0, making equal beans compare unequal).
     */
    @Override
    public int compareTo(FlowBean flowBean) {
        return Long.compare(flowBean.getSumFlow(), sumFlow);
    }
}
将项目打包成area.jar,并执行命令:
hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea
我们可以看到如下运行信息:
17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 117/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:117/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_000517/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_000517/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_000517/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=1152 FILE: Number of bytes written=652142 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=2338 HDFS: Number of bytes written=526 HDFS: Number of read operations=21 HDFS: Number of large read operations=0 HDFS: Number of write operations=12 Job Counters Launched map tasks=1 Launched reduce tasks=6 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=2663 Total time spent by all reduces in occupied slots (ms)=83315 Total time spent by all map tasks (ms)=2663 Total time spent by all reduce 
tasks (ms)=83315 Total vcore-seconds taken by all map tasks=2663 Total vcore-seconds taken by all reduce tasks=83315 Total megabyte-seconds taken by all map tasks=2726912 Total megabyte-seconds taken by all reduce tasks=85314560 Map-Reduce Framework Map input records=22 Map output records=22 Map output bytes=1072 Map output materialized bytes=1152 Input split bytes=124 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=1152 Reduce input records=22 Reduce output records=21 Spilled Records=44 Shuffled Maps =6 Failed Shuffles=0 Merged Map outputs=6 GC time elapsed (ms)=524 CPU time spent (ms)=3210 Physical memory (bytes) snapshot=509775872 Virtual memory (bytes) snapshot=2547916800 Total committed heap usage (bytes)=218697728 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=2214 File Output Format Counters Bytes Written=526
运行结果完成之后,我们发现这次生成了6个文件,显示如下:
最终显示结果如下所示,我们看到的确是按照我们预期的进行了相应的分组:
在运行过程中,我们不断监控该过程,看看是不是一共6个reduce同时工作,发现最多的地方确实是6个YarnChild,说明我们的程序正确。
Last login: Sun Feb 26 04:26:01 2017 from 192.168.230.1
[hadoop@weekend110 ~] jps2473 NameNode8703 RunJar9214 Jps9029 YarnChild8995 YarnChild2747 SecondaryNameNode8978 -- process information unavailable2891 ResourceManager2992 NodeManager8799 MRAppMaster9053 YarnChild2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode2747 SecondaryNameNode2891 ResourceManager2992 NodeManager8799 MRAppMaster2569 DataNode9330 Jps[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode9495 Jps[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode2891 ResourceManager9386 RunJar9558 Jps2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster9580 Jps2747 SecondaryNameNode2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9598 YarnChild9482 MRAppMaster2747 SecondaryNameNode9623 Jps2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9650 Jps9482 MRAppMaster2747 SecondaryNameNode2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster9665 YarnChild2747 SecondaryNameNode9681 YarnChild9696 Jps2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode9704 YarnChild[hadoop@weekend110 ~] jps2473 NameNode9772 Jps9482 MRAppMaster9665 YarnChild2747 SecondaryNameNode9681 YarnChild9770 YarnChild9751 YarnChild2891 ResourceManager9386 RunJar2992 NodeManager9730 YarnChild2569 DataNode9704 YarnChild[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster9817 Jps9665 -- process information unavailable2747 SecondaryNameNode9681 YarnChild9770 YarnChild9751 YarnChild2891 ResourceManager9386 RunJar2992 NodeManager9730 YarnChild2569 DataNode9704 YarnChild[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode9681 YarnChild9872 Jps9770 YarnChild9751 YarnChild2891 ResourceManager9386 RunJar2992 NodeManager9730 YarnChild2569 DataNode9704 YarnChild[hadoop@weekend110 ~] 
jps2473 NameNode9482 MRAppMaster9921 Jps2747 SecondaryNameNode9770 YarnChild9751 YarnChild2891 ResourceManager9386 RunJar2992 NodeManager9730 YarnChild2569 DataNode9704 YarnChild[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode9770 YarnChild9751 -- process information unavailable2891 ResourceManager9386 RunJar10021 Jps2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode10079 Jps2891 ResourceManager9386 RunJar2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps10090 Jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode2891 ResourceManager2992 NodeManager2569 DataNode[hadoop@weekend110 ~] jps2473 NameNode9482 MRAppMaster2747 SecondaryNameNode10099 Jps2891 ResourceManager2992 NodeManager2569 DataNode