专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

使用MapReduce运行WordCount案例

@

目录

  • 一、准备数据
  • 二、MR的编程规范
  • 三、编程步骤
  • 四、编写程序
    • Mapper程序解读

一、准备数据

注意:准备的数据的格式必须是文本,每个单词之间使用制表符分割。编码必须是utf-8无bom
113_1.png

二、MR的编程规范

MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可!

三、编程步骤

①Map阶段的核心处理逻辑需要编写在Mapper
②Reduce阶段的核心处理逻辑需要编写在Reducer
③将编写的Mapper和Reducer进行组合,组合成一个Job
④对Job进行设置,设置后运行

四、编写程序

WCMapper.java

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    private Text out_key=new Text();
    private IntWritable out_value=new IntWritable(1);//每个单词出现一次记为1

    // 针对输入的每个 keyin-valuein调用一次   (0,hello hi  hello   hi)
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws Exception {

        System.out.println("keyin:"+key+"----keyout:"+value);

        String[] words = value.toString().split("\t");

        for (String word : words) {
            out_key.set(word);
            //写出数据(单词,1)
            context.write(out_key, out_value);
        }

    }
}

Mapper程序解读

1、 导包时,需注意导入 org.apache.hadoop.mapreduce包下的类(hadoop2.0的新api)
2、 自定义的类必须符合MR的Mapper的规范
3、 在MR中,只能处理key-value格式的数据
KEYIN, VALUEIN: mapper输入的k-v类型,由当前Job的InputFormat的RecordReader决定!封装输入的key-value由RecordReader自动进行,不可自定义。
KEYOUT, VALUEOUT: mapper输出的k-v类型,可自定义
4、 InputFormat的作用:
①验证输入目录中的文件格式,是否符合当前Job的要求
②生成切片,每个切片都会交给一个MapTask处理
③提供RecordReader,由RR从切片中读取记录,交给Mapper进行处理

方法: List<InputSplit> getSplits: 切片
RecordReader<K,V> createRecordReader: 创建RecordReader

默认hadoop使用的是TextInputFormat,TextInputFormat使用LineRecordReader

1、 在Hadoop中,如果有Reduce阶段。通常key-value都需要实现序列化协议!
MapTask处理后的key-value,只是一个阶段性的结果!
这些key-value需要传输到ReduceTask所在的机器!
将一个对象通过序列化技术,序列化到一个文件中,经过网络传输到另外一台机器,
再使用反序列化技术,从文件中读取数据,还原为对象是最快捷的方式!

java的序列化协议: Serializable
特点:不仅保存对象的属性值,类型,还会保存大量的包的结构,子父类和接口的继承信息,很笨重。
hadoop开发了一款轻量级的序列化协议: Writable机制!

WCReducer.java

/* 
 *KEYIN, VALUEIN: Mapper输出的keyout-valueout
 *KEYOUT, VALUEOUT: 自定义     
 */     
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    private IntWritable out_value=new IntWritable();

    // reduce一次处理一组数据,key相同的视为一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws Exception {

        int sum=0;

        for (IntWritable intWritable : values) {
            sum += intWritable.get();
        }

        out_value.set(sum);

        //将累加的值写出
        context.write(key, out_value);

    }
}

WCDriver.java

/*
 * 1.启动这个线程,运行Job
 * 
 * 2.本地模式主要用于测试程序是否正确!  
 */
public class WCDriver {

    public static void main(String[] args) throws Exception {

        Path inputPath=new Path("e:/input/wordcount");
        Path outputPath=new Path("e:/output/wordcount");//保证输出目录不存在

        //作为整个Job的配置
        Configuration conf = new Configuration();//空参表示默认使用本地的文件系统

        //使用HDFS,分布式文件系统
        /*
        Path inputPath=new Path("/wordcount");
        Path outputPath=new Path("/mroutput/wordcount");

        conf.set("fs.defaultFS", "hdfs://hadoop101:9000");

        conf.set("mapreduce.framework.name", "yarn");// 在YARN上运行

        conf.set("yarn.resourcemanager.hostname", "hadoop102");// RM所在的机器
        */

        //一定要保证输出目录不存在
        FileSystem fs=FileSystem.get(conf);

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        // ①创建Job
        Job job = Job.getInstance(conf);

        //job.setJar("MapReduce-0.0.1-SNAPSHOT.jar");// 告诉NM运行时,MR中Job所在的Jar包在哪里

        // 将某个类所在地jar包作为job的jar包
        job.setJarByClass(WCDriver.class);

        // 为Job创建一个名字
        job.setJobName("wordcount");

        // ②设置Job
        // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
        // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入目录和输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // ③运行Job
        job.waitForCompletion(true);

    }
}

注意:
若要在yarn上运行,需将这三个程序打成jar包,然后放在集群某台机器上,使用hadoop jar命令运行

hadoop jar jar包名 主类名(WCDriver)

文章永久链接:https://tech.souyunku.com/19372

未经允许不得转载:搜云库技术团队 » 使用MapReduce运行WordCount案例

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们