Wenchen Fan (范文臣), Software Engineer, Databricks: Spark SQL, A Compiler from Queries to RDD Programs

Spark SQL: A compiler from queries to RDDs

Wenchen Fan

SDCC 2016

Agenda

• Why Spark SQL?
• The Frontend: Catalyst
• The Backend
• The Tungsten Project
• Benchmark
• What's next

Background: What is an RDD?

• Dependencies
• Partitions
• Compute function: Partition => Iterator[T]

To Spark, the compute function is an opaque computation and the elements of type T are opaque data.
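The three parts of an RDD listed above can be sketched as a small trait. This is a simplified local model for illustration, not Spark's actual RDD class; the names MiniRDD, RangeRDD, and MappedRDD are hypothetical.

```scala
object MiniRddSketch {
  // In this simplified model a partition is just an index.
  final case class Partition(index: Int)

  // Hypothetical stand-in for the RDD abstraction described on the slide.
  trait MiniRDD[T] {
    def dependencies: Seq[MiniRDD[_]]       // parent RDDs
    def partitions: Seq[Partition]          // how the data is split
    def compute(p: Partition): Iterator[T]  // per-partition function, opaque to the engine

    // Run every partition locally and concatenate the results.
    def collect(): List[T] = partitions.toList.flatMap(p => compute(p).toList)
  }

  // A source RDD: the integers [0, n), dealt round-robin into numPartitions slices.
  final class RangeRDD(n: Int, numPartitions: Int) extends MiniRDD[Int] {
    val dependencies: Seq[MiniRDD[_]] = Nil
    val partitions: Seq[Partition] = (0 until numPartitions).map(i => Partition(i))
    def compute(p: Partition): Iterator[Int] =
      (0 until n).iterator.filter(_ % numPartitions == p.index)
  }

  // A derived RDD: the engine only sees a black-box function f.
  final class MappedRDD[A, B](parent: MiniRDD[A], f: A => B) extends MiniRDD[B] {
    val dependencies: Seq[MiniRDD[_]] = Seq(parent)
    val partitions: Seq[Partition] = parent.partitions
    def compute(p: Partition): Iterator[B] = parent.compute(p).map(f)
  }
}
```

Because f is an arbitrary function and T an arbitrary type, nothing in this model lets the engine reorder, simplify, or skip work, which is the limitation the following slides address.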

RDD Programming Model

Construct the execution DAG using low-level RDD operators.

Spark SQL Comes to the Rescue

• More efficient: it only processes structured data, which limits what can be expressed but enables optimization.
• High-level API: SQL, DataFrame/Dataset

Write less code

Not Just Less Code, Faster Too!

[Bar chart: time to aggregate 10 million int pairs (secs), on an axis from 0 to 10, for DataFrame SQL, DataFrame R, DataFrame Python, DataFrame Scala, RDD Python, and RDD Scala]

The not-so-secret truth...

Spark SQL is about more than SQL.

Spark SQL Overview

[Pipeline diagram: SQL AST / DataFrame / Dataset → Unresolved Logical Plan → Analysis (using the Catalog) → Logical Plan → Logical Optimization → Optimized Logical Plan → Physical Planning → Physical Plans → Cost Model → Selected Physical Plan → Code Generation → RDDs]

DataFrames, Datasets and SQL share the same optimization/execution pipeline.

Catalyst: The Frontend

[Pipeline diagram repeated: SQL AST, DataFrame, and Dataset flow through Analysis, Logical Optimization, Physical Planning, and Code Generation to RDDs]

How Catalyst Works: An Overview

[Diagram: SQL AST, DataFrame, and Dataset → Query Plan → Catalyst transformations → Optimized Query Plan → RDDs. The query plans are abstractions of users' programs (trees).]

Trees: Abstractions of Users' Programs

SELECT sum(v)
FROM (
  SELECT
    t1.id,
    1 + 2 + t1.value AS v
  FROM t1 JOIN t2
  WHERE
    t1.id = t2.id AND
    t2.id > 50 * 1000) tmp

Trees: Abstractions of Users' Programs

Expression

• An expression represents a new value, computed based on input values
• e.g. 1 + 2 + t1.value

Trees: Abstractions of Users' Programs

Query Plan

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Filter [t1.id = t2.id AND t2.id > 50 * 1000]
      Join
        Scan (t1)
        Scan (t2)

Logical Plan

• A Logical Plan describes computation on datasets without defining how to conduct the computation

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Filter [t1.id = t2.id AND t2.id > 50 * 1000]
      Join
        Scan (t1)
        Scan (t2)

Physical Plan

• A Physical Plan describes computation on datasets with specific definitions on how to conduct the computation

Hash-Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Filter [t1.id = t2.id AND t2.id > 50 * 1000]
      Sort-Merge Join
        Parquet Scan (t1)
        JSON Scan (t2)


Transform

• A function associated with every tree, used to implement a single rule

Before: 1 + 2 + t1.value (evaluate 1 + 2 for every row)

Add
  Add
    Literal(1)
    Literal(2)
  Attribute(t1.value)

After: 3 + t1.value (evaluate 1 + 2 once)

Add
  Literal(3)
  Attribute(t1.value)

Transform

• A transform is defined as a PartialFunction
• PartialFunction: a function that is defined for a subset of its possible arguments

val expression: Expression = ...
expression.transform {
  case Add(Literal(x, IntegerType), Literal(y, IntegerType)) =>
    Literal(x + y)
}

The case statement determines whether the partial function is defined for a given input.

Transform

val expression: Expression = ...
expression.transform {
  case Add(Literal(x, IntegerType), Literal(y, IntegerType)) => Literal(x + y)
}

[Animation over four slides: the rule is applied to the tree for 1 + 2 + t1.value. The inner Add(Literal(1), Literal(2)) matches the case and is replaced by Literal(3), giving 3 + t1.value.]

Add
  Literal(3)
  Attribute(t1.value)
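The transform steps above can be reproduced with a minimal, self-contained expression tree. This is only a sketch: the class names mirror the slides, but the types are hypothetical stand-ins rather than Catalyst's classes, and data types such as IntegerType are left out.

```scala
object TransformSketch {
  // Minimal expression tree; not Catalyst's actual Expression hierarchy.
  sealed trait Expression {
    // Apply the rule to this node first (if it is defined here), then to the children.
    def transform(rule: PartialFunction[Expression, Expression]): Expression = {
      val afterRule = rule.applyOrElse(this, (e: Expression) => e)
      afterRule match {
        case Add(l, r) => Add(l.transform(rule), r.transform(rule))
        case leaf      => leaf
      }
    }
  }
  final case class Literal(value: Int) extends Expression
  final case class Attribute(name: String) extends Expression
  final case class Add(left: Expression, right: Expression) extends Expression

  // Constant folding for addition: only defined for Add of two literals.
  val foldConstants: PartialFunction[Expression, Expression] = {
    case Add(Literal(x), Literal(y)) => Literal(x + y)
  }

  // 1 + 2 + t1.value parses as (1 + 2) + t1.value.
  val before: Expression = Add(Add(Literal(1), Literal(2)), Attribute("t1.value"))
  val after: Expression = before.transform(foldConstants)
}
```

Here `TransformSketch.after` is `Add(Literal(3), Attribute("t1.value"))`: the rule is not defined for the outer Add, so only the inner Add is rewritten.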

Combining Multiple Rules: Predicate Pushdown

Before:

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Filter [t1.id = t2.id AND t2.id > 50 * 1000]
      Join
        Scan (t1)
        Scan (t2)

After:

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Join [t1.id = t2.id]
      Scan (t1)
      Filter [t2.id > 50 * 1000]
        Scan (t2)

Combining Multiple Rules: Constant Folding

Before:

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Join [t1.id = t2.id]
      Scan (t1)
      Filter [t2.id > 50 * 1000]
        Scan (t2)

After:

Aggregate [sum(v)]
  Project [t1.id, 3 + t1.value AS v]
    Join [t1.id = t2.id]
      Scan (t1)
      Filter [t2.id > 50000]
        Scan (t2)

Combining Multiple Rules: Column Pruning

Before:

Aggregate [sum(v)]
  Project [t1.id, 3 + t1.value AS v]
    Join [t1.id = t2.id]
      Scan (t1)
      Filter [t2.id > 50000]
        Scan (t2)

After:

Aggregate [sum(v)]
  Project [t1.id, 3 + t1.value AS v]
    Join [t1.id = t2.id]
      Project [t1.id, t1.value]
        Scan (t1)
      Filter [t2.id > 50000]
        Project [t2.id]
          Scan (t2)

Combining Multiple Rules

Before transformations:

Aggregate [sum(v)]
  Project [t1.id, 1 + 2 + t1.value AS v]
    Filter [t1.id = t2.id AND t2.id > 50 * 1000]
      Join
        Scan (t1)
        Scan (t2)

After transformations:

Aggregate [sum(v)]
  Project [t1.id, 3 + t1.value AS v]
    Join [t1.id = t2.id]
      Project [t1.id, t1.value]
        Scan (t1)
      Filter [t2.id > 50000]
        Project [t2.id]
          Scan (t2)
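One simple way to combine several rules is to apply them in turn until the plan stops changing. The sketch below illustrates that idea on a tiny plan tree with two of the rules from the slides (predicate pushdown and constant folding). All types and the `optimize` loop are hypothetical simplifications, and the join condition t1.id = t2.id is omitted to keep the example short.

```scala
object RuleExecutorSketch {
  // Tiny expression and plan trees, far simpler than Catalyst's.
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Col(table: String, name: String) extends Expr
  final case class Mul(left: Expr, right: Expr) extends Expr
  final case class Gt(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(cond: Expr, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  type Rule = PartialFunction[Plan, Plan]

  // Apply a rule top-down to every node of the plan.
  def transform(plan: Plan, rule: Rule): Plan =
    rule.applyOrElse(plan, (p: Plan) => p) match {
      case Filter(c, child) => Filter(c, transform(child, rule))
      case Join(l, r)       => Join(transform(l, rule), transform(r, rule))
      case scan: Scan       => scan
    }

  // Tables referenced by an expression.
  def tables(e: Expr): Set[String] = e match {
    case Lit(_)    => Set.empty
    case Col(t, _) => Set(t)
    case Mul(l, r) => tables(l) ++ tables(r)
    case Gt(l, r)  => tables(l) ++ tables(r)
  }

  // Predicate pushdown: move a filter below the join when it only needs the right side.
  val pushDownFilter: Rule = {
    case Filter(c, Join(l, r @ Scan(t))) if tables(c) == Set(t) => Join(l, Filter(c, r))
  }

  // Constant folding inside a comparison: x > a * b becomes x > (value of a * b).
  val foldConstants: Rule = {
    case Filter(Gt(l, Mul(Lit(a), Lit(b))), child) => Filter(Gt(l, Lit(a * b)), child)
  }

  // Run all rules repeatedly until the plan stops changing or the limit is reached.
  def optimize(plan: Plan, rules: Seq[Rule], maxIterations: Int = 10): Plan = {
    var current = plan
    var changed = true
    var i = 0
    while (changed && i < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => transform(p, rule))
      changed = next != current
      current = next
      i += 1
    }
    current
  }

  // Filter [t2.id > 50 * 1000] over Join(Scan t1, Scan t2)
  val before: Plan =
    Filter(Gt(Col("t2", "id"), Mul(Lit(50), Lit(1000))), Join(Scan("t1"), Scan("t2")))
  val after: Plan = optimize(before, Seq(pushDownFilter, foldConstants))
}
```

With these definitions, `after` is `Join(Scan("t1"), Filter(Gt(Col("t2", "id"), Lit(50000)), Scan("t2")))`: the filter has moved under the join and 50 * 1000 has been folded. Each rule stays small because it only describes one local rewrite.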

[Pipeline diagram repeated: Unresolved Logical Plan → Analysis (using the Catalog) → Logical Plan → Logical Optimization → Optimized Logical Plan → Physical Planning → Physical Plans → Cost Model → Selected Physical Plan]

• Analysis: transforms an Unresolved Logical Plan into a Resolved Logical Plan
  • Unresolved => Resolved: uses the Catalog to find where datasets and columns come from, and the types of the columns
• Logical Optimization: transforms a Resolved Logical Plan into an Optimized Logical Plan
• Physical Planning: transforms an Optimized Logical Plan into a Physical Plan

The Backend Execution Engine

[The Spark SQL pipeline diagram is shown again here.]

Volcano Iterator Model

• Standard for 30 years: almost all databases do it
• Each operator is an "iterator" that consumes records from its input operator

[Diagram: data rows flow from Scan (t1) and Scan (t2) up through Join, Filter, and Project]

G. Graefe, Volcano - An Extensible and Parallel Query Evaluation System, in IEEE Transactions on Knowledge and Data Engineering, 1994
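The iterator model can be sketched with plain Scala iterators: each operator wraps its child and pulls rows on demand. The Operator trait, the three operator classes, and the sample rows are hypothetical and only illustrate the idea.

```scala
object VolcanoSketch {
  final case class Person(name: String, age: Int)

  // Each operator exposes an iterator and pulls rows from its child on demand.
  trait Operator[T] { def iterator: Iterator[T] }

  final class Scan[T](rows: Seq[T]) extends Operator[T] {
    def iterator: Iterator[T] = rows.iterator
  }
  final class Filter[T](child: Operator[T], predicate: T => Boolean) extends Operator[T] {
    def iterator: Iterator[T] = child.iterator.filter(predicate)
  }
  final class Project[A, B](child: Operator[A], f: A => B) extends Operator[B] {
    def iterator: Iterator[B] = child.iterator.map(f)
  }

  // Made-up sample rows for a "person" table.
  val person: Seq[Person] = Seq(Person("ann", 25), Person("bob", 41), Person("cat", 29))

  // SELECT name FROM person WHERE age < 30
  val plan: Operator[String] =
    new Project[Person, String](new Filter[Person](new Scan(person), _.age < 30), _.name)
  val names: List[String] = plan.iterator.toList
}
```

`VolcanoSketch.names` evaluates to `List("ann", "cat")`. Operators compose freely because they all share one interface, at the cost of at least one iterator call per row per operator.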

How Spark SQL Runs Queries

SELECT name FROM person WHERE age < 30

Project [name]
  Filter [age < 30]
    Parquet Scan (person)

[Animation over several slides, ending with the operator chain Parquet Scan → Filter → Project]

group by: dept
output: AVG(age)

HashAggregate
  DataExchange
    Parquet Scan

Partition 1: (dept="a", age=20), (dept="a", age=22), (dept="b", age=18)
Partition n: (dept="b", age=26), (dept="b", age=22), (dept="c", age=24)

mapPartitions:
dept=a: ageCount=123, ageSum=2312
dept=b: ageCount=45, ageSum=912

[Diagram: the plan HashAggregate → DataExchange → Parquet Scan is shown next to HashAggregate → ShuffleExchange (partitioned by: dept) → Parquet Scan]
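The aggregation above can be sketched locally: rows are first exchanged so that each dept lands in exactly one partition, and each partition then builds a hash table of (count, sum) buffers. This is a minimal single-process model; the function names are hypothetical, and a real shuffle moves data between machines.

```scala
object HashAggregateSketch {
  final case class Row(dept: String, age: Int)

  // Aggregation buffer for AVG(age): a running count and sum.
  final case class AvgBuffer(count: Long, sum: Long) {
    def add(age: Int): AvgBuffer = AvgBuffer(count + 1, sum + age)
    def result: Double = sum.toDouble / count
  }

  // Exchange: route every row to a partition chosen by hashing the grouping key.
  def shuffleByDept(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Seq[Row]] = {
    val all = partitions.flatten
    (0 until numPartitions).map { i =>
      all.filter(r => Math.floorMod(r.dept.hashCode, numPartitions) == i)
    }
  }

  // HashAggregate over one partition, in the style of mapPartitions.
  def aggregatePartition(rows: Iterator[Row]): Iterator[(String, Double)] = {
    val buffers = scala.collection.mutable.HashMap.empty[String, AvgBuffer]
    rows.foreach { r =>
      val old = buffers.getOrElse(r.dept, AvgBuffer(0L, 0L))
      buffers.update(r.dept, old.add(r.age))
    }
    buffers.iterator.map { case (dept, buf) => (dept, buf.result) }
  }

  // The six rows shown on the slide.
  val input: Seq[Seq[Row]] = Seq(
    Seq(Row("a", 20), Row("a", 22), Row("b", 18)),
    Seq(Row("b", 26), Row("b", 22), Row("c", 24))
  )
  val output: Map[String, Double] =
    shuffleByDept(input, 2).flatMap(p => aggregatePartition(p.iterator)).toMap
}
```

For the six sample rows this yields an average age of 21.0 for dept a, 22.0 for dept b, and 24.0 for dept c. Partitioning by dept is what makes the per-partition results final: no other partition can hold rows for the same key.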

Optimized Execution with Project Tungsten

• Binary encoding of row object
• Expression code generation
• Whole stage code generation
• Vectorization

The overheads of JVM objects

"abcd"

• Native: 4 bytes with UTF-8 encoding
• Java: 48 bytes

java.lang.String object internals:

OFFSET  SIZE  TYPE    DESCRIPTION      VALUE
0       4             (object header)  ...
4       4             (object header)  ...
8       4             (object header)  ...
12      4     char[]  String.value     []
16      4     int     String.hash      0
20      4     int     String.hash32    0

Instance size: 48 bytes (reported by Instrumentation API)

• 12 byte object header
• 8 byte hashcode
• 20 bytes data + overhead

Tungsten's Compact Encoding

(123, "data", "bricks")

0x0 | 123 | 32L | 48L | 4 "data" | 6 "bricks"

• 0x0: null bitmap
• 32L, 48L: offsets to data
• 4, 6: field lengths
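The layout on the slide can be approximated with a byte buffer. The sketch below assumes an 8-byte null bitmap, one 8-byte slot per field, and variable-length data stored as an 8-byte length followed by the bytes padded to a multiple of 8; that assumption reproduces the 32 and 48 offsets shown above, but it is a simplified illustration and not necessarily Spark's exact row format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object CompactRowSketch {
  // Round up to a multiple of 8 bytes.
  private def pad8(n: Int): Int = (n + 7) / 8 * 8

  // Encode (Int, String, String) as: null bitmap | 3 fixed 8-byte slots | variable-length data.
  def encode(id: Int, s1: String, s2: String): Array[Byte] = {
    val b1 = s1.getBytes(UTF_8)
    val b2 = s2.getBytes(UTF_8)
    val fixedSize = 8 + 3 * 8                 // bitmap + one slot per field
    val off1 = fixedSize                      // first string starts after the slots
    val off2 = off1 + 8 + pad8(b1.length)     // length word + padded bytes
    val buf = ByteBuffer.allocate(off2 + 8 + pad8(b2.length))
    buf.putLong(0, 0L)                        // null bitmap: no field is null
    buf.putLong(8, id.toLong)                 // field 0 is stored inline
    buf.putLong(16, off1.toLong)              // field 1: offset to data
    buf.putLong(24, off2.toLong)              // field 2: offset to data
    writeString(buf, off1, b1)
    writeString(buf, off2, b2)
    buf.array()
  }

  private def writeString(buf: ByteBuffer, offset: Int, bytes: Array[Byte]): Unit = {
    buf.putLong(offset, bytes.length.toLong)  // field length
    var i = 0
    while (i < bytes.length) { buf.put(offset + 8 + i, bytes(i)); i += 1 }
  }

  // Read a string field back using only offsets into the byte array.
  def readString(row: Array[Byte], field: Int): String = {
    val buf = ByteBuffer.wrap(row)
    val offset = buf.getLong(8 + field * 8).toInt
    val length = buf.getLong(offset).toInt
    new String(row, offset + 8, length, UTF_8)
  }

  val row: Array[Byte] = encode(123, "data", "bricks")
}
```

Under these assumptions the whole row is a single 64-byte array, and `readString(row, 1)` returns "data" without creating any intermediate objects for the other fields.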

Less Objects Creation

• Row object: the operator creates row objects (and column objects) when it outputs new records
• Byte array: the operator overwrites the byte array when it outputs new records

Manual Memory Management

[Diagram: operators send allocate requests to the Memory Manager, which grants them pages]

How to Evaluate Expression

a + 1 + 2

Add
  Add
    Attribute(a)
    Literal(1)
  Literal(2)

Function calls:
• Add.eval
• Add.eval
• Attribute.eval

Expression Code Generation

DataFrame Code / SQL: df.where(df("year") > 2015)

Catalyst Expressions: GreaterThan(year#234, Literal(2015))

Low-level Java code:

boolean filter(Object baseObject) {
  // Skip the null bitset, then three 8-byte slots, to reach the field.
  long offset = baseOffset + bitSetWidthInBytes + 3 * 8L;
  // JVM intrinsic, JIT-ed to pointer arithmetic.
  int value = Platform.getInt(baseObject, offset);
  return value > 2015;
}

Expression Code Generation

Saves a lot of virtual function calls and boxing costs!

After Expression Code Generation

[Diagram: the operator chain Parquet Scan → Filter → Project]

What We Really Run

[Diagram: byte arrays are passed between the Filter and Project operators]

Whole Stage Code Generation

Fusing operators together:
• Identify chains of operators ("stages")
• Compile each stage into a single function

Whole Stage Codegen: Planner

Whole Stage Codegen: Generate code like handwritten

Aggregate [count(*)]
  Project
    Filter
      Scan

long count = 0;
for (ss_item_sk in store_sales) {
  if (ss_item_sk == 1000) {
    count += 1;
  }
}
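The difference between an operator chain and a fused stage can be sketched on a plain array. Both functions below compute the same count; the first goes through separate iterators per operator, the second is the single loop that handwritten code would use. The data and function names are made up for illustration, and no code is actually generated here.

```scala
object WholeStageSketch {
  // Sample column of item keys (made-up data).
  val storeSales: Array[Int] = Array(1000, 7, 1000, 42, 1000)

  // Iterator style: scan and filter are separate iterators, so every row
  // crosses several hasNext/next calls before it is counted.
  def countWithIterators(data: Array[Int]): Long = {
    val scan: Iterator[Int] = data.iterator
    val filter: Iterator[Int] = scan.filter(_ == 1000)
    var count = 0L
    while (filter.hasNext) { filter.next(); count += 1 }
    count
  }

  // Fused style: the whole stage collapsed into one loop over the data.
  def countFused(data: Array[Int]): Long = {
    var count = 0L
    var i = 0
    while (i < data.length) {
      if (data(i) == 1000) count += 1
      i += 1
    }
    count
  }
}
```

Both functions return 3 for the sample data. The fused version keeps the counter and the current value in local variables, with no per-row calls between operators.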

Where Can We Push Further?

Iterator.next wastes a lot of performance!

[Diagram: stages connected by Iterator[Row] (ByteArray)]

Vectorization: Batch + Columnar

[Diagram: stages connected by Iterator[RowBatch], with each batch in columnar format]

Columnar format:

1     2     3
john  mike  sally
4.1   3.5   6.4
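The columnar table above can be sketched as one array per column. The column names id, name, and score are hypothetical (the slide only shows the values), and RowBatch here is a simplified stand-in for illustration.

```scala
object ColumnarSketch {
  // Row format: one object per record.
  final case class Row(id: Int, name: String, score: Double)

  // Columnar format: one array per column.
  final case class RowBatch(ids: Array[Int], names: Array[String], scores: Array[Double]) {
    def numRows: Int = ids.length
  }

  // Pivot a sequence of rows into a columnar batch.
  def toBatch(rows: Seq[Row]): RowBatch =
    RowBatch(rows.map(_.id).toArray, rows.map(_.name).toArray, rows.map(_.score).toArray)

  // Summing one column touches a single primitive array, in order.
  def sumScores(batch: RowBatch): Double = {
    var total = 0.0
    var i = 0
    while (i < batch.numRows) { total += batch.scores(i); i += 1 }
    total
  }

  val batch: RowBatch =
    toBatch(Seq(Row(1, "john", 4.1), Row(2, "mike", 3.5), Row(3, "sally", 6.4)))
}
```

An operator that only needs one column, such as `sumScores`, never reads the other arrays, and processing a batch at a time amortizes the per-call overhead across many rows.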

Why columnar?

1. More efficient: denser storage, regular data access, easier to index into

2. More
