不早了,快休息吧?

大数据

Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制

2021年03月24日 09:49:27 · 本文共 2,425 字阅读时间约 8分钟 · 2,972 次浏览
Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制

在上一篇教程我们引入了 MapReduce 的执行机制,并粗略的讲了 InputFormat,现在该讲一下 Shuffle 了。Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

MapReduce 是 Hadoop 的重点,而 Shuffle 是 MapReduce 的重点,很多神奇的操作都在这里发生,因为是入门类教程,我也只简单的了解一下,引导大家入门,先走进来,深入的部分请各位看官综合搜索引擎其他信息学习。

Partition 分区

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个ReduceTask处理。默认对key hash后再以ReduceTask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。 

自定义 Partition 分区

想要自定义一个 Partition 分区,只需要继承 Partitioner 就可以开始我们的骚操作了,来个简单的案例,比如按照日志的IP开头进行分区到不同的ReduceTask:

public class MyPartitioner extends Partitioner<Text, DemoEntity> {
@Override
public int getPartition(Text text, DemoEntity demoEntity, int numPartitions) {
    // 111.224.80.24 - - [17/Mar/2021:03:17:49 +0000] "GET /dictionary/gender HTTP/1.1" 200 405 "http://www.renfei.net/index.html" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
    // 取 IP
    String ip = text.toString().split(" ")[0];
    // 假设按照 IP 开头的区别分别分区
    if (ip.startsWith("192.")) {
        return 0;
    } else if (ip.startsWith("10.10.")) {
        return 1;
    } else if (ip.startsWith("10.0.")) {
        return 2;
    } else {
        return 3;
    }
}

使用我们自定义的 MyPartitioner,在任务上设置一下:

Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartitionerDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DemoEntity.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DemoEntity.class);
// 此处只演示 自定义 Partition 分区 的使用
// 设置 ReduceTasks 是 4 个,因为我们分区为 0、1、2、3
job.setNumReduceTasks(4);
job.setPartitionerClass(MyPartitioner.class);

WritableComparable 排序

在之前的案例中,我们拿到的输入都是排序好的,无论是 MapTask 还是 ReduceTask 都会按照 Key 进行排序,不管你是否需要,Hadoop 都会进行排序,默认是按照字典排序,使用的排序算法是快速排序。

我们在前面的《Hadoop入门教程(十二):Hadoop 的 Writable 类》尝试创建了自己的Bean对象,如果想要让这个 Bean 支持排序,我们就需要实现 WritableComparable 接口,并重写 compareTo() 方法:

public class DemoEntity implements WritableComparable<DemoEntity> {
    private String ip;
    private String path;
    private int port;
    /**
     * 序列化方法
     *
     * @param dataOutput 框架给我们的数据出口
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(path);
        dataOutput.writeInt(port);
    }
    /**
     * 反序列化方法
     *
     * @param dataInput 框架给我们的数据来源
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        ip = dataInput.readUTF();
        path = dataInput.readUTF();
        port = dataInput.readInt();
    }
    // 此处省略 Getter/Setter ....
    /**
     * 排序支持
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(DemoEntity o) {
        // 假设我们按 port 排序
        return Integer.compare(o.getPort(), this.port);
    }
}

Combiner 合并

Combiner 是 Mapper 和 Reducer 之外的一种,但 Combiner 的父类是 Reducer,但又跟 Reducer 不太一样。Combiner 是在 MapTask 所在节点运行的,可以为每个 MapTask 的输出结果进行汇总合并,减少网络IO,就像压缩功能一样,但默认不会使用 Combiner,因为他会修改 MapTask 的输出结果,如果要使用 Combiner 前提必须是不能影响业务结果!所以得根据自己的业务场景来决定是否使用 Combiner。

如果你需要自己定义一个 Combiner 并使用它,可以继承R educer,重写Reduce方法,然后:job.setCombinerClass(MyCombiner.class);,由于是入门级别,主要是我懒得写了,这个教程写了半个月了,在这就不做演示了。

GroupingComparator分组

分组呢也好理解,有的时候我们有一个销售订单数据,想要知道每个月最高金额的订单是哪个,那我们就是按月份进行分组了,自定义类继承 WritableComparator,重写compare()方法,这里我也懒得演示了,各位用的时候可以再查。


商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.renfei.net/posts/1003474
评论与留言

以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

微信搜一搜:任霏博客