《Spark编程基础及项目实践》课后习题及答案_第1页
《Spark编程基础及项目实践》课后习题及答案_第2页
《Spark编程基础及项目实践》课后习题及答案_第3页
《Spark编程基础及项目实践》课后习题及答案_第4页
《Spark编程基础及项目实践》课后习题及答案_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

《Spark编程基础及项目实践》课后习题及答案第一章Spark基础概述一、选择题下列关于Spark的说法错误的是()

A.Spark是基于内存计算的分布式计算框架

B.Spark支持批处理、流处理、机器学习等多种计算场景

C.Spark只能运行在YARN集群管理器上

D.Spark的核心抽象是弹性分布式数据集(RDD)Spark的哪个组件负责接收用户提交的作业并进行解析和调度()

A.Driver

B.Executor

C.ClusterManager

D.WorkerNode

二、简答题简述Spark与HadoopMapReduce的核心区别。

什么是Spark的惰性求值(Laziness)?其作用是什么?请举例说明。

三、答案选择题答案1.C(解析:Spark支持多种集群管理器,包括Standalone、YARN、Mesos等,并非只能运行在YARN上)2.A(解析:Driver负责接收用户作业、解析作业生成DAG、调度任务到Executor执行)简答题答案1.核心区别:计算模型:MapReduce采用"Map+Reduce"两阶段计算模型,中间结果需写入磁盘;Spark基于RDD,支持多阶段计算,中间结果可缓存在内存中,减少磁盘IO。性能:Spark内存计算大幅提升迭代计算和交互式查询的性能,比MapReduce快10-100倍。适用场景:MapReduce适用于简单批处理任务;Spark支持批处理、流处理(SparkStreaming)、机器学习(MLlib)、图计算(GraphX)等多种场景,通用性更强。任务调度:MapReduce任务调度粒度较粗;Spark支持更细粒度的任务调度和DAG优化。2.惰性求值定义:Spark中的转换操作(Transformation)不会立即执行,而是先记录对RDD的一系列操作逻辑,直到遇到行动操作(Action)时,才会触发真正的计算,将所有操作形成的DAG提交给集群执行。作用:减少网络传输和磁盘IO:通过合并多个转换操作,优化执行计划(如流水线优化),减少中间结果的落地和传输。提升计算效率:允许Spark对整个任务流程进行全局优化,选择最优的执行路径。示例:scala

//以下转换操作不会立即执行

valrdd1=sc.textFile("data.txt")

valrdd2=rdd1.flatMap(_.split(""))

valrdd3=rdd2.map((_,1))

valrdd4=rdd3.reduceByKey(_+_)

//遇到行动操作collect(),才触发所有之前的转换操作执行

rdd4.collect()第二章RDD编程基础一、选择题下列属于SparkRDD行动操作(Action)的是()

A.map

B.filter

C.count

D.reduceByKey

关于RDD的持久化(Persist),下列说法正确的是()

A.持久化后的RDD会被永久存储,无法释放

B.cache()方法等价于persist(StorageLevel.MEMORY_ONLY)

C.持久化只能将RDD存储在内存中

D.每次访问持久化的RDD都会重新计算

二、编程题给定一个文本文件data.txt,其中每行是一个字符串,要求使用RDD操作统计文件中每个单词出现的次数(词频统计),并将结果按词频降序排列。

已知RDD[String]rawLogs存储了日志数据,日志格式为"时间级别消息"(如"2025-12-24INFO系统启动成功")。请筛选出级别为"ERROR"的日志,并提取其中的消息内容,最终返回包含所有错误消息的数组。

假设有RDD[Person]people,其中Person是样例类caseclassPerson(age:Int,name:String)。要求:①过滤掉年龄小于18岁的用户;②统计剩余用户中各年龄组的人数,年龄组划分:18-25、26-35、36-45、46-65、66+。

三、答案选择题答案1.C(解析:map、filter、reduceByKey均为转换操作,count为行动操作,用于统计RDD元素个数)2.B(解析:A选项,持久化的RDD可通过unpersist()方法释放;C选项,持久化支持内存、磁盘、内存+磁盘等多种存储级别;D选项,持久化后RDD数据被缓存,再次访问无需重新计算)编程题答案1.词频统计及降序排列(Scala版本):scala

//读取文本文件生成RDD

valrdd=sc.textFile("data.txt")

//词频统计:flatMap切分单词→map生成键值对→reduceByKey聚合→sortBy降序排列

valwordCount=rdd.flatMap(line=>line.split("\\s+"))//按任意空白字符切分

.map(word=>(word,1))

.reduceByKey(_+_)

.sortBy(_._2,ascending=false)//按词频降序

//行动操作,输出结果

wordCount.collect().foreach(println)解析:flatMap将每行文本拆分为多个单词(解决map拆分会产生嵌套集合的问题),reduceByKey按单词聚合计数,sortBy按词频(元组第二个元素)降序排列。2.筛选ERROR日志并提取消息(Scala版本):scala

//筛选级别为ERROR的日志,提取消息内容

valerrorMessages=rawLogs.filter(log=>log.split("")(1)=="ERROR")//按空格拆分,第二个字段为级别

.map(log=>log.split("",3)(2))//拆分3部分,第三部分为消息(支持消息含空格)

.collect()//转换为数组返回

//输出错误消息

errorMessages.foreach(println)解析:filter通过拆分日志字符串筛选ERROR级别,map使用split("",3)避免消息中的空格导致拆分错误,确保完整提取消息内容。3.年龄过滤及年龄组统计(Scala版本):scala

//定义样例类

caseclassPerson(age:Int,name:String)

//①过滤年龄≥18岁的用户;②定义年龄组映射函数;③统计各年龄组人数

valageGroupCount=people.filter(_.age>=18)

.map(person=>{

valage=person.age

//映射到对应的年龄组

valageGroup=agematch{

caseaifa>=18&&a<=25=>"18-25"

caseaifa>=26&&a<=35=>"26-35"

caseaifa>=36&&a<=45=>"36-45"

caseaifa>=46&&a<=65=>"46-65"

caseaifa>=66=>"66+"

}

(ageGroup,1)

})

.reduceByKey(_+_)

//输出结果

ageGroupCount.collect().foreach(println)解析:使用filter过滤未成年人,通过match表达式将年龄映射到对应组别,最后用reduceByKey聚合统计各年龄组人数。第三章DataFrame与SparkSQL一、简答题简述RDD与DataFrame的区别与联系。

什么是SparkSQL的元数据?元数据在DataFrame操作中有什么作用?

二、编程题已知有JSON文件user.json,包含用户数据(id:Int,name:String,age:Int,city:String)。请使用SparkSQL完成以下操作:①读取JSON文件生成DataFrame;②筛选出年龄在20-30岁之间的用户;③按城市分组统计各城市的用户数量;④将结果保存为Parquet文件。三、答案简答题答案1.区别与联系:联系:DataFrame底层基于RDD实现,是RDD的封装和优化,均可用于分布式数据处理。区别:数据结构:RDD是无结构化的分布式数据集,仅存储数据本身;DataFrame是结构化数据集,包含列名和数据类型(元数据),类似关系型数据库表。操作方式:RDD支持函数式编程(map、filter等);DataFrame支持SQL查询和DataFrameAPI(select、filter、groupBy等),更贴近传统数据处理方式。优化能力:DataFrame支持Catalyst优化器,可对查询计划进行优化;RDD需用户手动优化(如持久化、分区调整)。类型安全:RDD是类型安全的(编译时检查类型错误);DataFrame非类型安全(运行时才发现类型错误)。2.元数据定义:元数据是描述DataFrame数据结构的信息,包括列名、数据类型、列约束等。作用:支持结构化查询:基于元数据解析SQL语句,确定查询的列和数据类型,实现类似数据库的查询逻辑。优化查询计划:Catalyst优化器利用元数据分析数据分布和类型,生成最优的执行计划(如谓词下推、列裁剪)。数据交互:方便DataFrame与外部数据源(数据库、JSON、Parquet等)交互,自动匹配数据结构。编程题答案1.JSON文件读取与数据处理(Scala版本):scala

//1.创建SparkSession(DataFrame操作的入口)

importorg.apache.spark.sql.SparkSession

valspark=SparkSession.builder()

.appName("UserDataProcessing")

.master("local[*]")//本地模式,*表示使用所有可用CPU核心

.getOrCreate()

importspark.implicits._//导入隐式转换,支持RDD→DataFrame

//2.读取JSON文件生成DataFrame(自动推断schema)

valuserDF=spark.read.json("user.json")

//3.筛选年龄20-30岁的用户

valfilteredDF=userDF.filter($"age">=20&&$"age"<=30)//使用$符号引用列名,支持表达式

//4.按城市分组统计用户数量

valcityUserCountDF=filteredDF.groupBy("city")//按city列分组

.count()//统计每组数量,结果列名为count

.withColumnRenamed("count","user_num")//重命名列名为user_num

//5.保存为Parquet文件(默认压缩格式,列式存储)

cityUserCountDF.write

.mode("overwrite")//覆盖已存在的文件

.parquet("city_user_count.parquet")

//查看结果

cityUserCountDF.show()

//关闭SparkSession

spark.stop()解析:SparkSession是DataFrame和SparkSQL的核心入口,替代了旧版本的SQLContext和HiveContext。read.json()自动推断JSON文件的schema(元数据),无需手动定义。filter使用$符号引用列名,支持复杂的逻辑表达式;groupBy+count完成分组统计。Parquet是列式存储格式,支持压缩,比JSON更适合大规模数据存储和查询,write.mode("overwrite")确保重复运行时不会报错。第四章SparkStreaming编程一、简答题简述SparkStreaming的核心思想(微批处理)。

SparkStreaming中,如何保证数据的exactly-once语义?

二、编程题使用SparkStreaming接收TCP端口(如localhost:9999)发送的文本数据,实现实时词频统计,要求每10秒统计一次该窗口内的词频,并输出结果。

三、答案简答题答案1.微批处理核心思想:SparkStreaming将实时数据流按固定的时间间隔(批处理间隔)切分为一系列微小的批数据(Micro-Batch),然后将每个微批数据转换为RDD,通过SparkCore的批处理引擎进行处理,最终将结果批量输出。本质上是"准实时"处理,平衡了实时性和处理效率,批处理间隔可根据需求调整(如1秒、5秒、10秒)。2.Exactly-once语义保证:Exactly-once表示每个数据只被处理一次,即使出现节点故障也不会重复处理或丢失数据。实现方式:输入源可靠性:使用支持事务的输入源(如Kafka的offset管理、Flume的可靠传输),确保数据可回溯。状态管理:使用Checkpoint机制保存Streaming应用的状态(如批处理进度、RDD依赖),故障恢复时可从Checkpoint恢复状态,避免重复处理。输出端事务:输出结果时使用事务机制(如数据库事务、幂等操作),确保即使重复输出也不会产生错误结果(如幂等写入可多次执行但结果一致)。编程题答案1.实时词频统计(Scala版本):scala

//1.创建StreamingContext(批处理间隔10秒)

importorg.apache.spark.streaming.{StreamingContext,Seconds}

importorg.apache.spark.SparkConf

valconf=newSparkConf().setAppName("RealTimeWordCount").setMaster("local[2]")//至少2个核心,1个用于接收数据,1个用于处理

valssc=newStreamingContext(conf,Seconds(10))//10秒批处理间隔

//2.设置Checkpoint(可选,用于故障恢复)

ssc.checkpoint("checkpoint/wordcount")

//3.接收TCP端口数据(localhost:9999)

vallines=ssc.socketTextStream("localhost",9999)

//4.实时词频统计:切分单词→生成键值对→聚合统计

valwordCounts=lines.flatMap(_.split(""))

.map(word=>(word,1))

.reduceByKey(_+_)//每个微批内的词频聚合

//5.输出结果(打印到控制台)

wordCounts.print()

//6.启动Streaming应用

ssc.start()

//等待应用停止(手动停止或故障停止)

ssc.awaitTermination()

//停止应用(可选)

//ssc.stop()操作说明:运行前需在本地启动TCP服务(如使用nc命令:nc-lk9999),然后输入文本数据。setMaster("local[2]"):本地模式下需至少2个核心,因为StreamingContext需要1个核心专门接收数据,其余核心用于处理数据,核心不足会导致数据接收阻塞。socketTextStream:从TCP端口接收文本数据,每行作为一个数据记录。print():默认打印每个微批结果的前10条记录,可通过print(n)指定打印条数。第五章Spark项目实践(综合案例)案例:用户行为分析背景:现有用户行为日志数据(格式:user_id,action,timestamp,product_id),其中action包括"click"(点击)、"purchase"(购买)、"collect"(收藏)。要求使用Spark分析以下指标:每日各行为类型的总次数;每日购买转化率(购买次数/点击次数);top10热门商品(按点击次数排序)。一、解题思路数据读取与预处理:读取日志数据,解析字段,转换时间戳为日期格式;指标计算:

①按日期和行为类型分组统计次数;

②分别统计每日点击次数和购买次数,计算转化率;

③按商品分组统计点击次数,排序取前10;

结果输出:将分析结果保存为Parquet文件或写入数据库。二、代码实现(Scala版本)scala

importorg.apache.spark.sql.SparkSession

importorg.apache.spark.sql.functions._

importjava.util.Calendar

//1.创建SparkSession

valspark=SparkSession.builder()

.appName("UserBehaviorAnalysis")

.master("local[*]")

.getOrCreate()

importspark.implicits._

//2.读取日志数据(假设为CSV格式,无表头)

valbehaviorDF=spark.read

.option("header","false")

.option("inferSchema","true")

.csv("user_behavior.log")

.toDF("user_id","action","timestamp","product_id")//重命名列

//3.数据预处理:转换时间戳为日期(yyyy-MM-dd)

valdfWithDate=behaviorDF.withColumn("date",

from_unixtime(col("timestamp")/1000,"yyyy-MM-dd")//假设timestamp为毫秒级,转换为秒级后格式化

)

//4.指标1:每日各行为类型的总次数

valdailyActionCount=dfWithDate.groupBy("date","action")

.count()

.withColumnRenamed("count","action_count")

dailyActionCount.show()

dailyActionCount.write.mode("overwrite").parquet("daily_action_count.parquet")

//5.指标2:每日购买转化率(购买次数/点击次数)

//5.1统计每日点击次数和购买次数

valdailyClickCount=dfWithDate.filter(col("action")==="click")

.groupBy("date")

.count()

.withColumnRenamed("count","click_count")

valdailyPurchaseCount=dfWithDate.filter(col("action")==="purchase")

.groupBy("date")

.count()

.withColumnRenamed("count","purchase_count")

//5.2关联点击和购买数据,计算转化率

valdailyConversionRate=dailyClickCount.join(dailyPurchaseCount,"date","left")//左连接,确保有点击无购买时也能显示

.na.fill(0,Seq("purchase_count"))//无购买时填充为0

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论