本文共 5240 字,大约阅读时间需要 17 分钟。
统计文件中一个相同单词的数量。
org.apache.logging.log4j log4j-core 2.8.2 org.apache.hadoop hadoop-common 2.7.2 org.apache.hadoop hadoop-client 2.7.2 org.apache.hadoop hadoop-hdfs 2.7.2 org.apache.hadoop hadoop-mapreduce-client-core 2.7.2 org.apache.maven.plugins maven-jar-plugin 2.4 true lib com.hdfs.WordCountDriver org.apache.maven.plugins maven-compiler-plugin 3.0 1.8 1.8 UTF-8
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 这里是mr程序,mapper阶段业务实现类 * * Mapper* KEYIN:表示mapper数据输入的时候key的数据类型,在默认的读取数据组件下,叫InputFormat,它的行为是一行一行的读取待处理的数据, * 读取一行,返回一行给mr程序,然后keyin就表示每一行的起始偏移量,这里数据类型是long * VALUEIN:表示mapper数据输入的时候value的数据类型,在默认的读取数据组件下,valuein表示读取的这一行内容,这里数据类型是String * KEYOUT:表示mapper数据输出的时候key的数据类型,这里输出的key是单词,所以数据类型是String * VALUEOUT:表示mapper数据输出的时候value的数据类型,这里输出的value是单词的次数,所以数据类型是Integer * * 上面说的数据类型String,Long,Integer都是JDK自带的类型,在序列化的时候,效率底下,所以Hadoop自己封装了一套数据类型 * Long->LongWritable * String->Text * Integer->IntWritable * null->NullWritable */public class WordCountMapper extends Mapper { /** * mapper阶段具体业务逻辑的实现方法,该方法的调用取决于读取数据的组件有没有给mr传入数据 * 如果有的话,每传入一个 对,该方法就会被调用一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 拿到传入进来的一行内容,把数据类型转化为String String line = value.toString(); // 将这一行内容按空格切割成一个单词数组 String[] words = line.split(" "); // 遍历数组,每出现一个单词,就标记一个数字1, <单词, 1> for (String word : words) { // 使用mr程序的上下文context,把mapper阶段处理的数据发送给reduce节点,作为reduce阶段的输入数据 context.write(new Text(word), new IntWritable(1)); } }} 单词,>
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * 这里是mr程序,reduce阶段处理实现类 * * KEYIN:输入数据key的类型,对应mapper输出的key类型,这里是单词,类型为:Text * VALUEIN:输入数据value的类型,对应mapper输出的value类型,这里是单词次数,类型为:IntWritable * KEYOUT:reduce阶段处理后数据输出的key类型,这里是单词,类型为:Text * VALUEOUT:reduce阶段处理后数据输出的value类型,这里是单词的总次数,类型为:IntWritable */public class WordCountReducer extends Reducer{ /** * reduce阶段具体业务的实现方法 * reduce接收所有来自map阶段处理的数据之后,按照key的字典序进行排序 * 按照key是否相同作为一组去调用reduce方法,key就是作为这一组kv对的key,把这一组所有的value作为一个迭代器传入reduce方法 * 例如:mapper输出为: , , ,reduce的输入就是: , */ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; // 遍历一组相同key的value值 for(IntWritable value : values){ count += value.get(); } // reduce阶段的输出 context.write(key, new IntWritable(count)); }}
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * mr程序运行的main主类,组装了程序运行时候所需要的信息 * 例如:使用的是哪个Mapper类,哪个是Reducer类,输入数据在哪,输出数据在什么地方 */public class WordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 表示是本地运行模式,不设置也可以,默认是local conf.set("mapreduce.framework.name", "local"); // 通过Job来封装本次mr的相关信息 Job job = Job.getInstance(conf); // 指定本次mr程序Job任务jar包运行的主类 job.setJarByClass(WordCountDriver.class); // 指定本次mr程序所用的Mapper、Reducer类分别是什么 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定本次mr程序mapper阶段输出的k、v类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定本次mr程序最终输出的k、v类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置reducetask任务的数量,默认是1,一个任务对应一个输出文件 job.setNumReduceTasks(3); // 指定本次mr程序输入数据的路径和最终输出结果存放在什么位置 FileInputFormat.setInputPaths(job, "D:\\input"); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); // 提交程序,并且监控打印程序执行情况 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); }}
1、将mr程序提交给yarn集群,分发到很多的节点上并发执行
2、处理的数据和输出结果位于HDFS文件系统 3、提交集群运行的实现步骤:运行main函数即可。
转载地址:http://zgdhb.baihongyu.com/