专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

MapReduce之自定义InputFormat

在企业开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
自定义InputFormat步骤如下:

  • (1)自定义一个类继承FilelnputFormat
  • (2)自定义一个类继承RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。
  • (3)在输出时使用SequenceFileOutPutFormat输出合并文件。

无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

1. 需求

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

(1)输入数据
112_1.png
(2)期望输出文件格式
112_2.png

2. 需求分析

1、 自定义一个类继承FileInputFormat
(1)重写isSplitable()方法,返回false,让文件不可切,整个文件作为1片
(2)重写createRecordReader(),返回自定义的RecordReader对象
2、 自定义一个类继承RecordReader
在RecordReader中,nextKeyValue()是最重要的方法,返回当前读取到的key-value,如果读到返回true,调用Mapper的map()来处理,否则返回false

3. 编写程序

MyInputFormat.java

/*
 * 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切
 * 
 * 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value
 */
public class MyInputFormat extends FileInputFormat {

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

MyRecordReader.java

/*
 * RecordReader从MapTask处理的当前切片中读取数据
 * 
 * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象
 */
public class MyRecordReader extends RecordReader {

    private Text key;
    private BytesWritable value;

    private String filename;
    private int length;

    private FileSystem fs;
    private Path path;

    private FSDataInputStream is;

    private boolean flag=true;

    // MyRecordReader在创建后,在进入Mapper的run()之前,自动调用
    // 文件的所有内容设置为1个切片,切片的长度等于文件的长度
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        FileSplit fileSplit=(FileSplit) split;

        filename=fileSplit.getPath().getName();

        length=(int) fileSplit.getLength();

        path=fileSplit.getPath();

        //获取当前Job的配置对象
        Configuration conf = context.getConfiguration();

        //获取当前Job使用的文件系统
        fs=FileSystem.get(conf);

         is = fs.open(path);

    }

    // 读取一组输入的key-value,读到返回true,否则返回false
    // 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true
    // 第二次调用nextKeyValue()返回false
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (flag) {

            //实例化对象
            if (key==null) {
                key=new Text();
            }

            if (value==null) {
                value=new BytesWritable();
            }

            //赋值
            //将文件名封装到key中
            key.set(filename);

            // 将文件的内容读取到BytesWritable中
            byte [] content=new byte[length];

            IOUtils.readFully(is, content, 0, length);

            value.set(content, 0, length);

            flag=false;

            return true;

        }
        return false;
    }

    //返回当前读取到的key-value中的key
    @Override
    public Object getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //返回当前读取到的key-value中的value
    @Override
    public Object getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    //返回读取切片的进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    // 在Mapper的输入关闭时调用,清理工作
    @Override
    public void close() throws IOException {
        if (is != null) {
            IOUtils.closeStream(is);
        }   
        if (fs !=null) {
            fs.close();
        }
    }
}

CustomIFMapper.java

public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFReducer.java

public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFDriver.java

public class CustomIFDriver {

    public static void main(String[] args) throws Exception {

        Path inputPath=new Path("e:/mrinput/custom");
        Path outputPath=new Path("e:/mroutput/custom");

        //作为整个Job的配置
        Configuration conf = new Configuration();
        //保证输出目录不存在
        FileSystem fs=FileSystem.get(conf);

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        // 创建Job
        Job job = Job.getInstance(conf);

        // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
        job.setMapperClass(CustomIFMapper.class);
        job.setReducerClass(CustomIFReducer.class);

        // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
        // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 设置输入目录和输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 设置输入和输出格式
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // ③运行Job
        job.waitForCompletion(true);

    }
}

文章永久链接:https://tech.souyunku.com/21717

未经允许不得转载:搜云库技术团队 » MapReduce之自定义InputFormat

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们