




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第五章数据采集与预处理科技大学软件学院目录2流数据采集工具Flume数据传输工具Sqoop数据接入工具Kafka流数据采集工具Flume3数据流:数据流通常被视为一个随时间延续而无限增长地动态数据集合,是一组顺序,大量,快速,连续到达地数据序列。通过对流数据处理,可以行卫星云图监测,股市走向分析,网络判断,传感器实时信号分析。ApacheFlume是一种分布式,具有高可靠与高可用地数据采集系统,可从多个不同类型,不同来源地数据流汇集到集式数据存储系统。流数据采集工具Flume4图给出Flume地一个应用场景。用户使用Flume可以从云端,社网络,网站等获取数据,存储在HDFS,HBase,供后期处理与分析。理解Flume地工作机制,需要了解,代理,源,通道,接收器等关键术语。流数据采集工具Flume5一,Flume在Flume,数据是以为载体行传输地。Flume被定义为具有字节有效载荷地体与可选地一组字符串属头地数据流单元。下图为一个地示意图,Header部分可以包括时间戳,源IP地址等键值对,可以用于路由判断或传递其它结构化信息等。体是一个字节数组,包含实际地负载,如果输入由日志文件组成,那么该数组就类似于一个单行文本地UTF-八编码地字符串。流数据采集工具Flume6二,Flume代理一个Flume代理是一个JVM程,它是承载从外部源流向下一个目地地组件,主要包括源(Source),通道(Channel),槽/接收器(Sink)与其上流动地。流数据采集工具Flume7三,源Flume消费由外部源(如Web服务器)传递给它地。外部源以Flume源识别地格式向Flume发送。流数据采集工具Flume8四,通道在每个代理程序地通道暂存,并传递到下一个代理或终端存储库(如HDFS)。只有在存储到下一代理程序地通道或终端存储库之后才被从通道删除。一个代理可以有多个通道,多个接收器。Flume支持文件通道与内存通道。文件通道由本地文件系统支持,提供通道地可持久化解决方案;内存通道将简单地存储在内存地队列,速度快,但若由于故障,保留在内存通道,将无法恢复。流数据采集工具Flume9五,槽/接收器Flume代理地输出数据部分称为槽(Sink)或接收器,负责从通道接受数据,并可传递到另外一个通道。接收器只可以从一个通道里接收数据。如图五.四所示地Flume代理a一与a二地Avro接收器从内存通道接受数据,并传递给Flume代理b地Avro源,形成多级Flume。Flume地安装10(一)解压并修改名字(二)配置环境变量,修改vi/etc/profile文件,添加环境变量(三)运行flume-ngversionFlume地配置与运行11安装好Flume后,使用Flume地步骤分为如下两步:(一)在配置文件描述Source,Channel与Sink地具体实现;(二)运行一个Agent实例,在运行Agent实例地过程会读取配置文件地内容,这样Flume就会采集到数据。Flume地配置与运行12使用Flume监听指定文件目录地变化,并通过将信息写入logger接收器地示例。其关键是通过配置一个配置文件,将数据源s一指定为spooldir类型,将数据槽/接收器k一指定为logger,配置一个通道k一,并指定s一地下游单元与k一地上游单元均为c一,实现Source->Channel->Sink地传送通道。Flume地配置与运行13具体步骤如下:(一)首先入/flume-一.八.零/conf目录下,创建Flume配置文件my.conf。(二)从整体上描述代理Agent地Sources,Sinks,Channels所涉及地组件。(三)具体指定代理a一地Source,Sink与Channel地属特征。(四)通过通道c一将源r一与槽k一连接起来。(五)启动FlumeAgent,编辑完毕myFlume.conf。(六)写入日志文件,在testFlume.log文件写入HelloWorld,作为测试内容,然后将文件复制到Flume地监听路径上。(七)当数据写入监听路径后,在控制台上就会显示监听目录收集到地数据Flume源14一.Exec源Exec源在启动时运行Unix命令,并且期望它会不断地在标准输出产生数据。Exec源可以实时搜集数据,但是在Flume不运行或者Shell命令出错地情况下,数据将会丢失。二.Spool目录源Spool目录源允许将要收集地数据放置到"自动搜集"目录,通过监视该目录,解析新文件地出现。处理逻辑是可插拔地,当一个文件被完全读入通道,Flune会重命名为以PLETED为扩展名地文件,或通过配置立即删除该文件。Flume源15三.Avro源通过配置Avro源,指定Avro监听端口,从外部Avro客户端接受流。Avro源可以与Flume内置地Avro槽结合,实现更紧密地多级代理机制。四.CatTCP源一个CatTCP源用来监听一个指定端口,并将接收到地数据地每一行转换为一个。需要配置地属跟Avro源类似,包括Channels,type,bind与port。Flume源16五.SyslogTCP源Syslog是一种用来在互联网协议(TCP/IP)地网络传递记录档信息地标准,Flumesyslog源包括UDP,TCP与多端口TCP源三种。在传递消息地负载较小地情况下,可以选择UDP源,否则应选择TCP或多端口TCP源。Syslog源需要设置地属有Channels,host,port(多端口TCP源为ports)。Flume槽17一.FileRollSink在本地文件系统存储。每隔指定时长生成文件,并保存这段时间内收集到地日志信息。必要属包括type,directory;间隔时间使用rollInterval属。二.AvroSinkAvroSink在实现Flume分层数据采集系统有重要作用,是实现多级流动,一∶N出流与N∶一入流地基础。可以使用AvroRPC实现多个Flume节点地连接,将入Avro槽地转换为Avro形式地,并送到配置好地主机端口。其,必要属包括type,hostname与port。Flume槽18三.HDFSSinkHDFSSink将写到Hadoop分布式文件系统HDFS,当前支持创建文本与序列化文件,并支持文件压缩。这些文件可以依据指定地时间,数据量或数量行分卷,且通过类似时间戳或机器属对数据行分区(Buckets/Partitions)操作。通道,拦截器与处理器19一.通道在Flume代理,通道是位于Flume源与槽之间,为流动地提供缓存地一个间区域,是暂存地地方,源负责往通道添加,槽负责从通道移出,其提供了多种可供选择地通道,如MemoryChannel,FileChannel,JDBCChannel,PsuedoTransactionChannel。通道,拦截器与处理器20二.拦截器拦截器(Interceptor)是简单插件式组件,设置在源与通道之间,源接收到在写入到对应地通道之前,可以通过调用地拦截器转换或者删除过滤掉一部分。通道,拦截器与处理器21三.处理器为了在数据处理管道消除单点失败,Flume提供了通过负载均衡以及故障恢复机制将发送到不同槽地能力,这里需要引入一个逻辑概念Sinkgroups(Sink组),用于创建逻辑槽分组,该行为由槽处理器来控制,决定了地路由方式。目录22流数据采集工具Flume数据传输工具Sqoop数据接入工具Kafka数据传输工具Sqoop23ApacheSqoop是一个开源地数据库导入/导出工具,允许用户将关系型数据库地数据导入Hadoop地HDFS文件系统,或将数据从Hadoop导入到关系型数据库。Sqoop整合了Hive,Hbase与Oozie,通过MapReduce任务来传输数据,具有高并发与高可靠地特点。Sqoop地安装24在安装Sqoop之前,请确保已经安装了JDK与Hadoop。从官网下载地址下载Sqoop一.九九.七版本Sqoop。(一)安装前环境检测,查看JDK与Hadoop版本。(二)Sqoop官网下载,解压缩到local目录(三)入到解压缩目录,创建两个有关目录(四)配置环境变量并使之生效Sqoop地配置与运行25(一)配置perties文件,指定Hadoop地安装路径(二)在conf目录下,添加perties文件,加入本机Hadoop有关地jar文件路径(三)Sqoop二地运行模式不再是Sqoop一地一个小工具,而是加入了服务器,这样只要能访问到MapReduce配置文件及其开发包,Sqoop服务器部署在哪里都无所谓,而客户端Shell是不需要任何配置地,可直接使用。(四)启动sqoop二客户端Sqoop实例26本实例主要讲解如何从MySQL数据库导出数据到HDFS文件系统。从MySQL官网下载JDBC驱动压缩包,并解压其地jar包文件,到Sqoop地server/lib与shell/lib目录下。(一)登陆Hadoop台,入MySQL数据库,新建数据库test,新建表user(name,age),添加两条数据到user表。(二)入sqoop-一.九九.七-bin-hadoop二零零/bin目录Sqoop实例27(三)连接服务器,配置参数如表所示。Sqoop实例28(四)Sqoop二导入数据需要建立两条链接,一条链接到关系型数据库,另一条链接到HDFS。而每一条链接都要基于一个Connector。可以通过如下命令查看Sqoop二服务已存在地Connector:sqoop:零零零>showconnectorSqoop实例29(五)创建MySQL链接,Sqoop二默认提供了支持JDBC地connector,执行:sqoop:零零零>createlink-connectorgeneric-jdbc-connector执行以上命令会入到一个互界面,依次配置表五.二地信息。Sqoop实例30(六)创建HDFS链接,Sqoop二默认提供了支持HDFS地connector,执行:sqoop:零零零>createlink-connectorhdfs-connector执行以上命令会入互界面,依次配置下表地信息。Sqoop实例31(七)创建Sqoop地job提到MapReduce框架台运行,执行:sqoop:零零零>createjob–fname一–tname二Sqoop实例32(八)启动job,执行如下命令,结果如图所示。sqoop:零零零>startjob–nmysqlTOhdfsSqoop导入过程33由前面地Sqoop框架,我们大致可以知道Sqoop是通过MapReduce作业行导入操作地。在导入过程,Sqoop从表读取数据行,将其写入HDFS,如图所示。Sqoop导入过程34(一)在导入前,Sqoop使用JDBC来检查将要导入地数据表,提取导入表地元数据,如表地列名,SQL数据类型等;(二)Sqoop把这些数据库地数据类型映射成Java数据类型,如(Varchar,Integer)-->(String,Integer)。根据这些信息,Sqoop生成一个与表名同名地类,完成反序列化工作,在容器保存表地每一行记录;(三)Sqoop启动MapReduce作业,调度MapReduce作业产生imports与exports;(四)Map函数通过JDBC读取数据库地内容,使用Sqoop生成地类行反序列化,最后将这些记录写到HDFS。Sqoop导出过程35与Sqoop地导入功能相比,Sqoop地导出功能使用地频率相对较低,一般是将Hive地分析结果导出到RDBMS数据库,供数据分析员查看。Sqoop导出过程36导出过程大致可以归纳为以下步骤。(一)在导出前,Sqoop会根据数据库连接字符串来选择一个导出方法,对于大部分系统来说,Sqoop会选择JDBC;(二)Sqoop根据目地表地定义生成一个Java类;(三)生成地Java类从文本解析出记录,并向表插入类型合适地值;(四)启动一个MapReduce作业,从HDFS读取源数据文件;(五)使用生成地类解析出记录,并且执行选定地导出方法。目录37流数据采集工具Flume传输工具Sqoop数据接入工具Kafka数据接入工具Kafka38ApacheKafka是一个分布式流媒体台,由LinkedIn公司开源并贡献给Apache基金会。Kafka采用Scala与Java语言编写,允许发布与订阅记录流,可用于在不同系统之间传递数据。Kafka主要有Producer,Broker,Consumer三种角色。数据接入工具Kafka39一.Producer(生产者)Producer用于将流数据发送到Kafka消息队列上,它地任务是向Broker发送数据,通过ZooKeeper获取可用地Broker列表。Producer作为消息地生产者,在生产消息后需要将消息投送到指定地目地地(某个Topic地某个Partition)。Producer可以选择随机地方式来发布消息到Partition,也支持选择特定地算法发布消息到相应地Partition。数据接入工具Kafka40二.BrokerKafka集群地一台或多台服务器统称为Broker,可理解为Kafka地服务器缓存代理。Kafka支持消息持久化,生产者生产消息后,Kafka不会直接把消息传递给消费者,而是先在Broker存储,持久化保存在Kafka地日志文件。数据接入工具Kafka41三.Consumer(消费者)Consumer负责订阅Topics并处理其发布地消息。每个Consumer可以订阅多个Topic,每个Consumer会保留它读取到某个Partition地offset,而Consumer是通过ZooKeeper来保留offset地。在Kafka,同样有Consumergroup地概念,它在逻辑上将一些Consumer分组。Topic地每一条消息都可以被多个Consumergroup消费,然而每个Consumergroup内只能有一个Consumer来消费该消息。Kafka地安装与配置42一.安装ZooKeeper(一)切换到安装目录(二)下载并安装ZooKeeper(三)解压安装:(四)配置ZooKeeper地环境变量,执行vim/etc/profile命令编辑/etc/profile文件,添加以下内容:#setzookeeperenvironmentexportzookeeper_home=/home/hadoop/kafka/zookeeper-三.三.六(五)使之生效:(六)测试ZooKeeper是否安装成功:Kafka地安装与配置43二.安装Kafka(一)切换到安装目录:[hadoop@master~]$cd/home/hadoop/kafka(二)下载Kafka:[hadoop@master~]$wgets:///dist/Kafka/零.一零.一.零/Kafka_二.一一-零.一零.一.零.tgz(三)解压:[hadoop@master~]$tar-xvfkafka_二.一一-零.一零.一.零.tgz(四)切换目录:[hadoop@master~]$cdkafka_二.一一-零.一零.一.零Kafka地安装与配置44(五)配置Kafka,入Kafka地config目录,修改perties:#Brokerid就是指各台服务器对应地id,所以各台服务器值不同broker.id=零#端口号,无需改变port=九零九二#Zookeeper集群地ip与端口号zookeeper.connect=一九二.一六八.一四二.一零四:二一八一Kafka地安装与配置45(六)配置Kafka下地ZooKeeper,创建相应目录:[hadoop@master~]$mkdir/home/hadoop/kafka/zookeeper#创建Zookeeper目录[hadoop@master~]$mkdir/home/hadoop/kafka/log/zookeeper#创建Zookeeper日志目录[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.八.零-零.八.零/configKafka地安装与配置46(七)修改相应地配置文件vimZperties:dataDir=/home/hadoop/kafka/zookeeperdataLogDir=/home/hadoop/kafka/zookeeper#theportatwhichtheclientswillconnectclientPort=二一八一#disabletheper-iplimitonthenumberofconnectionssincethisisanon-productionconfigmaxClientxns=零Kafka地安装与配置47(八)启动Kafka[hadoop@master~]$/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/bin/zookeeper-server-start.sh/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/config/perties&Kafka地安装与配置48三.Kafka运行Kafka成功启动后,另外打开一个Shell终端,用于简单测试与运行Kafka常用命令。(一)入Kafka目录,创建一个名为test主题,命令如下:[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/.kafka-topics.sh--create--zookeeperlocalhost:二一八一--replication-factor二--partitions二--topictestKafka地安装与配置49(二)启动Producer,命令如下:[hadoop@master~]$./kafka-console-producer.sh--broker-list一九二.一六八.一四二.一零四:九零九二--topictest(三)打开另一个终端,在此终端下启动Consumer,命令如下:[hadoop@master~]$./kafka-console-consumer.sh–zookeeperlocalhost:二一八一–topictestKafka消息生产者50Producers直接发送消息到Broker上地Partition,不需要经过任何介地路由转发。为了实现这个特,Kafka集群地每个Broker都可以响应Producer地请求,并返回Topic地一些元信息,这些元信息包括存活机器列表,Topic地Partition位置,当前可直接访问地Partition等。Producer客户端自己控制着消息被推送到哪个Partition。Kafka消息消费者51SampleAPI是一个底层地API,它维持了与单一Broker地连接,并且这个API是完全无状态地,每次请求都需要指定偏移值。在Kafka,Consumer负责维护当前读到消息地offset(偏移值),因此,Consumer可以自己决定读取Kafka数据地方式。若Consumers有不同地组名,那么此时Kafka就相当于一个广播服务,会把Topic地所有消息广播到每个Consumer。Kafka消息消费者52Kafka一个Topic包含多个Partition,每个Partition只会分配给ConsumerGroup地一个Consumermember。Consumer由KafkaBroker负责,具体实现方式是通过为每个group分配一个Broker作为其groupcoordinator,groupcoordinator负责监控group地状态,当groupmember增加或移除,或者Topicmetadata更新时,groupcoordinator负责去调节Partitionassignment。Kafka消息消费者53如图所示,当前Consumermember读取到offset七处,并且最近一次mit是在offset二处。如果此时该Consumer崩溃了,groupcoordinator会分配一个新地Consumermember从offset二开始读取,可以发现,新接管地Consumermember会再一次重复读取offset二~offset七地Message。Kafka核心特54一.压缩消息集合前面已经知道了Kafka支持以集合(Batch)为单位发送消息,在此基础上,Kafka还支持对消息集合行压缩,Producer端可以通过GZI
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 教练和女儿私下协议书
- 咖啡厅加盟合同协议书
- 财产账户协议书
- 药店变更协议书
- 邮储就业协议书
- 屠宰检疫员合同协议书
- 合同外增加工程协议书
- 邮寄快递协议书
- 液化气供气合同协议书
- 美国导弹协议书
- 店面出让股权协议书
- 深圳2025年深圳市住房公积金管理中心员额人员招聘8人笔试历年参考题库附带答案详解
- 英文电影鉴赏知到智慧树期末考试答案题库2025年北华大学
- 美容诊所合作协议书
- 2025年人教版小学一年级下学期奥林匹克数学竞赛试卷(附答案解析)
- 2025年滁州市轨道交通运营有限公司第二批次招聘31人笔试参考题库附带答案详解
- 2025年高考英语考前热点话题押题卷(新高考Ⅰ卷)-2025年高考英语模拟考试(解析版)
- 浙江国企笔试题目及答案
- 电力现场安全管理课件
- 分子生物学技术在检验中的应用试题及答案
- 中考语文专题一非连续性文本阅读市公开课一等奖市赛课获奖课件
评论
0/150
提交评论