版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
SparkSQL逻辑计划原理
【导读】本文重点讲解了SparkSQL解析为AST抽象语法树、生成
UnresolvedLogicalPlan>生成ResolvedLogicalPlan以及Optimized
LogicalPlan的过程,为接下来进一步生成物理计划SparkPlan做好了准各。
Catalyst优化器是Spark引擎中非常重要的组成部分,也是近年来Spark社区
项目重点投入、并且发展十分迅速的核心模块,对于Spark任务的性能提升起
到了关键的基础作用。
我们知道,在Spark1.6之前开发人员是通过Spark的RDD编程接口来实现对大
规模数据的分析和处理的,到了Sparkl.6版本后推出了DataSetDataFrame
的编程接口,这种数据结构与RDD的主要区别在于其携带了结构化数据的
Schema信息,从而可以被SparkCatalyst用来做进一步的解析和优化;而
SparkSQL则是比DataSet和DataFrame编程接口更为简单易用的大数据领域
语言,其用户可以是开发工程师、数据科学家、数据分析师等,并且与其他
SQL语言类似,可以通过SQL引擎将SQL预先解析成一棵AST抽象语法树;同
时,AST抽象语法树、DalaSet及DalaFrame接下来均会被SparkCatalyst优
化器转换成为UnresolvedLogicalPlansResolvedLogicalPlan,Physical
Plan、以及OptimizedPhysicalPlan,也就是说带有schema信息的Spark分
布式数据集都可以从SparkCatalyst中受益,这也是Spark任务性能得以提升
的核心所在。
值得一提的是,在物理计划树的生成过程中,首先会将数据源解析成为RDD,
也即在SparkSQL的物理计划执行过程中所操作的对象实际是RDD,一条Spark
SQL在生成最终的物理计划后仍然会经过前面文查中所提到的生成DAG、划分
Stage,并将taskset分发到特定的executor上运行等一系列的任务调度和执
行过程来实现该SparkSQL的处理逻辑.
接下来,本文将着重讲解SparkSQL逻辑计划的相关实现原理,在后续的文章
中会继续解析SparkSQL的物理计划。
生成UnresolvedLogicalPlan
用户可以通过spark-sql等客户端来提交sql语句,在sparksession初始化时
通过BaseSessionStateBuilder的build()方法始化SparkSqlParser、
Analyser以及SparkOptimizcr对象等:
efbuild():SessionState=
newSessionState
session.sharedState,
conf,
experimentalMethods,
functionRegistry^^^^^^^^M
udfRcgistratiorh^^^^^^^^H
0=>
()=>ana1yzer,
()=>opl'inizor,
planner,
()=>streamingQueryManager,
1istenerManagcr,
()二)resourceLoader,^^^B
createQueryExecutiont
createClone,
columnarRules,
queryStagePrepRules)^^^^^M
当用户程序调用SparkSession的sql接口时即开始了解析sql语句并执行对数
据处理的过程:
|defsql(sqlText:String):DataFrame二withActive
|valtracker二newQueryPlanningTracker^^^^^^^^^^^^^^l
valplan=tracker.measurePhase(QucryPlanningTrackcr.PARSING)
sessionState^sqlParser.parsePlan(sqlText)
Dataset.ofRows(self,plan,tracker)
其+1通过AbstractSqlParser的parsePlan方法将sql语句转换成抽象语法
树:
verridedefparsePlan(sqlText:String):LogicalPlan=parse(sq
IText){parser二
dslBuilder.visiiSiugleSldleiiienl(paxser.siugleSldLeinenl())nidlcl
caseplan:LogicalPlan=>plan
case
valposition=Origin(None,None)
thrownewParseException(Option(sqlText),"Unsupported
SQLstatement",position,position)
1、从SqlBaseParser的singleStatement()方法开始基于ANTLR4lib库来解
析sql语句中所有的词法片段,生成一棵AST抽象语法树;
2、访问AST抽象语法树并生成Unresolved逻辑计划树:
1)访问SingleStatementContext节点:
SingleStatementContext是整个抽象语法树的根节点,因此以AstBuilder的
visitSingleStatement方法为入口来遍历抽象语法树:
verridedefvisitSingleStatement(ctx:SingleStatementContext):I
visit(ctx.statcncnt).asInstanccOf[LogicalPlan]
■
)ublicTvisit(ParseTreetree)
returntree.accept(this):
2)根据访问者模式执行SingleStatementContext节点的accept方法:
Override
>ublic<T>Taccept(ParseTreeVisitor<?extendsT>visitor)
if(visitorinstanceofSqlBaseVisitor)return((SqlBas
Wisitor<?extendsT>)visitor).visitSingleStatement(this);
■
erridcpublicvisitSingleStatementISqlBaseParser.SingleStatcr
ntContextctx){returnvisitChildren(ctx);}
3)迭代遍历整棵ASTTree:
Override
intn二node.getChildCount()
(!shouldVisitNextChiId(node,result))
ParseTreec二node.getChild:i)
TchildResult二c.accept(lh:s)
result=aggregateResult(result,chiIdResult);
returnresult;
根据以上代码,在遍历AST树的过程中,会首先解析父节点的所有子节点,并
执行子节点上的accept方法来进行解析,当所有子节点均解析为
UnresolvedRelation或者Expression后,将这些结果进行聚合并返回到父节
点,由此可见,AST树的遍历所采用的是后序遍历模式。
接下来以查询语句中的QuerySpccificationContext节点的解析为例进一步阐
述以上过程:
如下为一条基本的sql语句:
electcol1fromtabnamewhereco!2>1(
QuerySpecificationContext节点下会产生用于扫描数据源的
FromClauseContext过滤条件对应的BooleanDefaultContexts以及投影时所
需的NamcdExprcssionScqContext节点。
1)FromClauseContext继续访问其子节点,当访问到TablelNameContext节点
时,访问到tableName的tocken时根据表名生成UnresolvedRelation:
overridedefvisitTableName(ctx:TableNameContext):LogicalPlan|
二withOrigin(ctx)
valtableld二visitMultipartldentif:er(ctx.multipartldcntifier|
valtable二nayApplyAliasPlan(ctx.tableAlias,UnresolvedRelat|
2)BooleanDefaultContext的子节点中分为三个分支:代表Reference的
ValueExpressionDefaultContext>代表数值的
ValueExpressionDefaultContext以及代表运算符的ComparisonContext;
例如遍历代表数据值ValueExpressionDefaultContext及其子节点,直到访问
至ijIntcgerLiteralContext:
verridedefvisitlntegerLiteral(ctx:IntegerLiteralContext):Lit|
ral二withOrigin:ctx)
BigDccimal(ctx.gctTcxt)match
casevifv.isValidTnt二
Literal:v.intValue)
casevifv.isValidLong二〉
Literal;v.
casev=>Literal(v.underlying(J)
而Literal的定义如下,是一个叶子类型的Expression节点:
aseclassLiteral(value:Any,dataType:DataType)extends1
cafExpression
3)NamedExpressionSeqContext是投影节点,迭代遍历直到
Regu1arQuerySpecificationContext节点,然后通过访问
withSelectQuerySpecification方法创建出投影所需的ProjectLogical
Plan:
verridedefvisitRegularQuerySpecification
ctx:RegularQuerySpecificationContext):LogicalPlan=wit|
Origin(ctx)
valfromOneRowRelation().optiona'(ctx.fromClause)
visitFromClause(ctx.fromClause)
withSclectQuerySpecification
ctx.lateralView,
ctx.\vhereClause,^^^^B
ctx.aggregationsause,・
ctx.havingC:ause,^^^^|
efcreateProjeclC二if(namedExpressions.nonEmply)
Project(namedExpressions,withFi
总结一下以上处理过程中所涉及的类之间的关系,如下图所示:
类图
生成ResolvedLogicalPlan
SparkAnalyser
在SparkSession的sql方法中,对sql语句进行过Parser解析并生成
UnresolvedLogicalPlan之后则通过执行Dataset.ofRows(self,plan,
tracker)继续进行catalog绑定,数据源绑定的过程如下:
|defofRows(sparkSession:SparkSession,logicalPlan:LogicalPlan,trackt
卜:QueryP1anningTracker)
|:DataFrame二sparkSession.wiIhAclive
|\,1q('二n「v;Query「x(culi()n(spnrkScssi()n,I()gica]PIan,li7i(kr)
,il-l,-.1■,'::■■/,:।'.I;IJ,q,■.II,I-.....-I.'II:
■
defassertAnalyzedO:Unit=analyzed
由如下实现逻辑可见,analyzed变量是通过懒加载方式初始化的,通过该变
量的初始方法可见Spark的catalog实现逻辑主要通过Analyser类来实现的:
lazyvalanalyzed:LogicalPlan=executePhase(QueryPlanningTracker.ANAI
YS1S)
sparkSession.sessionState.analyzer.executeAndCheck(logical,tracker)
其中,executeAndCheck方法的执行是通过Analyzer的父类RuleExecutor的
execute方法来实现的:
efexecute(plan:TreeType):TreeType=
■
valbatchStartPlan=curPlan|
variteration二
varcontinue=true
curPlan=batch,rules.foldLeft(curPlan)
case(plan,rule)=.
valstartTime二System.nanoTime
valresult二rule(plan)
valrunTime=System.nanoTi【ne()-startTime|
va1effectivo=!rosult.fastEqu<i1s(p1an)H
queryExecutionMetries.incNumEffectiveExecution(rule.ruleName),
quer^^ExecutionMetrics.incTimeEffectiveExecutionB)y(rule.ruleNam
,runTime)
「)I⑴(h,ng(、l」)lg(T.I(以1八11。(「"k・门ih\,imc,pkn.rm::
resul
iteration+=
if(iteration>ba二ch.strategy,maxIterations)
valendingMsg=if(batch,strategy.maxIterationsSetting==nul1)
}else
s”,pleaseset'${batch,strategy.maxIterationsSetting}'toalarger]
valmessage=s"Maxiterations(${iteration-1)reachedforbatch$
)atch.name)H
if(Utils.isTestingbatch,strategy.errorOnExceed)
IhrownewTreeNodeExceplion(curPlan,message,
1ogWarning(message)
if(batch,strategy二二Once
Uli】s.isTesling&&!blacklisledOnceBalches.contains(batch,name))
continue=fals
if(curPlan.fastEquals(lastPlan))
logTrace
s“Fixedpointreachedforbatch${batch,name}after${iteration•-1)i
continue=fals
lastPlan=curPlad
planChangeLogger.logBatch(batch,name,batchStartPlan,curPlan)
planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics()-befor
Metrics)
curPlan
如上代码的主要处理过程如下:
1、遍历的Analyzer类中的batches列表:
通过batches方法获取所有的catalog绑定相关的规则,在Analyzer中包括
Substitution、Hints、Resolution、UDF、Subquery等几个规则组;
以较为常见的"Resolution”规则组为例,其具有非常多的规则用于解析函数、
Namespace,数据表、视图、列等信息,当然用户也可以子定义相关规贝U:
ResolvcTableValuedFunctions:
ResolveNamespace(catal.ogManager)::■
newResolveCatalogs(catalogManager)::
ResolveRelations::
RescdveTables::
ResolveReferences::
ResolveCreateNamedStruct:
ResolvcDeserializer:
ResolveNewInstance:
ResolveUpCast::
ResolveGroupingAnalytics:
ResolvePivot::
ResolveOrdinannOrderByAndGroupBy::,
ResolveAggAliasInGroupBy:
其中,Batch类的定义如下,包括Batch名称、循环执行策略、具体的规则组
集合,循环执行策略Strategy又分为Once和FixedPoint两种,即仅执行一次
和固定次数:
rotectedcaseclassBatch(name:String,strategy:Strategy,rules:Rule
2、将每个Batch中所有的规则Rule对象实施于该UnsolvedLogicalPlan,并
且该Batch中规则可能要执行多轮,直到执行的批数等于
batch,strategy,maxIterations或者logicalplan与上个批次的结果比没有变
化,则退出执行;
其中在Spark中的定义如下,在spark3.0中默认可最大循环100次:
conf,analyzerYax11erations,
errorOnExceed二
maxIterationsSetting=SQLConf.ANALYZER_MAX_ITERATIONS.key
■
|.interna1()
|.::,‘’―…Il」.III,,.「,」「i:」::」,!.「「「「TII:”:■■■♦♦■♦♦■;
.creatcWithDefault(100)
接下来以将ResolveRelations(解析数据表或者视图)规则应用于Unresolved
LogicalPlan的解析过程为例,支持解析UnresolvedRelation>
UnresolvedTable^Unreso1vcdTab1eOrView等多种未解析的数据源:
|defapply(plan:LogicalPlan):LogicalPlan:RBSolveTempViews(plan).reso|
lookupRelation(u.multipartldentifier).map(resolveViews).getOrElse(u
caseu@UnresolvedTable(identifier)二
u.fai1Analysis(s"${v.identifier.quoted}isaviewnottable.")
casetable=>table
}.getOrElse(u)
caseu@UnresolvedTab1eOrView(identifier)=
1ookupTab1eOrView(identifier).getOrElse(u)
当解析对象为UnresolvedRelation实例时,调生lookupRelation方法来对其
进行解析,通过SessionCatalog或者扩展的CatalogPlugin来获取数据源的元
数据,并生成ResolvedLogicalPlan:
rivatedeflookupRelation(identifier:Seq[String]):Option[LogicalPlan|
expandRelationName[identifier)match
caseSessionCatalogAndldentifier(catalog,ident)二
lazyvalloaded=CatalogV2Util.loadTable(catalog,ident).map
vlSessionCatalog.getRelation(vlTable.
casetable
SubqueryAlias
catalog,name+:ident.asMultipartldcntificr,^^^^^^^^^^^^!
DataSourceV2Relation.create(table,Some(catalog),Some(ident)))
最常见的是SessionCatalog,作为SparkSession级别catalog接口对象,其
定义如下,包括Externalcatalog、G1obalTempViewManager
FunctionRegistrySQLConf、HadoopConfiguration>Parser.
FunctionResourceLoader对象;其中,Externalcatalog有两个主要的实现
类:HiveExternalCatalog和InMcmoryCatalog,而HiveExternalCatalog则主
要应用于企业级的业务场景中:
LassSessionCatalog
globalTcmpViewManagerBuilder:()二)GlobalTcmpViowManager,
functionRegistry:
hadoopConf:Config」ration,
parser:ParserInterface,
>i:l|.''i1.Ir).::,1:i,^•:-','.'':
如果采用默认的SessionCatalog,当需要获取数据表时则通过
Externalcatalog实例调用其对应的接口来实现:
werridedefloadTableGdent:Identifier):Table={■
va]catalogTable二try
catalog.gctTableMctadata(idcnt.asTableldentifier)
、L,「「'•1」:常
D
VlTable(catalogTable)
■
lefgetTableMetadata(name:TableTdentifier):CatalogTable
yaldb=formatDatabaseName(name,database.getOrElse(getCurrentDatabas
valtable二formatTableName(name.
requireDbExists(db)
requireTableExists(Tableldentifier(table,Some(db)))
接下来如果采用Externalcatalog接口的实现类HiveExternalCatalog的情况
下,则通过HiveClientlmpl类从Hive的metadata中类获取用户表的元数据相
关信息:
privatedefgetRawTableOption(dbName:String,tableName:String):Option]
Option(client.getTable(dbName,tableName,false*don。Ilhnwoxcepli(
另外,如需扩展的catalog范围可通过实现CatalogPlugin接口、并且配置
uspark,sql.catalog.spark_catalogv参数来实现,例如在iceberg数据湖的
实现中通过自定义其catalog来实现其个性化的缨辑:
|spark.sql.catalog.spark_catalog
prg.apache,iceberg,spark.SparkSessionCatalog
3、返回解析后的ResolvedLogicalPlan©
以上处理逻辑中所涉及的主要的类之间的关系如下所示:
接下来仍然以前面的SQL语句(selectcollfromtabnamewhereco12>
10)为例,简要阐述如何将一个UnresolvedLogicalPlan解析成为Analyzed
LogicalPlan:
1、根据Analyzer的解析规则,UnResolvedRelalion节点可以应用到
ResolveRelations规则,通过CatalogManger获取数据源中表的信息,得到
Relation的相关列的信息并加上标号,同时创建一个针对数据表的
SubqucryAlias节点;
2、针对过滤条件col2>10的过滤条件,针对列UnresolvedAttribute可以适
用到ResolveReference规则,根据第1步中得到的列信息可以进行解析;数字
10可以应用到ImplicitTypcCasts规则对该数字匹配最合适的数据类型;
3、针对Project节点,接下来在进行下一轮解析,再次匹配到
ResolveReference规则对投影列进行解析,从而将整棵树解析为Resolved
LogicalPlan0
生成OptimizedLogicalPlan
得到ResolvedLogicalPlan之后,为了使SQL语句的执行性能更优,则需要根
据一些规则进一步优化逻辑计划树,生成OptimizedLogicalPlan。
本文采用的是Spark3.0的源码,生成OptimizedLogicalPlan是通过懒加载
的方式被调用的,并且Optimizer类与Analyzer类一样继承了RuleExecutor
类,所有基于规则(RB0)的优化实际都是通过RuleExecutor类来执行,同样也
是将所有规则构建为多个批次,并且将所有批次中规则应用于Analyzed
LogicalPlan,直到树不再改变或者执行优化的循环次数超过最大限制
(spark.sql.optimizer,maxIterations,默认100):
11azyvaloptimizedPlan:LogicalPlan二executePhase(QueryPlanningTracke
optimizingandplanning.
|valplan=sparkSession.sessionState.optimizer.executcAndTrack(withCaj
thedData.clone。,tracker)
■
|defexecuteAndTrack(plan:TrueType,tracker:QueryPlanningTracker):Tr
|QueryPlanningTracker.withTracker(tracker)
|execute(plan)
逻辑计划优化规则仍然又多个Batch组成,每个Balch中包含多个具体的Rule
并且可以执行一次或者固定次数。其中比较常用的优化规则有:谓词下推、常
量累加、列剪枝等几种。
谓词下推将尽可能使得谓词计算靠近数据源,根据不同的场景有
LimitPushDown、PushProjectionThroughUnionPushDownPredicates等多种
实现,PushDownPredicates又包含PushPredicatcThroughNonJoin和
PushPredicateThroughJoin;
其中,PushPredicateThroughJoin可实现将谓词计算下推至join算子的下
面,从而可以提升数据表之间的join计算过程中所带来的网络、内存以及10
等性能开销:
alapplyLocally:PartialFunction[LogicalPlan,LogicalPlan]:
|casef@Filter(filterCondition,Join(lefttright,joinType,joinConditi|
|on,hint)
|val(leftFi1terConditions,rightFi1terConditions,commonFi1terConditi
split(splitConjur.ctivePredicates(fi1terCondition),left,right)Kn
joinTypematch
case_:TnnerLike二
valncwLeft=1eftFilterConditions.
rcduceLcftOption(And).
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 公共交通车辆更新淘汰制度
- 2026年永修县总医院面向社会公开招聘工作人员备考题库及答案详解一套
- 2026年数据通信科学技术研究所招聘备考题库及参考答案详解一套
- 2026年西安高新一中沣东中学招聘备考题库带答案详解
- 2026年杭州市丁蕙第二小学编外人员招聘备考题库完整参考答案详解
- 企业员工绩效考核评价制度
- 2026年用友数智化应用工程师招聘备考题库附答案详解
- 大理护理职业学院关于招募2026年春季学期职业教育银龄教师的备考题库附答案详解
- 企业员工培训与考核评估制度
- 企业内部审计制度
- (正式版)新建标 001-2019 《自治区农村安居工程建设标准》
- 禁毒社工知识培训课件
- 家具展厅管理方案(3篇)
- 半成品摆放管理办法
- 周围性瘫痪的护理常规
- 电能质量技术监督培训课件
- 电子制造行业数字化转型白皮书
- 肿瘤患者双向转诊管理职责
- 福建省漳州市2024-2025学年高一上学期期末教学质量检测历史试卷(含答案)
- 管道穿越高速桥梁施工方案
- 2024版《中医基础理论经络》课件完整版
评论
0/150
提交评论