博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReducer 之基于mapreduce框架编写第一个MR程序
阅读量:2457 次
发布时间:2019-05-10

本文共 5240 字,大约阅读时间需要 17 分钟。

文章目录

需求

统计文件中一个相同单词的数量。

导入依赖

org.apache.logging.log4j
log4j-core
2.8.2
org.apache.hadoop
hadoop-common
2.7.2
org.apache.hadoop
hadoop-client
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2
org.apache.hadoop
hadoop-mapreduce-client-core
2.7.2
org.apache.maven.plugins
maven-jar-plugin
2.4
true
lib
com.hdfs.WordCountDriver
org.apache.maven.plugins
maven-compiler-plugin
3.0
1.8
1.8
UTF-8

map业务逻辑

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 这里是mr程序,mapper阶段业务实现类 * * Mapper
* KEYIN:表示mapper数据输入的时候key的数据类型,在默认的读取数据组件下,叫InputFormat,它的行为是一行一行的读取待处理的数据, * 读取一行,返回一行给mr程序,然后keyin就表示每一行的起始偏移量,这里数据类型是long * VALUEIN:表示mapper数据输入的时候value的数据类型,在默认的读取数据组件下,valuein表示读取的这一行内容,这里数据类型是String * KEYOUT:表示mapper数据输出的时候key的数据类型,这里输出的key是单词,所以数据类型是String * VALUEOUT:表示mapper数据输出的时候value的数据类型,这里输出的value是单词的次数,所以数据类型是Integer * * 上面说的数据类型String,Long,Integer都是JDK自带的类型,在序列化的时候,效率底下,所以Hadoop自己封装了一套数据类型 * Long->LongWritable * String->Text * Integer->IntWritable * null->NullWritable */public class WordCountMapper extends Mapper
{
/** * mapper阶段具体业务逻辑的实现方法,该方法的调用取决于读取数据的组件有没有给mr传入数据 * 如果有的话,每传入一个
对,该方法就会被调用一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拿到传入进来的一行内容,把数据类型转化为String String line = value.toString(); // 将这一行内容按空格切割成一个单词数组 String[] words = line.split(" "); // 遍历数组,每出现一个单词,就标记一个数字1,
<单词, 1>
for (String word : words) {
// 使用mr程序的上下文context,把mapper阶段处理的数据发送给reduce节点,作为reduce阶段的输入数据 context.write(new Text(word), new IntWritable(1)); } }}

reducer业务逻辑

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * 这里是mr程序,reduce阶段处理实现类 * * KEYIN:输入数据key的类型,对应mapper输出的key类型,这里是单词,类型为:Text * VALUEIN:输入数据value的类型,对应mapper输出的value类型,这里是单词次数,类型为:IntWritable * KEYOUT:reduce阶段处理后数据输出的key类型,这里是单词,类型为:Text * VALUEOUT:reduce阶段处理后数据输出的value类型,这里是单词的总次数,类型为:IntWritable */public class WordCountReducer extends Reducer
{
/** * reduce阶段具体业务的实现方法 * reduce接收所有来自map阶段处理的数据之后,按照key的字典序进行排序 * 按照key是否相同作为一组去调用reduce方法,key就是作为这一组kv对的key,把这一组所有的value作为一个迭代器传入reduce方法 * 例如:mapper输出为:
,reduce的输入就是:
*/ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { int count = 0; // 遍历一组相同key的value值 for(IntWritable value : values){ count += value.get(); } // reduce阶段的输出 context.write(key, new IntWritable(count)); }}

mr程序运行的main主类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * mr程序运行的main主类,组装了程序运行时候所需要的信息 * 例如:使用的是哪个Mapper类,哪个是Reducer类,输入数据在哪,输出数据在什么地方 */public class WordCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); // 表示是本地运行模式,不设置也可以,默认是local conf.set("mapreduce.framework.name", "local"); // 通过Job来封装本次mr的相关信息 Job job = Job.getInstance(conf); // 指定本次mr程序Job任务jar包运行的主类 job.setJarByClass(WordCountDriver.class); // 指定本次mr程序所用的Mapper、Reducer类分别是什么 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定本次mr程序mapper阶段输出的k、v类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定本次mr程序最终输出的k、v类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置reducetask任务的数量,默认是1,一个任务对应一个输出文件 job.setNumReduceTasks(3); // 指定本次mr程序输入数据的路径和最终输出结果存放在什么位置 FileInputFormat.setInputPaths(job, "D:\\input"); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); // 提交程序,并且监控打印程序执行情况 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); }}

MR程序运行方式

集群运行模式

1、将mr程序提交给yarn集群,分发到很多的节点上并发执行

2、处理的数据和输出结果位于HDFS文件系统
3、提交集群运行的实现步骤:

  • 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
  • hadoop jar wordcount.jar com.hdfs.WordCountDriver args

本地运行模式

运行main函数即可。

MapReduce的输入和输出

在这里插入图片描述

在这里插入图片描述

转载地址:http://zgdhb.baihongyu.com/

你可能感兴趣的文章
vue.js快速入门_Vue.js快速介绍
查看>>
web前端的发展简史_前端框架简史
查看>>
bug探索_探索一流功能的力量
查看>>
如何避免您的网站在Twitter和Facebook上的可耻外观
查看>>
导入样机_如何开始构建Android应用程序:创建样机,UI和XML布局
查看>>
最先进的深度学习:Mask R-CNN简介
查看>>
如何快速设置您的ES6环境
查看>>
深圳哪个区的it工作多_如何创建虚拟IT工作区
查看>>
组件和高阶组件区别_高阶组件:终极指南
查看>>
如何知道Kubernetes是否适合您的SaaS
查看>>
apollo调试工具_GraphQL工具包Apollo的完整介绍
查看>>
函数编程代码例子_使用函数式编程使代码更易于阅读
查看>>
文件从头开始读函数_这是您可以从头开始编写的一些函数修饰符
查看>>
JavaScript中的pipe()和compose()快速介绍
查看>>
react中的状态机_使用状态机增强您的React
查看>>
ruby 数组删除部分数组_您需要了解的六个Ruby数组方法
查看>>
roro cam_现代JavaScript中的优雅图案:RORO
查看>>
React Router v4简介及其对路由的哲学
查看>>
程序自动化 linux_自动化Windows子系统Linux安装程序
查看>>
react获取api_使用React和WordPress API在您的网站上获取博客
查看>>