博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce实战(三)分区的实现
阅读量:4326 次
发布时间:2019-06-06

本文共 12400 字,大约阅读时间需要 41 分钟。

需求:

在实战(一)的基础 上,实现自定义分组机制。例如根据手机号的不同,分成不同的省份,然后在不同的reduce上面跑,最后生成的结果分别存在不同的文件中。

对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件。

思考:

需要自定义改造两个机制:

1、改造分区的逻辑,自定义一个partitioner,主要是实现如何进行分组。

Partitioner的作用是对Mapper产生的中间结果进行分片,以便将同一个分区的数据交给同一个Reducer处理,它直接影响Reducer阶段的负载均衡。
    Partitioner只提供了一个方法:
    getPartition(Text key,Text value,int numPartitions)
    前两个参数是Map的Key和Value,numPartitions为Reduce的个数。

2、自定义reducer task的并发任务数,使得多个reduce同时工作。

 

项目目录如下:

AreaPartition.java:

package cn.darrenchan.hadoop.mr.areapartition;import java.util.HashMap;import org.apache.hadoop.mapreduce.Partitioner;public class AreaPartitioner
extends Partitioner
{ private static HashMap
areaMap = new HashMap<>(); /** * 这里只是提前设定了一下,其实这里可以写查询数据库,返回号码所在省份的编号 */ static{ areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); } @Override public int getPartition(KEY key, VALUE value, int numPartitions) { //从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号 int areaCoder = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3)); return areaCoder; }}

 

FlowSumArea.java:

package cn.darrenchan.hadoop.mr.areapartition;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import cn.darrenchan.hadoop.mr.flow.FlowBean;/** * 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件  * 需要自定义改造两个机制: * 1、改造分区的逻辑,自定义一个partitioner * 2、自定义reduer task的并发任务数 *  */public class FlowSumArea {    public static class FlowSumAreaMapper extends            Mapper
{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 拿一行数据 String line = value.toString(); // 切分成各个字段 String[] fields = StringUtils.split(line, "\t"); // 拿到我们需要的字段 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[7]); long downFlow = Long.parseLong(fields[8]); // 封装数据为kv并输出 context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow, downFlow)); } } public static class FlowSumAreaReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { long up_flow_counter = 0; long d_flow_counter = 0; for (FlowBean bean : values) { up_flow_counter += bean.getUpFlow(); d_flow_counter += bean.getDownFlow(); } context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumArea.class); job.setMapperClass(FlowSumAreaMapper.class); job.setReducerClass(FlowSumAreaReducer.class); // 设置我们自定义的分组逻辑定义 job.setPartitionerClass(AreaPartitioner.class);        job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 设置reduce的任务并发数,应该跟分组的数量保持一致,写1不会报错,2,3,4,5均会报错,7,8,9...反而不会报错,因为后面的直接数据为0了 job.setNumReduceTasks(6); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}

 

 

FlowBeanArea.java:

package cn.darrenchan.hadoop.mr.flow;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable
{ private String phoneNum;// 手机号 private long upFlow;// 上行流量 private long downFlow;// 下行流量 private long sumFlow;// 总流量 public FlowBean() { super(); } public FlowBean(String phoneNum, long upFlow, long downFlow) { super(); this.phoneNum = phoneNum; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public String getPhoneNum() { return phoneNum; } public void setPhoneNum(String phoneNum) { this.phoneNum = phoneNum; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } // 从数据流中反序列出对象的数据 // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致 @Override public void readFields(DataInput in) throws IOException { phoneNum = in.readUTF(); upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } // 将对象数据序列化到流中 @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNum); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public int compareTo(FlowBean flowBean) { return sumFlow > flowBean.getSumFlow() ? -1 : 1; }}

 

 

将项目打包成area.jar,并执行命令:

hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea

我们可以看到如下运行信息: 

17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032

17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0005
17/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0005
17/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/
17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_0005
17/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false
17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%
17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%
17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%
17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%
17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully
17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1152
FILE: Number of bytes written=652142
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2338
HDFS: Number of bytes written=526
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
Job Counters
Launched map tasks=1
Launched reduce tasks=6
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2663
Total time spent by all reduces in occupied slots (ms)=83315
Total time spent by all map tasks (ms)=2663
Total time spent by all reduce tasks (ms)=83315
Total vcore-seconds taken by all map tasks=2663
Total vcore-seconds taken by all reduce tasks=83315
Total megabyte-seconds taken by all map tasks=2726912
Total megabyte-seconds taken by all reduce tasks=85314560
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1152
Input split bytes=124
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1152
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =6
Failed Shuffles=0
Merged Map outputs=6
GC time elapsed (ms)=524
CPU time spent (ms)=3210
Physical memory (bytes) snapshot=509775872
Virtual memory (bytes) snapshot=2547916800
Total committed heap usage (bytes)=218697728
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=526

 

 运行结果完成之后,我们发现这次生成了6个文件,显示如下:

 

最终显示结果如下所示,我们看到的确是按照我们预期的进行了相应的分组:

 

 

在运行过程中,我们不断监控该过程,看看是不是一共6个reduce同时工作,发现最多的地方确实是6个YarnChild,说明我们的程序正确。

Last login: Sun Feb 26 04:26:01 2017 from 192.168.230.1

[hadoop@weekend110 ~] jps
2473 NameNode
8703 RunJar
9214 Jps
9029 YarnChild
8995 YarnChild
2747 SecondaryNameNode
8978 -- process information unavailable
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
9053 YarnChild
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
2569 DataNode
9330 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9495 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
9558 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9580 Jps
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9598 YarnChild
9482 MRAppMaster
2747 SecondaryNameNode
9623 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9650 Jps
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9696 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9772 Jps
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9817 Jps
9665 -- process information unavailable
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9681 YarnChild
9872 Jps
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9921 Jps
2747 SecondaryNameNode
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9770 YarnChild
9751 -- process information unavailable
2891 ResourceManager
9386 RunJar
10021 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10079 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
10090 Jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10099 Jps
2891 ResourceManager
2992 NodeManager
2569 DataNode

 

转载于:https://www.cnblogs.com/DarrenChan/p/6464259.html

你可能感兴趣的文章
HTML基础之JS
查看>>
01背包问题python 2.7实现
查看>>
E - Kagome Kagome
查看>>
Android Studio Errors
查看>>
软件工程第二次作业—结对编程
查看>>
【字符编码】Java字符编码详细解答及问题探讨
查看>>
学习操作系统导图
查看>>
在线的JSON formate工具
查看>>
winform非常实用的程序退出方法!!!!!(转自博客园)
查看>>
xml解析
查看>>
centos安装vim
查看>>
linux工作调度(计划任务)
查看>>
HTTP协议形象展现
查看>>
hdu--1698 Just a Hook(线段树+区间更新+懒惰标记)
查看>>
Django 框架的 模板继承 与 模板包含
查看>>
Effective Modern C++:01类型推导
查看>>
根据html页面模板动态生成html页面(c#类)
查看>>
Failed to Connect to MySQL at 127.0.0.1:3306 MySql WorkBench 本地连接问题
查看>>
android下activity中多个listview只允许主界面滚动
查看>>
(贪心5.2.1)UVA 10026 Shoemaker's Problem(利用数据有序化来进行贪心选择)
查看>>