费伟老师暴风、美团、一度云课件_第1页
费伟老师暴风、美团、一度云课件_第2页
费伟老师暴风、美团、一度云课件_第3页
费伟老师暴风、美团、一度云课件_第4页
费伟老师暴风、美团、一度云课件_第5页
已阅读5页,还剩6页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、Kafka 原理及应用场景Kafka 原理及应用场景 Kafka 基本原理1.1 消息队列介绍消息队列的两种模式: 1.点对点 2.发布订阅 点对点 1.在点对点模式下,消息生产者成为发送者,消息消费者成为接收者.他们通过队列来交换消息,发送者把消息发送给队列,接收者使用队列的消息,点对点的区别在于:消息只能由一个消费者使用. 发布/订阅:消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。 Kafka 原理及应用场景1.2 kafka 中的发布/订阅消费组是一个很关键的概念. 有了消费组使 k

2、afka 比其他的消息队列要更加灵活,强大.如果每个消费者属于同一个消费群体,消费者之间的 topic 将被均衡地负载均衡;这种模式叫队列模型.相反,如果每个消费者属于不同的消费群体,则所有的消息将被消费在每个 client 中;就是所谓的“发布 - 订阅”模式。这1.3 producer 原理首先创建 ProducerRecord 对象,然后调用 KafkaProducer#send 方法进行发送数据,KafkaProducer 接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给 partitioner 去确定目标分区,最后追加写入到内存中的消息缓冲池。此时KafkaPr

3、oducer#send 方法成功返回。KafkaProducer 中还有一个专门的 Sender IO 线程负责将缓冲池中的消息分批次发送给对应的 broker,完成真正的消息发送逻辑。Kafka 原理及应用场景1.4 producer 参数调优1)batch.size :该参数即控制一个 batch 的大小。默认是 16KB。适当调整该值可以有效提高 producer 的吞吐. 当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送batch,根据经验,设置为 1MB 吞吐会更高,太小的话吞吐小,太大的话导致内存浪费进而影响吞吐量2)acks : 决定高

4、吞吐还是高持久性,生产中需要先明确我们的目标是什么? 高吞吐量?高持久性?以下设置为常用选项:(1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到 socket buffer 并认为已经发送。没有任何保障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1;(2)acks=1: 这意味着至少要等待 leader 已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如果 follower 没有成功备份数据,而此时

5、leader 又挂掉,则消息会丢失。(3)acks=all: 这意味着 leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。3)linger.ms 当记录产生速度大于发送速度的时候,这项设置将通过增加小的延迟来减少网络IO,节省带宽。原理就是把原本需要多次发送的小 batch,通过引入延时的方式合并成atch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时。4)compression.type: producer 所使用的压缩器。压缩是在用户主线程完成的,耗 CPU,可以有效减少网络 IO。生产环境中可以结合服务器的压力测试

6、进行适当配置.5) max.in.flight.requests.per.connectionproducer 可以在一个 connection 中发送多个请求,这样可以减少开销,默认是 5。适当增加此值通常会增大吞吐量,从而整体上提升 producer的性能。另外如果开启了重试机制,配置该参数大于 1 可能造成消息发送的乱序(先发送 A,然后发送 B,但 B 却先行被 broker 接收),如果保证消息顺序性可以修改为 1.6) retries 重试机制,允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第

7、一条消息出现要早。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。Kafka 原理及应用场景1.5 brocker 原理1)客户端请求发送给 1 个 Accepter 线程.然后转发给多个 Processsor 线程.N 个 processsor线程负责处理实际的 socket 请求.2)然后通过请求队列发送给多个线程进行实际的 io 处理.3)logManager 负责所有的读写请求,将实例化各自的 log 实例对象。同时,的 log 线程将负责维护 partitions 与 log segment。4)replicaManager 负责该 broker 的 partition 副

8、本的管理工作5)kafkaController 负责该 broker 的 Controller 状态的管理工作1.2 brocker 参数调优1. work.threadsbroker 处理消息的最大线程数, 主要处理网络io,读写缓冲区数据,基本没有 io 等待,配置线程数量为 cpu 核数加 1.2. num.io.threads 进行磁盘 io 操作,高峰期可能有些 io 等待,因此配置需要大些。配置线程数量为 cpu 核数 2 倍,最大不超过 3 倍.3. num.replica.fetchers每个follow 从leader 拉取消息进行同步数据,该配置可以提高fol

9、lower的 I/O 并发度,单位时间内 leader 持有更多请求,相应负载会增大,需要根据机器硬件资源做权衡.4. replica.fetch.max.bytes 默认为 1MB,这个值太小,5MB 为宜,根据业务情况调整.5. replica.fetch.wait.max.msfollow 拉取频率,如果频率过高会导致 cpu 飙升,定时器超时检查比较消耗 CPU,需要做好权衡.Kafka 原理及应用场景二 hive 函数窗口函数1. FIRST_VALUE 取分组内排序后,截止到当前行,第一个值比较每个用户 pv 与第一天 pv,查询返回当前 pv 以及第一天 pv。select ui

10、d, dt, pv, first_value(pv) over (partition by gid order by dt) from tmp_pv;0006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-

11、4DF9-4C0B-83AD-0183789E78D4993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B859218892017-10-102017-10-112

12、017-10-122017-10-132017-10-142017-10-152017-10-162017-10-102017-10-112017-10-122017-10-132017-10-142017-10-152017-10-161 15 17 13 12 14 14 12 29 23 21021 28 22 2窗口为第一行到当前行(缺失 window 子句有order by ,默认为 rows betweenunboundedpreceding and current row)。所以,first_value 返回窗口的第一行,即第一天浏览次数。2. LAST_VALUE: 取分组内排

13、序后,截止到当前行,最后一个值比较每个用户 pv 与最新一天 pv 进行比较,查询返回当前 pv 以及最新一天 pv。select gid, dt, pv, last_value(pv) over (partition by gid order by dt rows between current row and unbounded following) from tmp_pv;窗口为当前行到最后一行(rows between current row and unbounded following)。last_value返回的是窗口最后一行,即最新一天的浏览次数。0006D2BC-4DF9-4

14、C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D40006D2BC-4DF9-4C0B-83AD-0183789E78D4993BD7AD-3B62-BA0C-15AE-A14B859218892017-10-102017-10-112017-10

15、-122017-10-132017-10-142017-10-152017-10-162017-10-101 45 47 43 42 44 44 42 2Kafka 原理及应用场景993BD7AD-3B62-BA0C-15AE-A14B859218892017-10-119 2993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0C-15AE-A14B85921889993BD7AD-3B62-BA0

16、C-15AE-A14B859218892017-10-122017-10-132017-10-142017-10-152017-10-163 21021 28 22 2其中:ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 定义起点和终点,UNBOUNDED PRECEDING 为起点,表明从第一行开始, CURRENT ROW 为默认值,就是这一句等价于:ROWS UNBOUNDED PRECEDINGOVER 从句介绍1、使用标准的聚合函数 COUNT、SUM、MIN、MAX、AVG2、使用 PARTITION BY 语句,使用一个或者多个原

17、始数据类型的列3、使用 PARTITION BY 与 ORDER BY 语句,使用一个或者多个数据类型的分区或者排序列3. LEAD(col,n,DEFAULT) :用于统计窗口内往下第 n 行值。第一个参数为列名,第二个参数为往下第 n 行(可选,默认为 1),第三个参数为默认值(当往下第 n 行为 NULL 时候,取默认值,如不指定,则为 NULL)获取用户在某个页面停留的起始与结束时间:select userid, time stime, lead(time) over(partition by userid order by time) etime, urlfromtest.user_

18、log;计算用户在页面停留的时间间隔(实际分析当中,这里要做数据清洗工作,如果一个用户停留了 4、5 个小时,那这条记录肯定是不可取的。)select userid,time stime,Kafka 原理及应用场景lead(time) over(partition by userid order by time) etime,UNIX_TIMESTAMP(lead(time) over(partition by userid order bytime),yyyy-MM-dd HH:mm:ss)- UNIX_TIMESTAMP(time,yyyy-MM-dd HH:mm:ss) period,u

19、rlfrom test.user_log;4. LAG(col,n,DEFAULT) :与 lead 相反,用于统计窗口内往上第 n 行值。第一个参数为列名, 第二个参数为往上第 n 行(可选,默认为 1),第三个参数为默认值(当往上第 n 行为 NULL时候,取默认值,如不指定,则为 NULL)select id, month,pv,lag(pv,1,0) over ( order by id )from tmp;1 1month1002 2month12103 3month22124 4month31225 5month4131Hive 应用生产中我们经常把处理的逻辑 sql 写入到文件中

20、,然后用 hive f filename.sql 执行. 有时间需要动态传入一些变量信息,比如时间,表名.1.使用 env 获取当前 shell 环境的环境变量a)b)eg: export datatime=2017-11-10select * from tabliname where datatime = $env:datatime;2.使用-hivevar 方式传入a)b)hive -hivevar datatime = datatime -hivevar limit=10 -f filename.sqlselect * from tablename where datatime = $h

21、ivevar:datatime limit $hivevar:limit3.Order by 和 Sort by 的区别?a)使用 order by 会引发全局排序Kafka 原理及应用场景i.如果在 HADOOP 上进行 order by 全排序,会导致所有的数据集中在一台reducer 节点上,然后进行排序,这样很可能会超过单个节点的磁盘和内存存储能力导致任务失败。ii. iii.iv.使用 distribute 和 sort 进行分组排序select * from baofeng_click distribute by product_line sort by click desc; d

22、istribute by + sort by 就是该替代方案,被 distribute by 设定的字段为 KEY, 数据会被 HASH 分发到不同的 reducer 机器上,然后 sort by 会对同一个reducer 机器上的每组数据进行局部排序。4.为什么 hive 导入数据很快?a) 将数据存到 Hive 的数据表时,Hive 采用的是“读时模式”,意思是针对写操作不会做任何校验,只是简单的将文件复制到 Hive 的表对应的 HDFS 目录,存入数据的只是简单的文件复制和粘贴,所以导入数据速度非常的快。当读取、查询的时候,才会根据表模式来解释数据,这个时候如果遇到了不符合模式的数据,

23、Hive 会直接将数据解析成 NULL。b) 好处?i.向 Hive 表中新增数据非常的快,通常情况下我们直接用 Hadoop 命令将文件cp 到一个 HDFS 目录,Hive 就可以直接读这个目录下的数据;好处是存储在 Hive 表中的数据跟 Hive 本身没有关系,这份数据也可以被其他工具比如 Pig 来处理;实现其他的数据分析.ii.5.如果 Join 表数据量小,使用 MapJoina)如果确认用于 Join 的表数据量很小,比如只有 100MB 大小,可以使用/*+ MAPJOIN(a) */语法,这样 Hive 会先将小表分发到所有 reducer 节点的分布式缓存中并加载到内存,

24、然后进行 Join 操作,由于减少了 shuffle 操作,性能有所提升。SELECT /*+ MAPJOIN(a) */ tablea.id, FROM tablea join tableb on(tablea.id=tableb.id)b)6.NULL 和数字相加的问题a)如果有用到 sum 函数,但是发现 sum 的列中有 NULL 值,可以使用以下方法转换成 0 值:b)COALESCE(f, cast(0 AS bigint),coalesce 方返回列表中第一个不为 NULL 的字段,相当于如果第一个字段是 NULL,就第二个字段。7.Join 的实现原理s

25、elect , o.orderid from order o join user u on o.uid = u.uid;在map 的输出value 中为不同表的数据打上tag 标记,在 reduce 阶段根据tag 判断数据来源。MapReduce 的过程如下Kafka 原理及应用场景hive 执行流程1.Antlr 定义SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象语法树 AST Tree 2.遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock3. 遍历 QueryBlock,翻译为执行操作树 OperatorTree4. 逻辑层优化

26、器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量5. 遍历 OperatorTree,翻译为 MapReduce 任务6. 物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划Hive 最终生成的 MapReduce 任务,Map 阶段和 Reduce 阶段均由 OperatorTree 组成。逻辑操作符,就是在 Map 阶段或者 Reduce 阶段完成单一特定的操作。基本的操作符包括 TableScanOperator , SelectOperator , FilterOperator , JoinOperator ,G

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论