版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
在开始分析源码之前,我们一起来回顾一下Kafka消费模型的几个要点Kafka的每个Consumer(消费者)实例属于一个ConsumerGroup(消费组在消费时,ConsumerGroup的每个Consumer占一个或多个Partition(分对于每个ConsumerGroup,在任意时刻,每个Partition多有1Consumer每个ConsumerGroup都有一个Coordinator(协调者)负责分配Consumer和Partition对应关系,当Partition是Consumer生变更是,会reblance(重新分配)过程,重新分配Consumer与Partition的对应关系;Consumer与Coordinator之间的心跳,这样Coordinator就能感知Consumer状态Consumer的时候及时触发rebalance掌握并理解Kafka的消费模型,对于接下来理解其消费的实现过程是至关重要的,如果你对上面的这些要点还有不清楚的地方,建议回顾一下之前的课程或者看一下Kafka相关的我们使用当前的版本2.2进行分析,使用Git 上直接源码到本地代码git cdgitcheckout09|Kafka的文档,看看从哪儿来入手开启我们的分析流Kafka的Consumer类KafkaConsumer的JavaDoc,给出了关于如何使用KafkaConsumer常详细的说明文档,并且给出了一个使用Consumer费的最简代码代码//设置必要的配置信Propertiesprops=newprops.put("bootstrap.servers",props.put("group.id", mit", erval.ms",props.put("key.deserializer", props.put("value.deserializer", 9//创建Consumer实KafkaConsumer<String,String>consumer=new//订阅consumer.subscribe(Arrays.asList("foo",//循环拉while(true)ConsumerRecords<String,String>records=for(ConsumerRecord<String,String>record:System.out.printf("offset=%d,key=%s,value=%s%n", 这段代码主要的主要流程设置必要的配置信息,包括:起始连接的Broker址,ConsumerGroupID,自创建Consumer订阅了2个Topic:foo和循环拉取消息并打印在控通过上面的代码实例我们可以看到,消费这个大的流程,在Kafka中实际上是被了“订阅”和“拉取消息”这两个小的流程。另外,我在之前的课程中反复提到过,Ka在消费过程中,每个Consumr实例是绑定到一个分区上的,那Consumr是如何确定,绑定到哪一个分区上的呢?这个问题也是可以通过分析消费流程来找到答案的。所以,我们分析整个消费流程主要聚焦在三个问题上:订阅过程是如何实现的Consumer是如何与Coordinator协商,确定消费哪些Partition的?拉取消息的过程是如何实了解前两个问题,有助于你充分理解Kafka的元数据模型,以及Kafka是如何在客户端和就带着这三个问题,来分析Kafka的订阅和拉取消息的过程如何实现。我们先来看看订阅的实现流程。从上面的例子到订阅的主流程方法代码publicvoidsubscribe(Collection<String>topics,ConsumerRebalanceListenertry//省略部分代5//重置订阅状this.subscriptions.subscribe(newHashSet<>(topics),8//更新元}finally subscript,另一个是更新元数据中的topic信息。订阅状态subscripts主要了订阅的topic和ptn的消费置等状态信息。属性metadata中了Kaa集群元数据的一个子集,包括集群的Brokr节点、c和Partn在节点上分布,以及我们聚焦的第二个问题:Coordinator给ConrPartn请注意一下,这个subscribe()方法的实现有一个非常值得大家学习的地方:就是开始的acquireAndEnsureOpen()try-finallyrelease(),作用就是保护这个方法只能单线程调Kafka在文档中明确地注明了Consumer不是线程安全的,意味着Consumer被并发调用时会出现不可预期的结果。为了避免这种情况发生 做了主动的检测并抛出异常,不是放任系统产生不可预期的情Kaa“主动检测不支持的情况并抛出异常,避免系统产生不可预期的行为”这种模式,对于增强的系统的健壮性是一种非常有效的做法。如果你的系统不支持用户的某种操作,正确的做法是,检测不支持的操作,直接用户操作,并给出明确的错误提示,而不应该只是具体Kafka是如何实现的并发检测,大家可以看一下方法acquireAndEnsureOpen()的实继续跟进到更新元数据的方法metadata.setTopics()里面,这个方法的实现除了更新元数据类Metadata中的topic相关的一些属性以外,还调用了Metadata.requestUpdate()代码publicsynchronizedintrequestUpdate()this.needUpdate=return requestUpate()tetrueKafka必须确保ConsumrBrokrPartn分析完订阅相关的代码,我们来总结一下:在订阅的实现过程中,Kafka更新了订阅状态subscriptsmetadatatopic的一些属性,将元数据状态置为“需要那这个元数据会在什么时候真正做一次更新呢?我们可以先带着这个问题接着看接下来,我们分析拉取消息的流程。这个流程的时序图如下(点击可放大查看我们对着时序图来分析它的实现流程。在KafkaConsumer.poll(法对应源码1179)的实现里面,可以看到主要是先后调用了2个私有方法:updateAssignmentMetadataIfNeeded():更新元数据pollForFetches():拉取消息方法updateAssignmentMetadataIfNeeded()中,调用了coordinator.poll()方法,poll() .ensureFreshMetadata()方法,在 方法中又调用了 .poll()方法,实现了与Cluster通信,在Coordinator上Consumer并拉取和更新元数据。至此,“元数据会在什么时候真正做一次更新”这个问题类ConsumerNetwork 封装了Consumer和Cluster之间所有的网络通信的实现,这个类是一个非常彻底的异步实现。它没有任何的线程,所有待发送的Request都存放在属性unsent中,返回的Response存放在属性 pletion中。每次调用poll()方法的时候,在当前线程中发送所有待发送的Request,处理所有收到的 MQ的代码,Producer和Consumer在主要收发消息流程上功能的复杂度是差不多的,但是你可以很明显地感受到Kafka的代码实现要比 MQ的代码实现更加的复杂难于理解。我们继续分析方法pollForFetches()的实现代码 privateMap<TopicPartition,List<ConsumerRecord<K,V>>>pollForFetches(Timer2//省略部分代3//如果缓存里面有的消息,直接返回这些4finalMap<TopicPartition,=5if(!records.isEmpty())6return7}8//构造拉取消息请求,并发9//省略部分代//发送网络请求拉取消息,等待直到有消息返回或者.poll(pollTimer,()-> //省略部分代//返回拉到的消return}这段代码的主要实现逻辑如果缓存里面有未的消息,直接返回这些消息构造拉取消息请求,并发发送网络请求并拉取消息,等待直到有消息返回或者返回拉到的消息在方法fetcher.sendFetches(实现里面,Kafka据元数据的信息,构造到所有需要的Broker的拉消息的Request,然后调用.Send()方法将这些请求异步发送出去。并且,了一个回调类来处理返回的Response,所有返回的Response被暂时存放在pletedFetches中。需要注意的是,这时的Request并没有被真正发给各Broker,而是被暂存在了.unsend等待被发送然后,在调用.poll()方法时,会真正将之前构造的所有Request发送出去,并处理收到的Response。最后,fetcher.fetchedRecords()方法中,将返回的Response反序列化后转换为消息列综合上面的实现分析,我在这里给出整个拉取消息的流程涉及到的相关类的类图,在这个类图中,为了便于你理解,我并没有把所有类都绘制上去,只是把本节课两个流程相关的主要类图(点击可放大查看本节课我们一起分析了KafkaConsumer消费消息的实现过程。大家来分析代码过程中,不仅仅是要掌握Kafka整个消费的流程是是如何实现的,更重要的是理解它这种完全异步发送请求时,构建Requt对象,暂存入发送队列,但不立即发送,而是等待合适的时机批量发送。并且,用回调或者RequFte方式,预先定义好如何处理响应的逻辑。Brokr返回的响应之后,也不会立即处理,而是暂存在队列中,择机处理。那这个策略较复,有是需响时候有可缓冲了或间到了,都有可能触发一次真正的网络请求,也就是在poll()方法中发送所有待发送Requ并处理所有Respons。设计处是需要用于发处理的线并且分发量处理的优势,这也是Kaa的性能非常好的原因之一。这种设计的缺点也非常的明显,就是的成本也很高。总体来说,不推荐大家把代码设计得这么复杂。代码结构简单、清晰、易是是我们在设计过程中需要考虑的一个非常重要的因素。很多时候,为了获得较好的代码结构,在可接受的范围内,去牺牲一些性能,也是划算的。我们知道,KafkaConsumer在消费过程中是需要消费位置的,Consumer每次从当前消费位置拉取一批消息,这些消息都被正常消费后,Consumer会给Coordinator发一个提交位置的请求,然后消费位置会向后移动,完成一批消费过程。那kafkaConsumer是如何和提交这个消费位置的呢?请你带着这个问题再回顾一下
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 重庆市西南师大附中2026年初三第九次适应性考试物理试题含解析
- 企业沟通标准化模板分享
- 团队建设活动策划与评估方案
- 办公场所信息安全事情事后恢复预案
- 描述家乡四季之美写景文章(11篇)
- 客户满意度与质量追溯承诺书3篇
- 技术人员工作流程管理模板
- 快速规划自动化设备调整清单
- 农业科技领域的承诺书(6篇)
- 单位债务及时偿付责任承诺书7篇
- 2025年高职(城市轨道交通机电技术)设备调试阶段测试题及答案
- 2026年考试题库北汽集团高管知识水平测试
- 核电防异物管理指南(核心版)
- 电厂防汛课件
- 日志观察及写作指导手册
- 人工智能在高职机械专业教学中的应用研究
- 高标准农田建设项目操作方案指南
- 2026年上饶职业技术学院单招职业技能考试必刷测试卷附答案
- 野战生存课件军用
- 环卫车辆安全行驶培训课件
- 刷漆搭架施工方案
评论
0/150
提交评论