版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
【第6章回顾】1.了解Flume功能、来源、特点和版本。2.理解Flume体系架构及Source、Sink、Channel功能。(重点)3.理解Flume部署要点包括运行环境、运行模式、配置文件flume-env.sh。(重点)4.掌握FlumeShell命令的使用。(重点,难点)5.熟练掌握在Linux环境下部署Flume,灵活编写Agent属性文件和使用FlumeShell命令进行实时日志收集。(重点,难点)【课程内容】开学第一课(理论1学时)第1章部署全分布模式Hadoop集群(理论3学时+实验2学时)第2章HDFS实战(理论4学时+实验2学时)第3章MapReduce编程(理论4学时+实验2学时)第4章部署ZooKeeper集群和ZooKeeper实战(理论4学时+实验2学时)第5章部署本地模式Hive和Hive实战(理论4学时+实验2学时)第6章Flume实战(理论4学时+实验2学时)第7章Kafka实战(理论4学时+实验2学时)第8章Spark集群部署和基本编程(理论4学时+实验2学时)第9章Flink集群部署和基本编程(选修)第7章Kafka实战7.1初识Kafka7.2Kafka体系架构7.3Kafka部署要点7.4KafkaShell常用命令7.5综合实战:Kafka实战7.1初识KafkaApacheKafka是一个分布式的、支持分区的、多副本的、基于ZooKeeper的发布/订阅消息系统,起源于LinkedIn公司开源出来的分布式消息系统,2011年成为Apache开源项目,2012年成为Apache顶级项目,目前被多家公司采用。Kafka采用Scala和Java编写,其设计目的是通过Hadoop和Spark等并行加载机制来统一在线和离线的消息处理,构建在ZooKeeper上,不同的分布式系统可统一接入到Kafka,实现和Hadoop各组件之间不同数据的实时高效交换,被称为“生态系统的交通枢纽”。目前与越来越多的分布式处理系统如ApacheStorm、ApacheSpark等都能够较好的集成,用于实时流式数据分析。7.2Kafka体系架构订阅消息发布消息到Partition
ConsumerGroup2
ConsumerGroup1Producer1Producer2Producer3Producer4KafkaBroker1KafkaBroker2KafkaBroker3Consumer1Consumer2Consumer3Consumer4Consumer5ZooKeeper7.2Kafka体系架构Kafka整体架构比较新颖,更适合异构集群。Kafka中主要有Producer、Broker和Customer三种角色,一个典型的Kafka集群包含多个Producer、多个Broker、多个ConsumerGroup和一个ZooKeeper集群。每个Producer可以对应多个Topic,每个Consumer只能对应一个ConsumerGroup,整个Kafka集群对应一个ZooKeeper集群,通过ZooKeeper管理集群配置、选举Leader以及在ConsumerGroup发生变化时进行负载均衡。7.2Kafka体系架构(1)Message(消息):Message是通信的基本单位,每个Producer可以向一个Topic发布一些消息,Kafka中的消息是以Topic为基本单位组织的,消息是无状态的,消息消费的先后顺序是没有关系的。每条Message包含三个属性:offset,消息的唯一标识,类型为long;MessageSize,消息的大小,类型为int;data,消息的具体内容,可以看作一个字节数组。(2)Topic(主题)发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,Kafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic。(3)Partition(分区):物理上的概念,一个Topic可以分为多个Partition,每个Partition内部都是有序的。每个Partition只能由一个Consumer来进行消费,但是一个Consumer可以消费多个Partition。(4)Broker:消息中间件处理节点。一个Kafka集群由多个Kafka实例组成,每个实例被称为Broker。一个Broker上可以创建一个或多个Topic,同一个Topic可以在同一Kafka集群下的多个Broker上分布。Broker与Topic关系图
Broker1Topic1Topic2Topic3Topic4
Broker2Topic1Topic2Topic3
Broker3Topic1Topic27.2Kafka体系架构(5)Producer(消息生产者):向Broker发送消息的客户端。(6)Consumer(消息消费者):从Broker读取消息的客户端。(7)ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,一条消息可以发送到多个不同的ConsumerGroup,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息。7.3Kafka部署要点7.3.1Kafka运行环境7.3.2Kafka运行模式7.3.3Kafka配置文件7.3.1Kafka运行环境1)操作系统Kafka支持不同操作系统,例如GNU/Linux、Windows、MacOSX等。需要注意的是,在Linux上部署Kafka要比在Windows上部署能够得到更高效的I/O处理性能。编者采用的操作系统为Linux发行版CentOS7。2)Java环境Kafka使用Java语言编写,因此它的运行环境需要Java环境的支持。编者采用的Java为OracleJDK1.8。3)ZooKeeper集群Kafka依赖ZooKeeper集群,因此运行Kafka之前需要首先启动ZooKeeper集群。Zookeeper集群可以自己搭建,也可以使用Kafka安装包中内置的shell脚本启动Zookeeper。编者采用自行搭建ZooKeeper集群,版本为3.4.13。7.3.2Kafka运行模式Kafka有两种运行模式:单机模式和集群模式。单机模式是只在一台机器上安装Kafka,主要用于开发测试,而集群模式则是在多台机器上安装Kafka,也可以在一台机器上模拟集群模式,实际的生产环境中均采用多台服务器的集群模式。无论哪种部署方式,修改Kafka的配置文件perties都是至关重要的。单机模式和集群模式部署的步骤基本一致,只是在perties文件的配置上有些差异。7.3.3Kafka配置文件$KAFKA_HOME/config中有多个配置文件。perties配置参数(部分)参数名说明broker.id用于指定Broker服务器对应的ID,各个服务器的值不同listeners表示监听的地址及端口,PLAINTEXT表示纯文本,也就是说,不管发送什么数据类型都以纯文本的方式接收,包括图片,视频等work.threads网络线程数,默认是3num.io.threadsI/O线程数,默认是8socket.send.buffer.bytes套接字发送缓冲,默认是100KBsocket.receive.buffer.bytes套接字接收缓冲,默认是100KBsocket.request.max.bytes接收到的最大字节数,默认是100MBlog.dirs用于指定Kafka数据存放目录,地址可以是多个,多个地址需用逗号分割num.partitions分区数,默认是1num.recovery.threads.per.data.dir每一个文件夹的恢复线程,默认是1log.retention.hours数据保存时间,默认是168h,即一个星期(7天)log.segment.bytes指定每个数据日志保存最大数据,默认为1GB,当超过这个值时,会自动进行日志滚动erval.ms设置日志过期的时间,默认为300s(即5min)zookeeper.connect用于指定Kafka所依赖的ZooKeeper集群的IP和端口号,地址可以是多个,多个地址需用逗号分割zookeeper.connection.timeout.ms设置Zookeeper的连接超时时间,默认为6s,如果到达这个指定时间仍然连接不上就默认该节点发生故障7.4KafkaShell常用命令Kafka支持的所有命令在$KAFKA_HOME/bin下存放。7.4KafkaShell常用命令命令功能描述kafka-server-start.sh启动KafkaBrokerkafka-server-stop.sh关闭KafkaBrokerkafka-topics.sh创建、删除、查看、修改Topickafka-console-producer.sh启动Producer,生产消息,从标准输入读取数据并发布到Kafkakafka-console-consumer.sh启动Consumer,消费消息,从Kafka读取数据并输出到标准输出7.5综合实战:Kafka实战规划Kafka集群部署Kafka集群启动Kafka集群验证Kafka集群使用KafkaShell关闭Kafka集群7.5.1规划Kafka集群主机名IP地址运行服务软硬件配置master30QuorumPeerMainKafka内存:4GCPU:1个2核硬盘:40G操作系统:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave131QuorumPeerMainKafka内存:1GCPU:1个1核硬盘:20G操作系统:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave232QuorumPeerMainKafka内存:1GCPU:1个1核硬盘:20G操作系统:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.17.5.1规划Kafka集群软件名称软件版本发布日期下载地址VMwareWorkstationProVMwareWorkstation14.5.7ProforWindows2017年6月22日/products/workstation-pro.htmlCentOSCentOS7.6.18102018年11月26日/download/JavaOracleJDK8u1912018年10月16日/technetwork/java/javase/downloads/index.htmlZooKeeperZooKeeper3.4.132018年7月15日/releases.htmlKafkaKafka2.1.12019年2月15日/downloads7.5.2部署Kafka集群1.初始软硬件环境准备(1)准备三台机器,安装操作系统,编者使用CentOSLinux7。(2)对集群内每一台机器,配置静态IP、修改机器名、添加集群级别域名映射、关闭防火墙。(3)对集群内每一台机器,安装和配置Java,要求Java8或更高版本,编者使用OracleJDK8u191。(4)安装和配置Linux集群中各节点间的SSH免密登录。(5)在Linux集群上部署ZooKeeper集群。7.5.2部署Kafka集群2.获取KafkaKafka官方下载地址为/downloads,编者选用的Kafka版本是2019年2月15日发布的Kafka2.1.1,其安装包文件kafka_2.12-2.1.1.tgz例如存放在master机器的/home/xuluhui/Downloads中。读者应该注意到了,Kafka安装包和一般安装包的命名方式不一样,例如kafka_2.12-2.1.1.tgz,其中2.12是Scala版本,2.1.1才是Kafka版本,官方强烈建议Scala版本和服务器上的Scala版本保持一致,避免引发一些不可预知的问题。3.安装Kafka以下所有操作需要在三台机器上完成。切换到root,解压kafka_2.12-2.1.1.tgz到安装目录如/usr/local下,使用命令如下所示。[xuluhui@master~]$suroot[root@masterxuluhui]#cd/usr/local[root@masterlocal]#tar-zxvf/home/xuluhui/Downloads/kafka_2.12-2.1.1.tgz7.5.2部署Kafka集群4.配置Kafka修改Kafka配置文件perties,master机器上的配置文件$KAFKA_HOME/config/perties修改后的几个参数如下所示。broker.id=0log.dirs=/usr/local/kafka_2.12-2.1.1/kafka-logszookeeper.connect=master:2181,slave1:2181,slave2:2181slave1和slave2机器上的配置文件$KAFKA_HOME/config/perties中参数broker.id依次设置为1、2,其余参数值与master机器相同。5.创建所需目录以上第4步骤使用了系统不存在的目录:Kafka数据存放目录/usr/local/kafka_2.12-2.1.1/kafka-logs,因此需要创建它,使用的命令如下所示。[root@masterlocal]#mkdir/usr/local/kafka_2.12-2.1.1/kafka-logs7.5.2部署Kafka集群6.设置$KAFKA_HOME目录属主为了在普通用户下使用Kafka,将$KAFKA_HOME目录属主设置为Linux普通用户例如xuluhui,使用以下命令完成。[root@masterlocal]#chown-Rxuluhui/usr/local/kafka_2.12-2.1.17.在系统配置文件目录/etc/profile.d下新建kafka.sh使用“vim/etc/profile.d/kafka.sh”命令在/etc/profile.d文件夹下新建文件kafka.sh,添加如下内容。exportKAFKA_HOME=/usr/local/kafka_2.12-2.1.1exportPATH=$KAFKA_HOME/bin:$PATH其次,重启机器,使之生效。此步骤可省略。7.5.3启动Kafka集群首先,在三台机器上使用命令“zkServer.shstart”启动ZooKeeper集群,确保其正常运行。其次,在三台机器上使用以下命令启动Kafka,此处以master机器为例。[xuluhui@master~]$kafka-server-start.sh-daemon$KAFKA_HOME/config/perties这里需要注意的是,启动脚本若不加-daemon参数,则如果执行Ctrl+Z后会退出,且启动的进程也会退出,所以建议加-daemon参数,实现以守护进程方式启动。7.5.4验证Kafka集群方法:jps7.5.5使用KafkaShell【案例7-1】使用Kafka命令创建Topic、查看Topic,启动Producer生产消息,启动Consumer消费消息。【案例7-1】(1)创建Topic在任意一台机器上创建Topic“kafkacluster-test”。[xuluhui@master~]$kafka-topics.sh--create\--zookeepermaster:2181,slave1:2181,slave2:2181\--replication-factor3\--partitions3\--topickafkacluster-test由于共部署了三个Broker,所以创建Topic时能指定--replication-factor3。其中,选项--zookeeper用于指定ZooKeeper集群列表,可以指定所有节点,也可以指定为部分节点;选项--replication-factor为复制数目,数据会自动同步到其他Broker上,防止某个Broker宕机数据丢失;选项--partitions用于指定一个Topic可以切分成几个partition,一个消费者可以消费多个partition,但一个partition只能被一个消费者消费。【案例7-1】(2)查看Topic详情在任意一台机器上查看Topic“kafkacluster-test”的详情。Topic“kafkacluster-test”总计有3个分区(PartitionCount),副本数为3(ReplicationFactor),且每个分区上有3个副本(通过Replicas的值可以得出),另外最后一列Isr(In-SyncReplicas)表示处理同步状态的副本集合,这些副本与Leader副本保持同步,没有任何同步延迟。另外,Leader、Replicas、Isr中的数字就是BrokerID,对应配置文件config/perties中的broker.id参数值。【案例7-1】(3)启动生产者生产消息在master机器上使用kafka-console-producer.sh启动生产者,使用命令如下所示。[xuluhui@master~]$kafka-console-producer.sh\--broker-listmaster:9092,slave1:9092,slave2:9092\--topickafkacluster-test(4)启动消费者消费消息在slave1和slave2机器上分别使用kafka-console-consumer.sh启动消费者,以slave1机器为例,使用命令如下所示。[xuluhui@slave1~]$kafka-console-consumer.sh\--bootstrap-servermaster:9092,slave1:9092,slave2:9092\--topickafk
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 股骨颈骨折骨栓植入术后护理查房
- 宫腔镜子宫内膜粘连松解术后护理查房
- 资深园艺设计服务保证承诺书5篇
- 企业文化建设与推广方案制作工具
- 供应商货款结算申请函(7篇)范文
- 生产现场安全管理标准化流程清单
- 业务流程改进与创新思维工具集
- 湖南师大附中博才实验中学2025-2026学年初三寒假延长作业英语试题含解析
- 湖南省汨罗市弼时片区市级名校2026年初三下学期第一次联合语文试题试卷含解析
- 2026年江苏省句容市崇明中学9校联考初三语文试题含解析
- 2026年青海省海南藏族自治州单招职业适应性测试题库附参考答案详解(模拟题)
- 广告制作公司奖惩制度
- 2026春牛津译林版英语八年级下册Unit+8+Reading+(同步课件)
- 第一单元(单元测试 基础夯实)-高二语文人教统编版选择性必修下册
- 2025山西中煤一局集团有限公司应届高校毕业生招聘20人笔试历年典型考点题库附带答案详解2套试卷
- 幼儿园课件《认识我们的身体》课件
- 2026年安克创新行测笔试题库
- 违反无菌技术操作
- AI养鱼:智慧渔业新模式
- 2025年《三级公共营养师》考试练习题库及答案
- 煤矿调度专项培训课件
评论
0/150
提交评论