
Running a custom bean example with MapReduce

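Sometimes a file does not contain simple words. Instead, each line describes something closer to an object with several attribute values, for example: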

[Figure 112_1.png: sample input data]
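In this file, each line holds a sequence number followed by a phone number, IP address, visited website, upstream traffic, downstream traffic and status code. The task is to compute, for each phone number, its total upstream traffic, total downstream traffic and the sum of the two. Since the Mapper receives the input one line (one record) at a time, it is convenient to create a bean that holds these traffic values.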

FlowBean.java

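import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {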

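    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic (upFlow + downFlow)

    // Hadoop creates the bean by reflection before calling readFields(), so a public no-arg constructor is required
    public FlowBean() {

    }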

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization: when writing fields, any reference-type field must not be null
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read in exactly the order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow=in.readLong();
        downFlow=in.readLong();
        sumFlow=in.readLong();
    }

    // With the default TextOutputFormat, this string becomes the value part of each output line
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

}
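Because write() and readFields() must handle the fields in the same order, a quick way to check a bean is to round-trip it through a byte buffer. The sketch below uses only java.io so that it runs without Hadoop: FlowRecord is a hypothetical stand-in that copies FlowBean's three long fields and the logic of its two methods, but it does not implement Writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializationOrderDemo {

    // Hypothetical stand-in for FlowBean: same fields, same write/readFields logic, no Hadoop dependency
    static class FlowRecord {
        long upFlow;
        long downFlow;
        long sumFlow;

        void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // Reads the fields back in the same order write() produced them
        void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }

    // Serializes a record to bytes, then deserializes those bytes into a fresh object
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(buffer);
            original.write(dataOut);
            dataOut.flush();

            FlowRecord copy = new FlowRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord r = new FlowRecord();
        r.upFlow = 2481;
        r.downFlow = 24681;
        r.sumFlow = r.upFlow + r.downFlow;
        // Prints 2481, 24681 and 27162 separated by tabs
        System.out.println(roundTrip(r));
    }
}
```

If readFields() swapped, say, upFlow and downFlow, the copy would silently come back with the two values exchanged. No exception would signal the mistake, which is why the order has to be kept in sync by hand.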

FlowBeanMapper.java

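import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Goal: for each phone number (String), total its upstream, downstream and overall traffic (long or int would work; long is used).
 *
 * Output key: the phone number; output value: FlowBean {upFlow, downFlow, sumFlow}
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {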

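    // Reused across map() calls: context.write() serializes the current contents, so one instance is enough
    private Text out_key = new Text();
    private FlowBean out_value = new FlowBean();

    // Input key = byte offset of the line, input value = the line itself (fields separated by tabs), e.g.
    // (0, 1 13736230513 192.196.100.1 www.baidu.com 2481 24681 200)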
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        String[] words = value.toString().split("\t");

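        // Phone number: the second field (the first is a sequence number)
        out_key.set(words[1]);
        // Upstream traffic: third field from the end
        out_value.setUpFlow(Long.parseLong(words[words.length-3]));
        // Downstream traffic: second field from the end
        out_value.setDownFlow(Long.parseLong(words[words.length-2]));
        // Emit (phone, bean); sumFlow stays 0 here and is computed in the reducer
        context.write(out_key, out_value);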

    }

}
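The mapper reads the phone number from a fixed position (words[1]) but counts the two traffic fields from the end of the line. One plausible reason is an assumption on my part rather than something stated above: some records may omit an optional middle field such as the website, and that would shift any position counted from the start. The plain-Java sketch below reproduces only that parsing logic. ParsedFlow and parseLine are hypothetical names, and the second sample line (phone 13800000000, no website) is made up for illustration.

```java
public class FlowLineParser {

    // Hypothetical holder for the values FlowBeanMapper extracts from one line
    static final class ParsedFlow {
        final String phone;
        final long upFlow;
        final long downFlow;

        ParsedFlow(String phone, long upFlow, long downFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
        }
    }

    // Same indexing as FlowBeanMapper: phone at index 1, traffic fields counted from the end
    static ParsedFlow parseLine(String line) {
        String[] words = line.split("\t");
        String phone = words[1];
        long up = Long.parseLong(words[words.length - 3]);
        long down = Long.parseLong(words[words.length - 2]);
        return new ParsedFlow(phone, up, down);
    }

    public static void main(String[] args) {
        // Sample line from the mapper comment, with tabs between the fields
        String full = "1\t13736230513\t192.196.100.1\twww.baidu.com\t2481\t24681\t200";
        // Made-up line without a website field: counting from the end still finds the traffic values
        String noSite = "2\t13800000000\t192.196.100.2\t264\t0\t200";

        for (String line : new String[] {full, noSite}) {
            ParsedFlow p = parseLine(line);
            System.out.println(p.phone + " up=" + p.upFlow + " down=" + p.downFlow);
        }
    }
}
```

Counting from the start (words[4] and words[5]) would work for the first line but would read the status code as downstream traffic on the second.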

FlowBeanReducer.java

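import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {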

    private FlowBean out_value=new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        long sumUpFlow=0;
        long sumDownFlow=0;

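        // The framework reuses one FlowBean instance while iterating, so add up the numbers instead of keeping references
        for (FlowBean flowBean : values) {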

            sumUpFlow+=flowBean.getUpFlow();
            sumDownFlow+=flowBean.getDownFlow();

        }

        out_value.setUpFlow(sumUpFlow);
        out_value.setDownFlow(sumDownFlow);
        out_value.setSumFlow(sumDownFlow+sumUpFlow);

        context.write(key, out_value);

    }

}
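Between the two phases, the framework sorts the mapper output by key and calls reduce() once per phone number with all of that number's values. The sketch below imitates this in memory so the arithmetic in reduce() can be checked without a cluster. It is a simplification that leaves out partitioning, spilling and object reuse. ShuffleAndReduceSketch, MapOutput, aggregate and the sample numbers are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleAndReduceSketch {

    // One hypothetical mapper output pair: phone number -> (upFlow, downFlow)
    static final class MapOutput {
        final String phone;
        final long upFlow;
        final long downFlow;

        MapOutput(String phone, long upFlow, long downFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
        }
    }

    // Groups values by key in sorted order (like the shuffle), then sums per key (like FlowBeanReducer)
    static Map<String, long[]> aggregate(List<MapOutput> mapOutputs) {
        Map<String, List<MapOutput>> grouped = new TreeMap<>();
        for (MapOutput m : mapOutputs) {
            grouped.computeIfAbsent(m.phone, k -> new ArrayList<>()).add(m);
        }

        Map<String, long[]> result = new TreeMap<>();
        for (Map.Entry<String, List<MapOutput>> entry : grouped.entrySet()) {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (MapOutput m : entry.getValue()) {
                sumUpFlow += m.upFlow;
                sumDownFlow += m.downFlow;
            }
            // {up, down, total}, matching the three fields of FlowBean
            result.put(entry.getKey(), new long[] {sumUpFlow, sumDownFlow, sumUpFlow + sumDownFlow});
        }
        return result;
    }

    public static void main(String[] args) {
        List<MapOutput> outputs = List.of(
                new MapOutput("13736230513", 2481, 24681),
                new MapOutput("13800000000", 100, 200),
                new MapOutput("13736230513", 19, 319));
        aggregate(outputs).forEach((phone, f) ->
                System.out.println(phone + "\t" + f[0] + "\t" + f[1] + "\t" + f[2]));
    }
}
```

The two records for 13736230513 are combined into 2500 up, 25000 down and 27500 in total. This matches what FlowBeanReducer computes for a key that appears on several input lines.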

FlowBeanDriver.java

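import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowBeanDriver {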

    public static void main(String[] args) throws Exception {

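        // Local Windows paths: assuming no cluster config files are on the classpath, the job runs in local mode on the local file system
        Path inputPath=new Path("e:/mrinput/flowbean");
        Path outputPath=new Path("e:/mroutput/flowbean");

        // Configuration for the whole job
        Configuration conf = new Configuration();

        FileSystem fs=FileSystem.get(conf);

        // The output directory must not exist when the job starts, so delete any leftover from a previous run
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }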

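        // ① Create the job
        Job job = Job.getInstance(conf);
        // Tell Hadoop which jar contains the job classes (needed when submitting to a cluster)
        job.setJarByClass(FlowBeanDriver.class);

        // ② Configure the job
        // Mapper and Reducer classes
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);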

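        // The job uses the Mapper and Reducer output key-value types to pick serializers for them.
        // Because the Mapper and Reducer emit the same types here, setting the job's final output types is enough
        // (otherwise also call setMapOutputKeyClass / setMapOutputValueClass).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);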

        // Input and output directories
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

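        // ③ Run the job, printing progress, and use the result as the exit code
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);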
    }

}

Result:
[Figure 112_2.png: job output]
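With the default TextOutputFormat, each output line is the key's string form, a tab, and the value's toString(). Since FlowBean.toString() also separates its fields with tabs, every result line has four tab-separated columns: phone number, upstream, downstream and total traffic. The sketch below builds and splits one such line in plain Java. formatResultLine is a hypothetical helper that is not part of Hadoop, and the numbers come from the sample input line.

```java
public class ResultLineFormat {

    // Mirrors key + "\t" + FlowBean.toString(), the layout TextOutputFormat writes by default
    static String formatResultLine(String phone, long upFlow, long downFlow) {
        long sumFlow = upFlow + downFlow;
        String value = upFlow + "\t" + downFlow + "\t" + sumFlow; // same format as FlowBean.toString()
        return phone + "\t" + value;
    }

    public static void main(String[] args) {
        String line = formatResultLine("13736230513", 2481, 24681);
        System.out.println(line);
        // Splitting a result line back into columns: phone, up, down, total
        String[] cols = line.split("\t");
        System.out.println("total for " + cols[0] + " = " + cols[3]);
    }
}
```

Downstream code that reads part-r-00000 can therefore parse it with a plain split on tabs.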

Permanent link: https://tech.souyunku.com/21731

Reproduction without permission is prohibited: 搜云库技术团队 (Souyunku Tech Team) » Running a custom bean example with MapReduce
