Spark SQL逻辑计划原理_第1页
Spark SQL逻辑计划原理_第2页
Spark SQL逻辑计划原理_第3页
Spark SQL逻辑计划原理_第4页
Spark SQL逻辑计划原理_第5页
已阅读5页,还剩22页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

SparkSQL逻辑计划原理

【导读】本文重点讲解了SparkSQL解析为AST抽象语法树、生成

UnresolvedLogicalPlan>生成ResolvedLogicalPlan以及Optimized

LogicalPlan的过程,为接下来进一步生成物理计划SparkPlan做好了准各。

Catalyst优化器是Spark引擎中非常重要的组成部分,也是近年来Spark社区

项目重点投入、并且发展十分迅速的核心模块,对于Spark任务的性能提升起

到了关键的基础作用。

我们知道,在Spark1.6之前开发人员是通过Spark的RDD编程接口来实现对大

规模数据的分析和处理的,到了Sparkl.6版本后推出了DataSetDataFrame

的编程接口,这种数据结构与RDD的主要区别在于其携带了结构化数据的

Schema信息,从而可以被SparkCatalyst用来做进一步的解析和优化;而

SparkSQL则是比DataSet和DataFrame编程接口更为简单易用的大数据领域

语言,其用户可以是开发工程师、数据科学家、数据分析师等,并且与其他

SQL语言类似,可以通过SQL引擎将SQL预先解析成一棵AST抽象语法树;同

时,AST抽象语法树、DalaSet及DalaFrame接下来均会被SparkCatalyst优

化器转换成为UnresolvedLogicalPlansResolvedLogicalPlan,Physical

Plan、以及OptimizedPhysicalPlan,也就是说带有schema信息的Spark分

布式数据集都可以从SparkCatalyst中受益,这也是Spark任务性能得以提升

的核心所在。

值得一提的是,在物理计划树的生成过程中,首先会将数据源解析成为RDD,

也即在SparkSQL的物理计划执行过程中所操作的对象实际是RDD,一条Spark

SQL在生成最终的物理计划后仍然会经过前面文查中所提到的生成DAG、划分

Stage,并将taskset分发到特定的executor上运行等一系列的任务调度和执

行过程来实现该SparkSQL的处理逻辑.

接下来,本文将着重讲解SparkSQL逻辑计划的相关实现原理,在后续的文章

中会继续解析SparkSQL的物理计划。

生成UnresolvedLogicalPlan

用户可以通过spark-sql等客户端来提交sql语句,在sparksession初始化时

通过BaseSessionStateBuilder的build()方法始化SparkSqlParser、

Analyser以及SparkOptimizcr对象等:

efbuild():SessionState=

newSessionState

session.sharedState,

conf,

experimentalMethods,

functionRegistry^^^^^^^^M

udfRcgistratiorh^^^^^^^^H

0=>

()=>ana1yzer,

()=>opl'inizor,

planner,

()=>streamingQueryManager,

1istenerManagcr,

()二)resourceLoader,^^^B

createQueryExecutiont

createClone,

columnarRules,

queryStagePrepRules)^^^^^M

当用户程序调用SparkSession的sql接口时即开始了解析sql语句并执行对数

据处理的过程:

|defsql(sqlText:String):DataFrame二withActive

|valtracker二newQueryPlanningTracker^^^^^^^^^^^^^^l

valplan=tracker.measurePhase(QucryPlanningTrackcr.PARSING)

sessionState^sqlParser.parsePlan(sqlText)

Dataset.ofRows(self,plan,tracker)

其+1通过AbstractSqlParser的parsePlan方法将sql语句转换成抽象语法

树:

verridedefparsePlan(sqlText:String):LogicalPlan=parse(sq

IText){parser二

dslBuilder.visiiSiugleSldleiiienl(paxser.siugleSldLeinenl())nidlcl

caseplan:LogicalPlan=>plan

case

valposition=Origin(None,None)

thrownewParseException(Option(sqlText),"Unsupported

SQLstatement",position,position)

1、从SqlBaseParser的singleStatement()方法开始基于ANTLR4lib库来解

析sql语句中所有的词法片段,生成一棵AST抽象语法树;

2、访问AST抽象语法树并生成Unresolved逻辑计划树:

1)访问SingleStatementContext节点:

SingleStatementContext是整个抽象语法树的根节点,因此以AstBuilder的

visitSingleStatement方法为入口来遍历抽象语法树:

verridedefvisitSingleStatement(ctx:SingleStatementContext):I

visit(ctx.statcncnt).asInstanccOf[LogicalPlan]

)ublicTvisit(ParseTreetree)

returntree.accept(this):

2)根据访问者模式执行SingleStatementContext节点的accept方法:

Override

>ublic<T>Taccept(ParseTreeVisitor<?extendsT>visitor)

if(visitorinstanceofSqlBaseVisitor)return((SqlBas

Wisitor<?extendsT>)visitor).visitSingleStatement(this);

erridcpublicvisitSingleStatementISqlBaseParser.SingleStatcr

ntContextctx){returnvisitChildren(ctx);}

3)迭代遍历整棵ASTTree:

Override

intn二node.getChildCount()

(!shouldVisitNextChiId(node,result))

ParseTreec二node.getChild:i)

TchildResult二c.accept(lh:s)

result=aggregateResult(result,chiIdResult);

returnresult;

根据以上代码,在遍历AST树的过程中,会首先解析父节点的所有子节点,并

执行子节点上的accept方法来进行解析,当所有子节点均解析为

UnresolvedRelation或者Expression后,将这些结果进行聚合并返回到父节

点,由此可见,AST树的遍历所采用的是后序遍历模式。

接下来以查询语句中的QuerySpccificationContext节点的解析为例进一步阐

述以上过程:

如下为一条基本的sql语句:

electcol1fromtabnamewhereco!2>1(

QuerySpecificationContext节点下会产生用于扫描数据源的

FromClauseContext过滤条件对应的BooleanDefaultContexts以及投影时所

需的NamcdExprcssionScqContext节点。

1)FromClauseContext继续访问其子节点,当访问到TablelNameContext节点

时,访问到tableName的tocken时根据表名生成UnresolvedRelation:

overridedefvisitTableName(ctx:TableNameContext):LogicalPlan|

二withOrigin(ctx)

valtableld二visitMultipartldentif:er(ctx.multipartldcntifier|

valtable二nayApplyAliasPlan(ctx.tableAlias,UnresolvedRelat|

2)BooleanDefaultContext的子节点中分为三个分支:代表Reference的

ValueExpressionDefaultContext>代表数值的

ValueExpressionDefaultContext以及代表运算符的ComparisonContext;

例如遍历代表数据值ValueExpressionDefaultContext及其子节点,直到访问

至ijIntcgerLiteralContext:

verridedefvisitlntegerLiteral(ctx:IntegerLiteralContext):Lit|

ral二withOrigin:ctx)

BigDccimal(ctx.gctTcxt)match

casevifv.isValidTnt二

Literal:v.intValue)

casevifv.isValidLong二〉

Literal;v.

casev=>Literal(v.underlying(J)

而Literal的定义如下,是一个叶子类型的Expression节点:

aseclassLiteral(value:Any,dataType:DataType)extends1

cafExpression

3)NamedExpressionSeqContext是投影节点,迭代遍历直到

Regu1arQuerySpecificationContext节点,然后通过访问

withSelectQuerySpecification方法创建出投影所需的ProjectLogical

Plan:

verridedefvisitRegularQuerySpecification

ctx:RegularQuerySpecificationContext):LogicalPlan=wit|

Origin(ctx)

valfromOneRowRelation().optiona'(ctx.fromClause)

visitFromClause(ctx.fromClause)

withSclectQuerySpecification

ctx.lateralView,

ctx.\vhereClause,^^^^B

ctx.aggregationsause,・

ctx.havingC:ause,^^^^|

efcreateProjeclC二if(namedExpressions.nonEmply)

Project(namedExpressions,withFi

总结一下以上处理过程中所涉及的类之间的关系,如下图所示:

类图

生成ResolvedLogicalPlan

SparkAnalyser

在SparkSession的sql方法中,对sql语句进行过Parser解析并生成

UnresolvedLogicalPlan之后则通过执行Dataset.ofRows(self,plan,

tracker)继续进行catalog绑定,数据源绑定的过程如下:

|defofRows(sparkSession:SparkSession,logicalPlan:LogicalPlan,trackt

卜:QueryP1anningTracker)

|:DataFrame二sparkSession.wiIhAclive

|\,1q('二n「v;Query「x(culi()n(spnrkScssi()n,I()gica]PIan,li7i(kr)

,il-l,-.1■,'::■■/,:।'.I;IJ,q,■.II,I-.....-I.'II:

defassertAnalyzedO:Unit=analyzed

由如下实现逻辑可见,analyzed变量是通过懒加载方式初始化的,通过该变

量的初始方法可见Spark的catalog实现逻辑主要通过Analyser类来实现的:

lazyvalanalyzed:LogicalPlan=executePhase(QueryPlanningTracker.ANAI

YS1S)

sparkSession.sessionState.analyzer.executeAndCheck(logical,tracker)

其中,executeAndCheck方法的执行是通过Analyzer的父类RuleExecutor的

execute方法来实现的:

efexecute(plan:TreeType):TreeType=

valbatchStartPlan=curPlan|

variteration二

varcontinue=true

curPlan=batch,rules.foldLeft(curPlan)

case(plan,rule)=.

valstartTime二System.nanoTime

valresult二rule(plan)

valrunTime=System.nanoTi【ne()-startTime|

va1effectivo=!rosult.fastEqu<i1s(p1an)H

queryExecutionMetries.incNumEffectiveExecution(rule.ruleName),

quer^^ExecutionMetrics.incTimeEffectiveExecutionB)y(rule.ruleNam

,runTime)

「)I⑴(h,ng(、l」)lg(T.I(以1八11。(「"k・门ih\,imc,pkn.rm::

resul

iteration+=

if(iteration>ba二ch.strategy,maxIterations)

valendingMsg=if(batch,strategy.maxIterationsSetting==nul1)

}else

s”,pleaseset'${batch,strategy.maxIterationsSetting}'toalarger]

valmessage=s"Maxiterations(${iteration-1)reachedforbatch$

)atch.name)H

if(Utils.isTestingbatch,strategy.errorOnExceed)

IhrownewTreeNodeExceplion(curPlan,message,

1ogWarning(message)

if(batch,strategy二二Once

Uli】s.isTesling&&!blacklisledOnceBalches.contains(batch,name))

continue=fals

if(curPlan.fastEquals(lastPlan))

logTrace

s“Fixedpointreachedforbatch${batch,name}after${iteration•-1)i

continue=fals

lastPlan=curPlad

planChangeLogger.logBatch(batch,name,batchStartPlan,curPlan)

planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics()-befor

Metrics)

curPlan

如上代码的主要处理过程如下:

1、遍历的Analyzer类中的batches列表:

通过batches方法获取所有的catalog绑定相关的规则,在Analyzer中包括

Substitution、Hints、Resolution、UDF、Subquery等几个规则组;

以较为常见的"Resolution”规则组为例,其具有非常多的规则用于解析函数、

Namespace,数据表、视图、列等信息,当然用户也可以子定义相关规贝U:

ResolvcTableValuedFunctions:

ResolveNamespace(catal.ogManager)::■

newResolveCatalogs(catalogManager)::

ResolveRelations::

RescdveTables::

ResolveReferences::

ResolveCreateNamedStruct:

ResolvcDeserializer:

ResolveNewInstance:

ResolveUpCast::

ResolveGroupingAnalytics:

ResolvePivot::

ResolveOrdinannOrderByAndGroupBy::,

ResolveAggAliasInGroupBy:

其中,Batch类的定义如下,包括Batch名称、循环执行策略、具体的规则组

集合,循环执行策略Strategy又分为Once和FixedPoint两种,即仅执行一次

和固定次数:

rotectedcaseclassBatch(name:String,strategy:Strategy,rules:Rule

2、将每个Batch中所有的规则Rule对象实施于该UnsolvedLogicalPlan,并

且该Batch中规则可能要执行多轮,直到执行的批数等于

batch,strategy,maxIterations或者logicalplan与上个批次的结果比没有变

化,则退出执行;

其中在Spark中的定义如下,在spark3.0中默认可最大循环100次:

conf,analyzerYax11erations,

errorOnExceed二

maxIterationsSetting=SQLConf.ANALYZER_MAX_ITERATIONS.key

|.interna1()

|.::,‘’―…Il」.III,,.「,」「i:」::」,!.「「「「TII:”:■■■♦♦■♦♦■;

.creatcWithDefault(100)

接下来以将ResolveRelations(解析数据表或者视图)规则应用于Unresolved

LogicalPlan的解析过程为例,支持解析UnresolvedRelation>

UnresolvedTable^Unreso1vcdTab1eOrView等多种未解析的数据源:

|defapply(plan:LogicalPlan):LogicalPlan:RBSolveTempViews(plan).reso|

lookupRelation(u.multipartldentifier).map(resolveViews).getOrElse(u

caseu@UnresolvedTable(identifier)二

u.fai1Analysis(s"${v.identifier.quoted}isaviewnottable.")

casetable=>table

}.getOrElse(u)

caseu@UnresolvedTab1eOrView(identifier)=

1ookupTab1eOrView(identifier).getOrElse(u)

当解析对象为UnresolvedRelation实例时,调生lookupRelation方法来对其

进行解析,通过SessionCatalog或者扩展的CatalogPlugin来获取数据源的元

数据,并生成ResolvedLogicalPlan:

rivatedeflookupRelation(identifier:Seq[String]):Option[LogicalPlan|

expandRelationName[identifier)match

caseSessionCatalogAndldentifier(catalog,ident)二

lazyvalloaded=CatalogV2Util.loadTable(catalog,ident).map

vlSessionCatalog.getRelation(vlTable.

casetable

SubqueryAlias

catalog,name+:ident.asMultipartldcntificr,^^^^^^^^^^^^!

DataSourceV2Relation.create(table,Some(catalog),Some(ident)))

最常见的是SessionCatalog,作为SparkSession级别catalog接口对象,其

定义如下,包括Externalcatalog、G1obalTempViewManager

FunctionRegistrySQLConf、HadoopConfiguration>Parser.

FunctionResourceLoader对象;其中,Externalcatalog有两个主要的实现

类:HiveExternalCatalog和InMcmoryCatalog,而HiveExternalCatalog则主

要应用于企业级的业务场景中:

LassSessionCatalog

globalTcmpViewManagerBuilder:()二)GlobalTcmpViowManager,

functionRegistry:

hadoopConf:Config」ration,

parser:ParserInterface,

>i:l|.''i1.Ir).::,1:i,^•:-','.'':

如果采用默认的SessionCatalog,当需要获取数据表时则通过

Externalcatalog实例调用其对应的接口来实现:

werridedefloadTableGdent:Identifier):Table={■

va]catalogTable二try

catalog.gctTableMctadata(idcnt.asTableldentifier)

、L,「「'•1」:常

D

VlTable(catalogTable)

lefgetTableMetadata(name:TableTdentifier):CatalogTable

yaldb=formatDatabaseName(name,database.getOrElse(getCurrentDatabas

valtable二formatTableName(name.

requireDbExists(db)

requireTableExists(Tableldentifier(table,Some(db)))

接下来如果采用Externalcatalog接口的实现类HiveExternalCatalog的情况

下,则通过HiveClientlmpl类从Hive的metadata中类获取用户表的元数据相

关信息:

privatedefgetRawTableOption(dbName:String,tableName:String):Option]

Option(client.getTable(dbName,tableName,false*don。Ilhnwoxcepli(

另外,如需扩展的catalog范围可通过实现CatalogPlugin接口、并且配置

uspark,sql.catalog.spark_catalogv参数来实现,例如在iceberg数据湖的

实现中通过自定义其catalog来实现其个性化的缨辑:

|spark.sql.catalog.spark_catalog

prg.apache,iceberg,spark.SparkSessionCatalog

3、返回解析后的ResolvedLogicalPlan©

以上处理逻辑中所涉及的主要的类之间的关系如下所示:

接下来仍然以前面的SQL语句(selectcollfromtabnamewhereco12>

10)为例,简要阐述如何将一个UnresolvedLogicalPlan解析成为Analyzed

LogicalPlan:

1、根据Analyzer的解析规则,UnResolvedRelalion节点可以应用到

ResolveRelations规则,通过CatalogManger获取数据源中表的信息,得到

Relation的相关列的信息并加上标号,同时创建一个针对数据表的

SubqucryAlias节点;

2、针对过滤条件col2>10的过滤条件,针对列UnresolvedAttribute可以适

用到ResolveReference规则,根据第1步中得到的列信息可以进行解析;数字

10可以应用到ImplicitTypcCasts规则对该数字匹配最合适的数据类型;

3、针对Project节点,接下来在进行下一轮解析,再次匹配到

ResolveReference规则对投影列进行解析,从而将整棵树解析为Resolved

LogicalPlan0

生成OptimizedLogicalPlan

得到ResolvedLogicalPlan之后,为了使SQL语句的执行性能更优,则需要根

据一些规则进一步优化逻辑计划树,生成OptimizedLogicalPlan。

本文采用的是Spark3.0的源码,生成OptimizedLogicalPlan是通过懒加载

的方式被调用的,并且Optimizer类与Analyzer类一样继承了RuleExecutor

类,所有基于规则(RB0)的优化实际都是通过RuleExecutor类来执行,同样也

是将所有规则构建为多个批次,并且将所有批次中规则应用于Analyzed

LogicalPlan,直到树不再改变或者执行优化的循环次数超过最大限制

(spark.sql.optimizer,maxIterations,默认100):

11azyvaloptimizedPlan:LogicalPlan二executePhase(QueryPlanningTracke

optimizingandplanning.

|valplan=sparkSession.sessionState.optimizer.executcAndTrack(withCaj

thedData.clone。,tracker)

|defexecuteAndTrack(plan:TrueType,tracker:QueryPlanningTracker):Tr

|QueryPlanningTracker.withTracker(tracker)

|execute(plan)

逻辑计划优化规则仍然又多个Batch组成,每个Balch中包含多个具体的Rule

并且可以执行一次或者固定次数。其中比较常用的优化规则有:谓词下推、常

量累加、列剪枝等几种。

谓词下推将尽可能使得谓词计算靠近数据源,根据不同的场景有

LimitPushDown、PushProjectionThroughUnionPushDownPredicates等多种

实现,PushDownPredicates又包含PushPredicatcThroughNonJoin和

PushPredicateThroughJoin;

其中,PushPredicateThroughJoin可实现将谓词计算下推至join算子的下

面,从而可以提升数据表之间的join计算过程中所带来的网络、内存以及10

等性能开销:

alapplyLocally:PartialFunction[LogicalPlan,LogicalPlan]:

|casef@Filter(filterCondition,Join(lefttright,joinType,joinConditi|

|on,hint)

|val(leftFi1terConditions,rightFi1terConditions,commonFi1terConditi

split(splitConjur.ctivePredicates(fi1terCondition),left,right)Kn

joinTypematch

case_:TnnerLike二

valncwLeft=1eftFilterConditions.

rcduceLcftOption(And).

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论