Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制
2021年03月24日 09:49:27 · 本文共 2,425 字阅读时间约 8分钟 · 2,972 次浏览在上一篇教程我们引入了 MapReduce 的执行机制,并粗略的讲了 InputFormat,现在该讲一下 Shuffle 了。Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
MapReduce 是 Hadoop 的重点,而 Shuffle 是 MapReduce 的重点,很多神奇的操作都在这里发生,因为是入门类教程,我也只简单的了解一下,引导大家入门,先走进来,深入的部分请各位看官综合搜索引擎其他信息学习。
Partition 分区
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个ReduceTask处理。默认对key hash后再以ReduceTask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
自定义 Partition 分区
想要自定义一个 Partition 分区,只需要继承 Partitioner 就可以开始我们的骚操作了,来个简单的案例,比如按照日志的IP开头进行分区到不同的ReduceTask:
public class MyPartitioner extends Partitioner<Text, DemoEntity> {
@Override
public int getPartition(Text text, DemoEntity demoEntity, int numPartitions) {
// 111.224.80.24 - - [17/Mar/2021:03:17:49 +0000] "GET /dictionary/gender HTTP/1.1" 200 405 "http://www.renfei.net/index.html" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
// 取 IP
String ip = text.toString().split(" ")[0];
// 假设按照 IP 开头的区别分别分区
if (ip.startsWith("192.")) {
return 0;
} else if (ip.startsWith("10.10.")) {
return 1;
} else if (ip.startsWith("10.0.")) {
return 2;
} else {
return 3;
}
}
使用我们自定义的 MyPartitioner,在任务上设置一下:
Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartitionerDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DemoEntity.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DemoEntity.class);
// 此处只演示 自定义 Partition 分区 的使用
// 设置 ReduceTasks 是 4 个,因为我们分区为 0、1、2、3
job.setNumReduceTasks(4);
job.setPartitionerClass(MyPartitioner.class);
WritableComparable 排序
在之前的案例中,我们拿到的输入都是排序好的,无论是 MapTask 还是 ReduceTask 都会按照 Key 进行排序,不管你是否需要,Hadoop 都会进行排序,默认是按照字典排序,使用的排序算法是快速排序。
我们在前面的《Hadoop入门教程(十二):Hadoop 的 Writable 类》尝试创建了自己的Bean对象,如果想要让这个 Bean 支持排序,我们就需要实现 WritableComparable 接口,并重写 compareTo() 方法:
public class DemoEntity implements WritableComparable<DemoEntity> {
private String ip;
private String path;
private int port;
/**
* 序列化方法
*
* @param dataOutput 框架给我们的数据出口
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(ip);
dataOutput.writeUTF(path);
dataOutput.writeInt(port);
}
/**
* 反序列化方法
*
* @param dataInput 框架给我们的数据来源
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
ip = dataInput.readUTF();
path = dataInput.readUTF();
port = dataInput.readInt();
}
// 此处省略 Getter/Setter ....
/**
* 排序支持
*
* @param o
* @return
*/
@Override
public int compareTo(DemoEntity o) {
// 假设我们按 port 排序
return Integer.compare(o.getPort(), this.port);
}
}
Combiner 合并
Combiner 是 Mapper 和 Reducer 之外的一种,但 Combiner 的父类是 Reducer,但又跟 Reducer 不太一样。Combiner 是在 MapTask 所在节点运行的,可以为每个 MapTask 的输出结果进行汇总合并,减少网络IO,就像压缩功能一样,但默认不会使用 Combiner,因为他会修改 MapTask 的输出结果,如果要使用 Combiner 前提必须是不能影响业务结果!所以得根据自己的业务场景来决定是否使用 Combiner。
如果你需要自己定义一个 Combiner 并使用它,可以继承R educer,重写Reduce方法,然后:job.setCombinerClass(MyCombiner.class);,由于是入门级别,主要是我懒得写了,这个教程写了半个月了,在这就不做演示了。
GroupingComparator分组
分组呢也好理解,有的时候我们有一个销售订单数据,想要知道每个月最高金额的订单是哪个,那我们就是按月份进行分组了,自定义类继承 WritableComparator,重写compare()方法,这里我也懒得演示了,各位用的时候可以再查。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.renfei.net/posts/1003474
相关推荐
猜你还喜欢这些内容,不妨试试阅读一下以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。
- 前后端分离项目接口数据加密的秘钥交换逻辑(RSA、AES)
- OmniGraffle 激活/破解 密钥/密匙/Key/License
- Redis 未授权访问漏洞分析 cleanfda 脚本复现漏洞挖矿
- CleanMyMac X 破解版 [TNT] 4.6.0
- OmniPlan 激活/破解 密钥/密匙/Key/License
- 人大金仓 KingbaseES V8 R3 安装包、驱动包和 License 下载地址
- Parallels Desktop For Mac 16.0.1.48911 破解版 [TNT]
- Parallels Desktop For Mac 15.1.4.47270 破解版 [TNT]
- Sound Control 破解版 2.4.2
- CleanMyMac X 破解版 [TNT] 4.6.5
- 博客完全迁移上阿里云,我所使用的阿里云架构
- 微软确认Windows 10存在bug 部分电脑升级后被冻结
- 大佬们在说的AQS,到底啥是个AQS(AbstractQueuedSynchronizer)同步队列
- 比特币(BTC)钱包客户端区块链数据同步慢,区块链数据离线下载
- Java中说的CAS(compare and swap)是个啥
- 小心免费主题!那些WordPress主题后门,一招拥有管理员权限
- 强烈谴责[wamae.win]恶意反向代理我站并篡改我站网页
- 讨论下Java中的volatile和JMM(Java Memory Model)Java内存模型
- 新版个人网站 NEILREN4J 上线并开源程序源码
- 我站近期遭受到恶意不友好访问攻击公告