版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、深入理解与应用Hadoop中的MapReduce

现在大数据是越来越火了,而我自己研究这方面也很长时间了,今天就根据我自己的经验教大家学会如何使用MapReduce,下文中将MapReduce简写为MR。下面将用一个具体的电信业务说明MR最基本的编写过程。

实验所用数据:(电信流量日志,按制表符分隔,第1列为手机号,第8、9列为上行、下行流量)

代码示例:

// FlowCount.java
package com.appache.celephone3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount {
    public static String path1 = "hdfs://hadoop:9000/dir/flowdata.txt";
    public static String path2 = "hdfs://hadoop:9000/dirout/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(path2))) {
            fileSystem.delete(new Path(path2), true);
        }
        Job job = new Job(conf, "FlowCount");
        job.setJarByClass(FlowCount.class);
        // 编写驱动
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // shuffle洗牌阶段
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));
        // 将任务提交给JobTracker
        job.waitForCompletion(true);
        // 查看程序的运行结果
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop:9000/dirout/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }
}

// MyMapper.java
package com.appache.celephone3;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();          // 拿到日志中的一行数据
        String[] splited = line.split("\t");  // 切分各个字段
        // 获取我们所需要的字段
        String msisdn = splited[1];
        String upFlow = splited[8];
        String downFlow = splited[9];
        long flowsum = Long.parseLong(upFlow) + Long.parseLong(downFlow);
        context.write(new Text(msisdn), new Text(upFlow + "\t" + downFlow + "\t" + String.valueOf(flowsum)));
    }
}

// MyReducer.java
package com.appache.celephone3;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
        long upFlowSum = 0L;
        long downFlowSum = 0L;
        long flowSum = 0L;
        for (Text v2 : v2s) {
            String[] splited = v2.toString().split("\t");
            upFlowSum += Long.parseLong(splited[0]);
            downFlowSum += Long.parseLong(splited[1]);
            flowSum += Long.parseLong(splited[2]);
        }
        String data = String.valueOf(upFlowSum) + "\t" + String.valueOf(downFlowSum) + "\t" + String.valueOf(flowSum);
        context.write(k2, new Text(data));
    }
}

具体业务描述:对于上面的电信数据,统计同一个用户的上行总流量和、下行总流量和以及上下总流量和,并且将手机号(11位)的信息输出到一个文件中,非手机号(8位)的信息输出到另一个文件中。

代码示例:

// FlowCount.java
package com.appache.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount {
    public static String path1 = "hdfs://hadoop:9000/dir/flowdata.txt";
    public static String path2 = "hdfs://hadoop:9000/dirout/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(path2))) {
            fileSystem.delete(new Path(path2), true);
        }
        Job job = new Job(conf, "FlowCount");
        job.setJarByClass(FlowCount.class);
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);   // <k1,v1>
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);       // <k2,v2>
        // shuffle阶段:分区、排序、分组、本地归并
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);
        // <k2,v2s>
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));
        // 提交作业
        job.waitForCompletion(true);
    }
}

// FlowBean.java
package com.appache.partitioner;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    long upFlow;   // 上行流量
    long downFlow; // 下行流量
    long flowSum;  // 总流量

    public FlowBean() {
    }

    public FlowBean(String upFlow, String downFlow) {
        this.upFlow = Long.parseLong(upFlow);
        this.downFlow = Long.parseLong(downFlow);
        this.flowSum = Long.parseLong(upFlow) + Long.parseLong(downFlow);
    }

    public long getupFlow() {
        return upFlow;
    }

    public long getdownFlow() {
        return downFlow;
    }

    public long getflowSum() {
        return flowSum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(flowSum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        flowSum = in.readLong();
    }

    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + flowSum;
    }
}

// MyMapper.java
package com.appache.partitioner;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();          // 拿到日志中的一行数据
        String[] splited = line.split("\t");  // 切分各个字段
        // 获取我们所需要的字段
        String msisdn = splited[1];                                 // 手机号 k2
        FlowBean flowData = new FlowBean(splited[8], splited[9]);   // <100,200>
        context.write(new Text(msisdn), flowData);
    }
}

// MyPartitioner.java
package com.appache.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    // 分区<100,200>
    @Override
    public int getPartition(Text k2, FlowBean v2, int numPartitions) {
        String tele = k2.toString();
        if (tele.length() == 11) {
            return 0; // 手机号的信息输出到0区
        } else {
            return 1; // 非手机号的信息输出到1区
        }
    }
}

// MyReducer.java
package com.appache.partitioner;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text k2, Iterable<FlowBean> v2s, Context context) throws IOException, InterruptedException {
        long upFlow = 0L;
        long downFlow = 0L;
        long flowSum = 0L;
        for (FlowBean v2 : v2s) {
            upFlow += v2.getupFlow();
            downFlow += v2.getdownFlow();
            flowSum += v2.getflowSum();
        }
        // 将数据输出到指定的文件当中
        context.write(k2, new FlowBean(upFlow + "", downFlow + ""));
    }
}

具体业务描述:先按照总流量由低到高排序,在总流量相同的情况下,按照下行流量和排序:

实例代码:

// FlowCount.java
package com.appache.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount {
    public static String path1 = "hdfs://hadoop:9000/flowCount.txt";
    public static String path2 = "hdfs://hadoop:9000/dirout/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(path2))) {
            fileSystem.delete(new Path(path2), true);
        }
        Job job = new Job(conf, "FlowCount");
        job.setJarByClass(FlowCount.class);
        // 编写驱动
        FileInputFormat.setInputPaths(job, new Path(path1)); // 输入文件的路径
        job.setInputFormatClass(TextInputFormat.class);      // <k1,v1>
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // shuffle优化阶段
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));
        job.waitForCompletion(true);
        // 查看运行结果:
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop:9000/dirout/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }
}

// FlowBean.java
package com.appache.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    // 获取我们所需要的字段
    private String msisdn;
    private long upFlow;
    private long downFlow;
    private long flowSum;

    public FlowBean() {
    }

    public FlowBean(String msisdn, String upFlow, String downFlow, String flowSum) {
        this.msisdn = msisdn;
        this.upFlow = Long.parseLong(upFlow);
        this.downFlow = Long.parseLong(downFlow);
        this.flowSum = Long.parseLong(flowSum); // 通过构造函数自动求取总流量
    }

    public String getMsisdn() {
        return msisdn;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    @Override // 所谓序列化就是将对象写到字节输出流当中
    public void write(DataOutput out) throws IOException {
        out.writeUTF(msisdn);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(flowSum);
    }

    @Override // 所谓反序列化就是将对象从输入流当中给读取出来
    public void readFields(DataInput in) throws IOException {
        this.msisdn = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.flowSum = in.readLong();
    }

    @Override // 指定比较的标准
    public int compareTo(FlowBean obj) {
        if (this.flowSum == obj.flowSum) {
            return (int) (obj.downFlow - this.downFlow); // 下行流量由高到低
        } else {
            return (int) (this.flowSum - obj.flowSum);   // 总流量由低到高
        }
    }

    public String toString() {
        return this.msisdn + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + this.flowSum;
    }
}

// MyMapper.java
package com.appache.sort;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // 拿到日志中的一行数据
        String line = v1.toString();
        // 切分各个字段
        String[] splited = line.split("\t");
        // 获取我们所需要的字段——并用FlowBean存储我们所需要的字段
        FlowBean flowdata = new FlowBean(splited[0], splited[1], splited[2], splited[3]);
        context.write(flowdata, NullWritable.get()); // <100,200,null>
    }
}

// MyReducer.java
package com.appache.sort;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
    @Override
    protected void reduce(FlowBean k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
        for (NullWritable v2 : v2s) {
            context.write(k2, NullWritable.get());
        }
    }
}

具体业务描述:对于上面的电信数据,统计同一个用户的上行总流量和、下行总流量和以及上下总流量和,代码中要求加入本地归并(Combiner)优化方式:

代码示例:

// FlowCount.java
package com.appache.celephone3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount {
    public static String path1 = "hdfs://hadoop:9000/dir/flowdata.txt";
    public static String path2 = "hdfs://hadoop:9000/dirout/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(path2))) {
            fileSystem.delete(new Path(path2), true);
        }
        Job job = new Job(conf, "FlowCount");
        job.setJarByClass(FlowCount.class);
        // 编写驱动
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 加入本地归并优化方式:
        job.setCombinerClass(MyReducer.class);
        job.setNumReduceTasks(2);
        // shuffle洗牌阶段
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));
        // 将任务提交给JobTracker
        job.waitForCompletion(true);
        // 查看程序的运行结果
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop:9000/dirout/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }
}
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 阑尾炎术后尿潴留的护理干预
- 2024-2025学年冶金工业技能鉴定模拟试题附答案详解(完整版)
- 心力衰竭患者的日常护理要点
- 2025年广东深圳南山外国语学校初三6月质量监测道法试题含答案
- 骨科护理中的跨学科合作新模式
- 2026年湘西州公安局招聘警务辅助人员笔试试题(含答案)
- 安徽省部分学校2026届高三3月联考 历史(二)试卷(含答案详解)
- 2024-2025学年度冶金工业技能鉴定练习题附答案详解【预热题】
- 2024-2025学年度中级软考通关题库含完整答案详解【夺冠】
- 2024-2025学年度注册公用设备工程师高分题库含答案详解(新)
- 《小区供电系统设计中电气设备的选择案例分析综述》1900字
- DBJ41-T 087-2017 建设工程造价电子数据标准
- 《收益法在无形资产价值评估中的应用案例分析:以M生物公司为例》8900字(论文)
- 高速铁路接触网设备运行与维护课件:接触网支撑定位装置
- 膈下脓肿护理查房
- 《形象塑造》课件
- 渠道开发与维护课件
- 养老行业从业人员健康管理制度
- Unit 3 On the Move单词讲解 课件高中英语外研版(2019)必修第二册
- 养鹅专业技术工作总结报告
- 20S121生活热水加热机组(热水机组选用与安装)
评论
0/150
提交评论