版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
大数据分析与实战项目5SparkSQL处理健康监控数据现有一组透析治疗患者的健康监测数据,要求针对性开展数据分析,为病患管理、健康监控提供信息支持。健康医疗是广大群众关心的热点问题,也是关系到社会稳定、国家发展的重大课题。情境导入Spark项目分解Spark序号任务任务说明1认识SparkSQL及其数据结构认识新的数据结构DataFrame、DataSet,在SparkShell下查看血透病患的健康监测数据。2由健康监测数据创建DataFrame根据血液透析患者的健康监测数据集,创建DataFrame,并打印相关信息。3DSL方式分析健康监测数据分析以下指标:(1)透析10年以上患者的信息;(2)分组统计各基础疾病患者的数量;(3)该组数据中,血压偏高者的占比。4SQL方式分析健康监测数据分析以下指标:(1)打印年龄在70岁(含)以上、有“慢性肾小球炎”基础病的患者信息;(2)该组数据中,BMI偏高(肥胖)、BMI偏低(消瘦)人员的数量。5将梳理后的数据写入MySQL数据库表中抽取“慢性肾小球炎”患者的部分数据,写入到MySQL数据库表中。能使用DSL风格或SQL风格编程,完成DataFrame数据的分析。能否根据各种数据源,创建DataFrame。能将DataFrame数据分析的结果写入数据库或文件中。123学习目标Spark项目5
SparkSQL处理健康监控数据Spark任务1认识SparkSQL及其数据抽象查看健康监测数据DSL方式分析健康监测数据任务2任务3SQL方式分析健康监测数据任务4DataFrame数据写入MySQL任务5任务分析Spark根据内存数据,创建数据集DataFrame/Dataset,并查看其中的数据。SparkSQL的产生Spark产生根源:(1)MapReduce编写程序比较复杂,需要大量学习;(2)Hive虽然支持类似SQL的查询,但它运行在MapReduce之上,运算效率低。伯克利实验室开发了基于Hive的结构化数据处理组件Shark(SparkSQL的前身),但Shark继承了大量的Hive代码,后续优化、维护较为麻烦。。SparkSQL是Spark体系中专门处理结构化数据(StructuredData)的模块,目前使用DataFrame、Dataset作为数据抽象。Spark的数据抽象SparkDataset中的数据形态下图反映了RDD、DataFrame、Dataset的区别。注意:从Spark2.X开始,DataSet、DataFrame的API方法已做了统一,极大减轻了学习负担。初步体验SparkSQLSpark在SparkRDD学习的过程中可知,SparkContext(sc)是RDD开发的程序入口(上下文环境);而SparkSQL也有自己的入口SparkSession。初步体验SparkSQLSparkSparkSession提供了createDataFrame方法,它可以将内存中的有序集合构造成DataFrame;scala>valdata=List(("Tom","male",20),("Jerry","male",18))
//定义一个列表scala>valdf=spark.createDataFrame(data)//根据data生成DataFramescala>df.show()//显示DataFrame中的数据初步体验SparkSQLSparkscala>valdf=spark.createDataFrame(data).toDF("name","gender","age")//创建DataFrame,并为各列命名scala>df.show(2)//显示df的前2行相关知识小结SparkSparkSQL专门处理结构化数据,数据抽象为DataFrame、Dataset。createDataFrame方法可以创建DataFrame对象;show方法显示DataFrame中的数据;toDF方法,修改DataFrame列的名称。根据知识储备的相关知识,利用内存数据(患者信息)创建DataFrame,并查看其中的内容。任务实施项目5
SparkSQL处理健康监控数据Spark任务1认识SparkSQL及其数据抽象查看健康监测数据DSL方式分析健康监测数据任务2任务3SQL方式分析健康监测数据任务4DataFrame数据写入MySQL任务5任务分析Spark要想借助SparkSQL模块开展数据分析,首先需要有DataFrame。在IDEA下,根据血液透析患者的健康监测数据集healthdata.csv,创建DataFrame,填充“基础病”字段(列)的缺失值,并打印前5行数据。由数据文件创建RDDSpark通过createDataFrame方法创建DataFrame通常应用于小规模实验(学习),实际业务中很多数据是存储在JSON、CSV、Parquet等文件中,SparkSQL提供了read方法,可以读取这些文件,并生成DataFrame。例如:JSON文件scala>valstudent=spark.read.json(filePath)//生成DataFramescala>valstudent=spark.read.format("json").load(filePath)//与上一行代码等效由数据文件创建RDDSpark逗号分隔值(Comma-SeparatedValues,CSV)是以纯文本形式存储数据的文件,其数据之间通常用逗号分割(也可以是其他字符),是数据交换、处理领域常见的格式。现有一个CSV文件student.csv,记录了若干学生的信息:由数据文件创建RDDSpark读取CSV文件创建DataFrame时,需要通过option方法指定header、sep等信息。scala>valstudent=spark.read.option("header","true").option("sep",",").csv(path)scala>valstudent=spark.read.format(“csv”).option("header",true).option("sep",",").load(path)scala>student.show()查看DataFrame中的数据SparkRDD经常用take、collect、first等行动操作查看数据,DataFrame同样提供了若干类似的行动操作方法。重复值、缺失值的处理SparkSparkSQL中,经常用na方法处置空值,其返回值类型为DataFrameNaFunctions;而DataFrameNaFunctions有drop、fill等方法具体处理空值。scala>studentDF2.na.drop()//删除缺失值scala>studentDF2.na.drop(Array("gender","score"))//删除gender/score缺失值scala>studentDF2.na.fill(Map(("gender","male"),("score",0)))//填充缺失值scala>studentDF2.distinct()//删除重复行DataFrame数据保存到文件中SparkSparkSQL能够读取各种文件生成DataFrame,同样DataFrame也可以根据需要输出到JSON、CSV等文件中。scala>student.write.format("json").save(path+"/json")//保存到json文件中scala>student.write.option("header","true").format("csv").save(path+"/csv")//保存到csv文件中SparkSQL提供了read方法,读取各种文件,生成DataFrame;查看DataFrame数据:show、first、head、collect等;缺失值、重复值:na、fill、drop、distinct等;save方法将DataFrame数据保存到文件中。Spark综合利用本任务中的知识储备,读取患者数据csv文件,完成缺失值的处理,并查看其中的数据。任务实施相关知识小结Spark项目5
SparkSQL处理健康监控数据Spark任务1认识SparkSQL及其数据抽象查看健康监测数据DSL方式分析健康监测数据任务2任务3SQL方式分析健康监测数据任务4DataFrame数据写入MySQL任务5任务分析SparkSparkSQL进行数据分析,包含DSL(DomainSpecificLanguage)和SQL两种语法风格;其中DSL方式提供了上百个方法(类似于RDD中的操作、算子),允许开发者调用这些方法完成数据的处理。针对给定的数据集,采用DSL方式,使用DataFrame常见方法分析以下指标:(1)查找透析10年以上患者的信息;(2)分组统计各基础疾病患者的数量;(3)计算该组患者中,血压偏高者的占比。数据的查询与筛选Spark在数据分析过程中,有时我们仅需要查询某些列的值,可以借助select、selectExp等方法。例如,查询studentDF中的name、age、score等3列,下列写法等效:scala>valdf=studentDF.select("name","age","score").show(2)scala>valdf=studentDF.select(col("name"),col("age"),col("score")).show(2)
scala>valdf=studentDF.select($"name,$"age",$"score").show(2)
scala>valdf=studentDF.select(‘name,‘age,‘score).show(2)
数据的查询与筛选Spark实际业务中,经常需要根据某些条件过滤出某些行;SparkSQL提供了where与filter方法,要用于筛选出符合某条件的行,两者用法一致。scala>valdf=studentDF.where("age>20")scala>valdf=
studentDF.where(col("age")>20)scala>valdf=
studentDF.where($"age">20and$"score">90)
数据的查询与筛选Spark实际业务中,经常需要根据某些条件过滤出某些行;SparkSQL提供了where与filter方法,要用于筛选出符合某条件的行,两者用法一致。scala>valdf=studentDF.where("age>20")scala>valdf=
studentDF.where(col("age")>20)scala>valdf=
studentDF.where($"age">20and$"score">90)
数据的排序Spark输出查询结果前,经常要进行排序,从而输出前N行;SparkSQL提供了orderBy、sort方法用于排序,二者等效、用法一致。scala>valdf=studentDF.orderBy("score")
//按照score升序排列,得到新DFscala>studentDF.orderBy(col("score").desc)//按照score列的值降序排列DataFrame的连接join操作Sparkjoin操作用于连接两个DataFrame组成一个新的DataFrame。scala>val
df1=spark.createDataFrame(List(("Tom",20),("Jerry",18),("Bob",19))).toDF("name","age")scala>valdf2=spark.createDataFrame(List(("Tom",176),("Jerry",182))).toDF("name","height")scala>df1.join(df2,"name",
"inner"
).show()//df1与df2做join操作,以name为连接字段scala>df1.join(df2,df1("name")===df2("name")).show()//与上一行等价,注意用三个等号与数据库表的连接一致,DF也支持left(左)、right(右)、full(全)连接;只需替换join操作中的第3个参数字符串即可。聚合与分组统计Spark在开展数据分析的过程中,经常用到各种聚合操作,比如求某列的和、平均值、最大值等,SparkSQL提供了相应的方法。聚合与分组统计Sparkscala>studentDF.select(sum("score").alias("totalScore"),
avg("score").alias("avgScore")).show()聚合与分组统计Spark在数据统计的过程中,经常需要分组统计;groupBy方法可以按照某个字段分组;groupBy通常与count、mean、max、min、sum等聚合操作联合使用,从而完成各种分组统计任务scala>studentDF.groupBy("gender").agg(max("score"),min("score")).show()操作DataFrame的列SparkwithColumnRenamed重命名指定列(字段)的名称。studentDF.withColumnRenamed("gender","sex")//gender列改名为sexwihtColumn方法可在当前DataFrame中增加一列。studentDF.withColumn("age+1",$"age"+1)//增加一个新列,其值为age+1查询数据:select、where、filter等;DataFrame排序:sort或orderBy;join方法连接两个DataFrame;统计分析方法:groupBy、agg、max、min、avg等Spark综合利用本任务中的知识储备,借助DataFrame的各种操作,完成指标的统计分析。任务实施相关知识小结Spark项目5
SparkSQL处理健康监控数据Spark任务1认识SparkSQL及其数据抽象查看健康监测数据DSL方式分析健康监测数据任务2任务3SQL方式分析健康监测数据任务4DataFrame数据写入MySQL任务5任务分析Spark除了DSL方式,SparkSQL还提供了通过SQL语句方式处理数据。编写SQL风格的程序:(1)打印年龄在70岁(含)以上、有“慢性肾小球炎”基础病的患者信息;(2)计算BMI,统计偏高(肥胖)、BMI偏低(消瘦)人员的数量。创建临时视图Spark在数据库中,使用SQL语句进行查询分析,需要有数据库表(或者视图)。用SQL方式处理DataFrame数据前,也需要将DataFrame对象注册为一个临时视图(可以理解为虚拟表)。scala>studentDF.createTempView("student")//创建会话临时视图scala>studentDF.createOrReplaceTempView("student")//创建或者替换临时视图scala>studentDF.createGlobalTempView("student")//创建全局临时视图scala>studentDF.createOrReplaceGlobalTempView("student")//创建或者替换全局临时视图使用SQL风格查询分析Spark临时视图相当于数据库中的表,临时视图的名字即相当于表的名字,在select子句中用临时视图名代替表名。scala>valsqlStr="selectname,age,genderfromstudentwhereage>20andgender='male'"//SQL语句scala>valtempDF=spark.sql(sqlStr)//执行SQL语句,返回一个DataFrame用户自定义函数UDFSpark不少场景是内置函数无法胜任的(或实现较为复杂),这就需要开发人员编写自定义函数(UserDefineFunction,简称UDF)来满足需求。//定义一个匿名函数,并将其注册为自定义函数(adjustAge)scala>spark.udf.register("adjustAge",(age:Int)=>if(age<18)"minor"else"adult")//在SQL语句中使用UDF函数adjustAgescala>valsqlStr="selectname,age,adjustAge(age)fromstudent"scala>spark.sql(sqlStr).show()//查询、并显示结果SQL方式:将DataFrame注册为临时视图,书写普通的SQL语句,然后执行;当SQL内置函数不能满足需求时,书写用户自定义函数。Spark综合利用本任务中的知识储备,利用SQL方式,完成相关指标的统计分析。任务实施相关知识小结Spark项目5
SparkSQL处理健康监控数据Spark任务1认识SparkSQL及其数据抽象查看健康监测数据DSL方式分析健康监测数据任务2任务3SQL方式分析健康监测数据任务4DataFrame数据写入MySQL任务5任务分析Spark借助JDBC连接器,SparkSQL可以实现与MySQL等数据库的互联互通,数据处理后的结果也可以写入到MySQL中。计算每位患者的透析脱水量(透析前的体重grossweight减去体重weight,是透析治疗特别关注的指标),按照脱水量排序,取Top5的病人信息,写入到MySQL数据库的表health中。MySQL相关准备工作Spark使用“sudoapt-getinstallmysql-server”命令,完成MySQL的安装。alteruser'root'@'localhost'identifiedwithmysql_native_passwordby'123';#修改root密码flushprivileges;#刷新权限createdatabasesparkTest;#创建数据库sparkTestusesparkTest;#使用数据库sparkTestcreatetablestudent(idchar(10),namechar(20),sexchar(10),ageint(4),addresschar(50));#创建表student读取MySQL创建DataFrameSpark使用spark.read.format("jdbc")可以读取数据库表中的数据。scala>valstudentDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver")//指定连接驱动名称.option("url","jdbc:mysql://localhost:3306/sparkTest")//指定服务器.option("dbtable","student")//指定数据库名称.option("user","root")//指定连接的用户.option("password","123")//指定用户的密码.load()DataFrame数据写入MySQLSparkDataFrame
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 生物可吸收支架在糖尿病冠心病中的研究进展
- 生物制品稳定性试验pH值变化监测
- 生物制剂临床试验中受试者招募策略优化
- 生活质量核心指标的多学科干预策略
- 网络管理员IT运维考试题含答案
- 保险公司定损员面试题库专业评估与鉴定能力
- 深度解析(2026)《GBT 19441-2004进出境禽鸟及其产品高致病性禽流感检疫规范》
- 阿里巴教育科技岗位面试题集及答案
- 供应链风险预警系统实施与优化面试题
- 安全生产知识考试题库及答案解析
- 低温烫伤预防
- 2024-2025学年广东省深圳实验学校初中部九年级上学期开学考英语试题及答案
- 【MOOC】行为金融学-中央财经大学 中国大学慕课MOOC答案
- 电路分析与应用知到智慧树章节测试课后答案2024年秋吉林电子信息职业技术学院
- 管理经济学:理论与案例 毛蕴诗第2版 每章习题答案
- (高清版)WST 415-2024 无室间质量评价时的临床检验质量评价
- 国开(河北)2024年《中外政治思想史》形成性考核1-4答案
- MOOC 微型计算机原理与接口技术-南京邮电大学 中国大学慕课答案
- 有限空间安全检查档案(含检查记录、台账)全套
- 应急预案-光伏
- 科来网络回溯分析系统深圳超算测试报告
评论
0/150
提交评论