版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、大数据技术基础培训1Map/Reduce 技术培训2MapReduce描述并行处理大数据的分布式框架适合对海量数据进行分析和处理框架提供并行化机制框架提供节点失效处理机制框架提供状态监控机制使用Java编写34典型集群部署5MapReduce框架流程InputFormat 阶段InputFormat决定输入数据如何被切分供Map任务使用InputFormat将输入数据划分成一系列的InputSplit每个Map任务处理一个InputSplitInputSplit还包含存放这个数据块的机器列表提供RecordReader读取InputSplit,并且构造key-value对传递给Map任务控制数
2、据如何被解压缩将数据转换成MapReduce能够处理的Java类型6Map阶段Map任务可以独立地处理数据集通常用来对数据进行过滤、转换处理7Shuffle阶段对Map任务的输出进行Partition、Sort、Spill以及mergeReducer获取处理过的Map输出,Merge后进行Reduce操作8Reduce阶段扩展了MapReduceBase类实现了Reducer接口接受来自多个Map任务的输出将key/value对按照key进行排序Reduce方法通常会遍历每个key对应的所有value9MapReduce理论基础10MapReduce简单示例: Word CountWord C
3、ountInput: large number of text documentsTask: count the occurrence of each word across all the document11Input“to be or not to be”“it will be make or break”“to”, 1“be”, 1“or”, 1“not”, 1“to”, 1“be”, 1Map“it”, 1“will”, 1“be”, 1“make”, 1“or”, 1“break”, 1Partition“be”, 1“not”, 1“be”, 1“to”, 1“or”, 1“to
4、”, 1“it”, 1“be”, 1“break”, 1“will”, 1“make”, 1“or”, 1Shuffle“be”, 1“not”, 1“be”, 1“it”, 1“be”, 1“break”, 1“to”, 1“or”, 1“to”, 1“will”, 1“make”, 1“or”, 1Sort“be”, 1“be”, 1“be”, 1“break”, 1“it”, 1“not”, 1“make”, 1“or”, 1“or”, 1“to”, 1“to”, 1“will”, 1Reduce“be”, 3“break”, 1“it”, 1“not”, 1“make”, 1“or”,
5、 2“to”, 2“will”, 1并行化执行12JobTracker & TaskTracker13JobTrackerMapReduce框架中的任务调度器,类似于”Master”的角色资源管理管理TaskTracker为任务分配可用的资源(Task Slot)任务生命周期管理任务提交分配Task并执行处理失败的Task任务完成通过http:/:50030监控任务执行状态可以查看Map Task和Reduce Task执行状态可以查看Task的stdout和stderr的日志信息14TaskTracker是一个Daemon进程与JobTracker进行通信接受任务分配更新Task状态管理本地
6、单独Task的执行,包括Map任务和Reduce任务TaskTracker可以配置Map Slot和Reduce Slot的数量每个Task占用一个Slot每个Task是一个独立的进程TaskTracker启动Task并且监控Task状态15任务提交和执行过程16NameNodeJobTrackerDataNodeServerTaskTrackerTaskTaskTaskClientSubmitjobconfStagingJob related info including jarLibStreaming libSend config and jarsRequest location of d
7、ataObtain location of dataSubmit jobSend task to TaskTrackerRequest for job related infoSend job related infoRun task Report about task123456789MapReduce失效处理TaskTracker失效通过心跳机制,可以被检测到失效需要重新执行已完成的、正在执行的Map任务需要重新执行正在执行的Reduce任务通过JobTracker实现失效处理JobTracker失效如果配置了高可用性,未完成的任务会在备份JobTracker启动之后重新提交17MapRe
8、duce编程18MapReduce编程模型MapReduce使用Java编写输入和输出Key/Value对的集合实现Map和Reduce方法map (in_key, in_value) - list(out_key, intermediate_value)处理键值对的输入生成中间结果,也是键值对reduce (out_key, list(intermediate_value) - list(out_value)将相同Key的中间值合并生成最终结果(通常每个Key对应一个结果)19map(/ Do some work here.collect( ) );reduce(.collect( / Do
9、 some work here); )MapValueOutputCollectorReporter:OutputCollectorKeyValueIterator (value)ReporterValueKey:OutputCollectorMapOutputCollectorMap() 和 Reduce()例子:WordCountpublic void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException StringTokenizer tokenize
10、r = new StringTokenizer(value.toString();while (tokenizer.hasMoreTokens() word.set(tokenizer.nextToken(); output.collect(word, one); public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().g
11、et(); output.collect(key, new IntWritable(sum); 21WritableComparable接口Key Value需要被序列化实现WritableComparable接口WritableComparable extends Writable, ComparableWritable接口write(DataOutput out)read(DataInput in)Comparable接口int compareTo(T o)Key使用compareTo方法来排序22Hadoop DatatypesHadoop提供Java 基本数据类型的封装,并且提供get
12、(), set() 和 toString()方法还提供了hashCode() 和 equals()方法提供的类型包括:ByteWritableIntWritableLongWritableFloatWritableDoubleWritableBooleanWritableNullWritable(不支持set()IntPairWritable23Hadoop Datatypes 实例TextWritable for a Java StringmyTxtWritable.set(stringToInsert);myTxtWritable.toString();ArrayWritable for
13、a Java ArraymyAW.set(Writable)myAW.get() returns the WritablemyAW.toArray() returns a Java ObjectMapWritable for a Java Hash MapmyMapW.put(K,V)myMapW.get(K)BytesWritable for a Java array of bytesmyBW.set(byte, offset, len)myBW.get() returns byte24Input Format25Input SplitInputSplit包含Map任务需要处理的数据信息Jo
14、bClient负责计算和划分Split大部分压缩文件无法SplitMapReduce能够自动解压文件Indexed LZO,BZip2格式的压缩文件可以被splithadoop jar hadoop-lzo.jar pression.lzo.LzoIndexer yourfile.lzo26MapReduce InputFormatInputFormat控制InputSplit的划分提供RecordReader,从InputSplit中读取并构造key-value对供Map任务使用一般会 按照HDFS文件的Block每个InputSplit包含一个HDFS Block64G的文件,blocks
15、ize为64M,默认会有1024个InputSlit使用较大的Block Size具有以下好处提高单个Map任务处理数据量减少Split数量,可以减少总的Map任务的启动停止开销可以通过设置SplitSize来控制Mapper的数量max(minSplitSize, min(BlockSize, maxSplitSize)FileInputFormatsetInputPaths(JobConf conf, Path. inputPaths) addInputPaths(JobConf conf, String commaSeparatedPaths) 27小文件处理问题Hadoop用来处理海量
16、文件(百万),但是小文件意味着更多的文件(千万级)小文件越多,MapReduce任务中需要的Mapper数就越多默认每个Mapper会处理文件的一个Block或者整个文件(文件大小小于BlockSize)大量的Mapper需要使用更多的启动/停止开销生成的大量的日志小文件大量的小文件会使得HDFS Metadata变大增加Namenode的负载采用CombineFileInputFormat每个InputSplit包含多个File Block通过设置SplitSize控制Mapper数量28三种常用的InputFormatTextInputFormat(默认)每次处理一行Key是LongWri
17、table, Value是TextKeyValueInputFormat每次处理一行Key和Value之间存在分隔符Key是Text, Value也是Text必须显式指定分隔符SequenceFileInputFormat使用二进制格式存储的Key/Value对支持压缩,可以只对Value压缩、也可以对Block压缩是Splitable的2930TextInputFormat 实现Now isthe timeYour MapCodek=0v= “Now is”k=7v= “the time”1231TextInputFormat细节 HDFSNow isthe timeforNode all
18、goodmen toNode K = offset = 64MB-3v=“for all good”32Record ReaderKV KV KV KV KV KV KV KV 64MBRecord ReaderMapper 1Record ReaderMapper 2ignore64MB33KeyValueInputFormatYour input file in HDFSJoeSmith AnnSingh Key: “Ann” value: “Singh”YourMapCode12Key: “Joe” value: “Smith”自定义InputFormat实现InputFormat 接口
19、InputSplits getSplits()RecordReader getRecordReader()如果使用文件作为输入,继承FileInputFormat大部分会使用FileInputFormat的getSplits方法重新定义自己的RecordReaderK createKey()V createValue()boolean next(K key, V value)34Output Format35Output Format所有的输出文件缺省会写入同一个目录默认输出文件的名字是”part-”加5位数字第一个输出文件是part-00000输出文件数量与Reduce数相同如果没有Redu
20、ce任务,输出文件数与Mapper数量相同FileOutputFormatsetOutputPath(JobConf conf, Path outputDir)TextOutputFormat (default)每个Key Value对是一行key TAB value SequenceFileOutputFormat与SequenceFileInputFormat配合使用36MapReduce 输出压缩Job输出结果压缩conf.setBoolean(press, true);conf.setClass(pression.codec, SnappyCodec.class, Compressio
21、nCodec.class);Map任务输出结果压缩conf.setCompressMapOutput(true); conf.setMapOutputCompressorClass(SnappyCodec.class);37任务创建、提交38JobClient与Jobtracker交互的接口检查任务的Input和Output拷贝Job的jar和配置信息到MapReduce框架提交任务监控任务状态39JobClient提交任务通过JobClient提供的静态方法runJob()方法,会创建JobClient的实例,并调用submitJob()方法runJob()方法会每秒钟去获取Job的状态在J
22、ob完成之前,程序会被阻塞在状态获取也可以直接调用JobClient的submitJob程序在任务提交之后不会被阻塞40JobConf配置Job的参数常用的参数jobconf.setJobName(“AnyName”);jobconf.setMapperClass(myMapper.class);jobconf.setCombinerClass(myReducer.class);jobconf.setReducerClass(myReducer.class);jobconf.setOutputKeyClass(Text.class); / for entire jobjobconf.setOu
23、tputValueClass(Text.class); / for entire jobjobconf.setMapOutputKeyClass(Text.class); / for just map()jobconf.setMapOutputValueClass(IntWritable.class); / for just map()jobconf.setNumReduceTasks(1); jobconf.setJarByClass(WordCount.class);41JobConf自定义参数/Job创建JobConf conf = new JobConf(); conf.set(“My
24、Property”, “MyValue”);JobClient.runJob(conf);/Mapper/Reducer中使用public void setup(Context context) throws IOException, InterruptedException Configuration conf = context.getConfiguration(); String myValue = conf.get(“MyProperty”); /etc 42main函数的编写在main函数中直接构造JobConf,并通过JobClient提交通过Tool和 ToolRunner实现T
25、ool接口 int run(String args)ToolRunner int run(Configuration conf, Tool tool, String args)支持处理Hadoop通用的参数,如 conf, -D, -fs等43ToolRunner 实例44Override public int run(String args) throws Exception if (args.length != 2) System.out.println( Usage: ProductSearchIndexer ); ToolRunner.printGenericCommandUsage(
26、System.out); return -1; /JobClient.runJob() public class ProductSearchIndexer extends Configured implements Tool public static void main(String args) throws Exception int res = ToolRunner.run(new Configuration(), new ProductSearchIndexer(), args); System.exit(res); Implement ToolInvoke ToolRunnerIn
27、main()Implement runMapReduce程序运行打包成Jar,拷贝到集群中运行hadoop jar yourmr.jar io.transwarp.example.YourClassName args使用hadoop-eclipse插件在Eclipse中运行MapReduce程序45高级API46DistributedCache分发任务需要的只读的大文件addCacheFile(URI,conf)/setCacheFiles(URIs,conf)addCacheArchive(URI,conf)/setCacheArchives(URIs,conf)addArchiveToCl
28、assPath(Path, Configuration)/addFileToClassPath(Path, Configuration)每个TaskTracker在执行Job之前拷贝文件到本地该TaskTracker上的任务都从本地读取执行完Job之后删除在任务的工作目录下创建SymbolLinkDistributedCache.createSymlink(Configuration)使用#分割, hdfs:/namenode:port/lib.so.1#lib.so 分发的文件可以是Text, Archives, Jars等47DistributedCache例子/Job创建JobConf
29、conf = new JobConf(); DistributedCache.addCacheFile(new URI(/user/peter/cacheFile/testCache1), conf);JobClient.runJob(conf);/Mapper/Reducer中使用public void setup(Context context) throws IOException, InterruptedException Configuration conf = context.getConfiguration(); URI localFiles = DistributedCache.getCacheFiles(conf); /
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 定期观摩活动方案策划(3篇)
- 新公司各项管理制度内容(3篇)
- 活动策划方案大全建材(3篇)
- 矿山环境奖惩管理制度范本(3篇)
- 绩效系统管理制度(3篇)
- 银行郊游活动策划方案(3篇)
- Unit 5 Topic 3 Section B 课件+素材 2025-2026学年仁爱科普版九年级英语下册
- 2026年及未来5年市场数据中国肉鸡行业发展前景预测及投资方向研究报告
- 纳税人培训课件与简报
- 信息技术外包与合作伙伴管理制度
- 乙肝疫苗接种培训
- 心衰患者的用药与护理
- 食品代加工业务合同样本(版)
- 车间管理人员绩效考核方案
- 安全生产应急平台体系及专业应急救援队伍建设项目可行性研究报告
- 浙江省杭州市北斗联盟2024-2025学年高二上学期期中联考地理试题 含解析
- 医用化学知到智慧树章节测试课后答案2024年秋山东第一医科大学
- 中国传统美食饺子历史起源民俗象征意义介绍课件
- 医疗器械样品检验管理制度
- 更换法人三方免责协议书范文
- 中建“大商务”管理实施方案
评论
0/150
提交评论