专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Hadoop系列-02、MapRedurce之WordCount

1、WorkCount代码


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.StringTokenizer; public class WordCountApp { /** * map类,实现map函数 * * Mapper * * KEYIN 即K1 表示每一行的起始位置(偏移量offset) * * VALUEIN 即v1 表示每一行的文本内容 * * KEYOUT 即k2 表示每一行中的每个单词 * * VALUEOUT 即v2 表示每一行中的每个单词的出现次数,固定值1 */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ //暂存每个传过来的词频计数,均为1,省掉重复申请空间 private final static IntWritable one = new IntWritable(1); //暂存每个传过来的词的值,省掉重复申请空间 private Text word = new Text(); //核心map方法的具体实现,逐个<key,value>对去处理 @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //用每行的字符串值初始化StringTokenizer StringTokenizer itr = new StringTokenizer(value.toString()); //循环取得每个空白符分隔出来的每个元素 while (itr.hasMoreTokens()) { //将取得出的每个元素放到word Text对象中 word.set(itr.nextToken()); //通过context对象,将map的输出逐个输出 context.write(word, one); } } } /** * reduce类,实现reduce函数 * * KEYIN 即k2 表示每一行中的每个单词 * * VALUEIN 即v2 表示每一行中每个单词出现次数,固定值1 * * KEYOUT 即k3 表示整个文件中的不同单词 * * VALUEOUT 即v3 表示整个文件中的不同单词的出现总次数 */ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); //核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理 @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //暂存每个key组中计算总和 int sum = 0; //加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值 for (IntWritable val : values) { //将key组中的每个词频数值sum到一起 sum += val.get(); } //将该key组sum完成的值放到result IntWritable中,使可以序列化输出 result.set(sum); //将计算结果逐条输出 context.write(key, result); } } /** * 启动mr的driver方法 */ public static void main(String[] args) throws Exception { //得到集群配置参数 Configuration conf = new Configuration(); //设置到本次的job实例中 Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName()); //指定本次执行的主类是WordCount job.setJarByClass(WordCountApp.class); //指定map类 job.setMapperClass(TokenizerMapper.class); //指定combiner类,要么不指定,如果指定,一般与reducer类相同 job.setCombinerClass(IntSumReducer.class); //指定reducer类 job.setReducerClass(IntSumReducer.class); //指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定输入数据的路径 FileInputFormat.addInputPath(job, new Path(args[0])); //指定输出路径,并要求该输出路径一定是不存在的 FileOutputFormat.setOutputPath(job, new Path(args[1])); //指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出! System.exit(job.waitForCompletion(true) ? 0 : 1); } }

Pom文件


<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>com.hadoop.demo.WordCountApp</mainClass> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> </manifest> </archive> <classesDirectory> </classesDirectory> </configuration> </plugin> </plugins> </pluginManagement> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>

最后生成Jar包 word-count-1.0-SNAPSHOT.jar

2、需要统计单词的原文


$ mkdir -p data/source $ vim wc1 # ---- 以下是原文,随意复制的一段英文---- import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.StringTokenizer; $ mkdir -p data/apps # 将word-count-1.0-SNAPSHOT.jar 上传么data/apps

3、通过hadoop 执行mapredurce


# 上传原文件到HDFS $ bin/hdfs dfs -put data/source/wc1 / # 执行Jar包 $ bin/hadoop jar data/apps/word-count-***.jar /wc1 /out # 输出执行进度--> 20/07/23 01:33:55 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 20/07/23 01:33:55 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 20/07/23 01:33:55 INFO input.FileInputFormat: Total input paths to process : 1 20/07/23 01:33:56 INFO mapreduce.JobSubmitter: number of splits:1 20/07/23 01:33:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1595434593584_0001 20/07/23 01:33:56 INFO impl.YarnClientImpl: Submitted application application_1595434593584_0001 20/07/23 01:33:56 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1595434593584_0001/ 20/07/23 01:33:56 INFO mapreduce.Job: Running job: job_1595434593584_0001 20/07/23 01:34:06 INFO mapreduce.Job: Job job_1595434593584_0001 running in uber mode : false 20/07/23 01:34:06 INFO mapreduce.Job: map 0% reduce 0% 20/07/23 01:34:12 INFO mapreduce.Job: map 100% reduce 0% 20/07/23 01:34:17 INFO mapreduce.Job: map 100% reduce 100% 20/07/23 01:34:18 INFO mapreduce.Job: Job job_1595434593584_0001 completed successfully # 将结果导出到本地 $ bin/hadoop fs -get /out /opt/data # 查看 $ ll /opt/data total 4 -rw-r--r--. 1 root root 372 Jul 23 01:52 part-r-00000 -rw-r--r--. 1 root root 0 Jul 23 01:52 _SUCCESS

执行结果:

66_1.png

4、将任务交给yarn进行调度


# 上传原文件到HDFS 略(同上) # yarn 执行任务 $ yarn jar data/apps/word-count-***.jar /wc1 /out # 导出结果,查看 略(同上)

在Yarn中查看执行状态

66_2.png

MapRedurce工作原理,参考:

https://blog.csdn.net/mucaoyx/article/details/82078226

https://tech.souyunku.com/mingyueguli/p/10368427.html

https://tech.souyunku.com/zsql/p/11636112.html

文章永久链接:https://tech.souyunku.com/23562

未经允许不得转载:搜云库技术团队 » Hadoop系列-02、MapRedurce之WordCount

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们