版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
大数据分析与实战项目7基于StructuredStreaming的智慧交通数据处理某智慧交通建设项目中,针对实时采集的卡口监控数据,决定采用Spark新一代流计算引擎StructuredStreaming,完成数据的实时处理。利用物联网、大数据等技术,解决“安全监管压力大、交通拥堵整治难、应急事件处置慢、分析研判不智能”等痛点,成为行业应用热点。情境导入Spark项目分解Spark序号任务任务说明1统计正常工作的监控设备数了解StructuredStreaming的工作原理,编写简单的流计算程序统计各卡口正常状态的监控设备数。2找出超速通过卡口的车辆采用Kafka作为数据源,采集车辆通过卡口时的车速,与本卡口的最高限速进行比对,找出超速通过的车辆。3统计车辆的平均车速计算各卡口10分钟内的平均通过速度(每2分钟更新一次),要求考虑延迟到达的数据及重复数据。4数据处理结果写入MySQL为了便于后续的处理分析,将StructuredStreaming处理完毕的数据写入到MySQL数据库中。能够利用DataFrame的相关操作,完成流数据的处理。了解StructuredStreaming原理,根据Kafka等数据源,创建流式DataFrame。使用writestream等方法,将处理后的数据输出到File、Kafka及数据库。123学习目标Spark项目7
基于StructuredStreaming的智慧交通数据处理Spark任务1统计正常工作的监控设备数找出超速通过卡口的车辆统计车辆通过的平均速度任务2任务3数据处理的结果写入MySQL任务4任务分析Spark每个交通卡口均包含若干监控摄像头,这些摄像头需要定时发送设备状态信息到后台;使用Netcat工具发出设备状态数据(数据样式为“卡口ID,监控设备ID,状态码”,如状态码为100,则表示设备正常,其他状态码均为异常),要求使用StructuredStreaming计算每个卡口正常工作的监控设备数量。SparkStreaming的不足Spark(1)用批处理的思想处理流式数据,延迟高,不能做的真正的实时。(2)API基于底层RDD,不直接支持简单的SQL。(3)以数据处理时间基准,难以支持EventTime(事件发生的时间,简称事件时间)。(4)批处理、流处理的API不一致。StructuredStreaming编程模型Spark核心思想是将流数据视为一张可以不断添加数据的表(UnboundedTable,可以“无限”扩充的无界表),每个新到达的流数据会被添加到这个表中(作为表的新行)。StructuredStreaming编程模型Spark基于StructuredStreaming的WordCount计算演示:编写第一个StructuredStreaming程序Spark确定输入源,创建流式DataFrame(无界表)vallines=spark.readStream.format("socket")
.option("host","localhost").option("port",9999).load()定义流计算的处理过程vallinesDS=lines.as[String]//转为Dataset[String],便于后续处理valwords=linesDS.flatMap(x=>x.split(""))//包含一个名为value的列valwordCounts=words.groupBy("value").count()//分组、统计启动流式计算valquery=wordCounts.writeStream
.outputMode("complete").format("console").start()IDEA下编写结构化流处理程序Sparkprom.xml中添加如下依赖(SparkSQL、SparkCore)代码中导入隐式转换:importspark.implicits._代码最后,添加等待流计算结束的语句:query.awaitTermination()相关知识小结SparkStructuredStreaming的思路:新数据到达添加到无界表中,对新数据进行处理(按照DataFrame的方式),处理完毕后添加到Result结果中。readStream读取流式数据,创建流式DataFrame(无界表)。根据知识储备的相关知识,读取Socket模拟的交通流数据,统计各卡口正常的设备数量。任务实施项目7
基于StructuredStreaming的智慧交通数据处理Spark任务1统计正常工作的监控设备数找出超速通过卡口的车辆统计车辆通过的平均速度任务2任务3数据处理的结果写入MySQL任务4任务分析Spark使用Kafka作为数据源,向Kafka某主题中添加数据(数据样式为“卡口ID,监控设备ID,车牌号,通过速度,车辆类型”);每个卡口都有自己的限速,如果车辆通过卡口时速度超过限速,则输出相关信息。由文件生成StructuredStreamingSpark在流数据处理应用中,经常出现这样场景:Flume(或Sqoop)不断地将小文件(数据文件、服务器日志等)上传到HDFS目录,我们需要监控该目录,对目录下的小文件开展实时处理。vallines=spark.readStream.format("csv")//csv、json等数据格式.load(path)//文件的路径
lines就是一个DataFrame,后续按照SparkSQL中的方式进行处理即可。由Kafka生成StructuredStreamingSparkKafka是流式数据处理的最主要数据源,StructuredStreaming同样提供了订阅Kafka主题、消费其数据的能力。valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092")//指定kafka服务器.option("subscribe","structuredStreaming")//指定读取的kafka主题.load()Kafka数据以key-value形式存储,获取其中value即可(转为字符串类型)
vallines=df.selectExpr("CAST(valueASSTRING)")
StructuredStreaming的操作SparkStructuredStreaming的数据抽象为DataFrame/Dataset,与SparkSQL一致;因此,SparkSQL中的大多数操作亦适用于StructuredStreaming;可以在StructuredStreaming的流式DataFrame中使用select、where、filter等检查查询操作,也可以使用groupBy、count、sum等聚合操作。输出模式的选择SparkStructuredStreaming处理后的结果,可以有以下3中输出模式:(1)append模式:StructuredStreaming默认的输出模式;该模式下,只有上一次计算结束后,结果集(result)中新增加的行才会被输出;该模式不能包含聚合操作。(2)complete模式:该模式下,输出当前批次为止所有的统计结果;该模式下的计算过程,必须包含聚合操作。(3)update模式:处理完一个批次的数据后,只输出与之前批次相比变动的内容(新增或者更新);如果计算过程没有聚合操作,该模式与append等效。StructuredStreaming可以读取File、Kafka等数据源,生成流式DataFrame;数据抽象为DataFrame/Dataset,按照SparkSQL中的方式进行处理;输出模式:append、complete、updateSpark读取Kafka数据,结合每个卡口的限速,输出超速车辆的信息。任务实施相关知识小结Spark项目7
基于StructuredStreaming的智慧交通数据处理Spark任务1统计正常工作的监控设备数找出超速通过卡口的车辆统计车辆通过的平均速度任务2任务3数据处理的结果写入MySQL任务4任务分析Spark智慧城市交通建设中需要及时了解各监控卡口的平均车速(单位时间内通过该卡口的所有车辆平均时速),从而合理规划交通设施、安排疏导力量。通过Socket采集交通监控数据(数据样式:“时间戳,卡口ID,设备ID,车牌号,速度”),计算各卡口10分钟内的平均通过速度(每2分钟更新一次),要求考虑延迟到达的数据及重复数据。基于窗口的聚合Spark但很多时候,希望以事件时间为标准,开展数据的统计分析;例如因为网络延迟等因素,某些物联网设备产生事件(数据)的时间要早于Spark接收到该事件(数据)的时间,但我们还是希望能够根据事件时间来处理数据。StructuredStreaming提供了window窗口操作,它可以依据事件时间,将流式数据放置到非重叠的“桶”中;如果要开展聚合操作,则是对桶内的数据执行聚合。
基于窗口的聚合Spark案例:每辆汽车通过卡口时都会产生带有时间戳的数据(样式:“时间戳,卡口ID,监控设备ID,车牌号,车速”),要求统计10分钟内通过的车辆总数。valvehicles=lines.as[String]//lines为从socket端口读取的数据,为DataFrame.map(x=>{valarr=x.split(",")//按照逗号进行切割valdate=sdf.parse(arr(0))//安装设定的格式,将字符串转为Datevalts=newTimestamp(date.getTime())//data转换时间戳,表示事件时间(ts,arr(1),arr(2),arr(3),arr(4))//转为(时间戳,卡口ID,监控设备ID,车牌号,车速)}).toDF("timestamp","checkID","equipmentID","carNO","speed")基于窗口的聚合Spark案例:每辆汽车通过卡口时都会产生带有时间戳的数据(样式:“时间戳,卡口ID,监控设备ID,车牌号,车速”),要求统计10分钟内通过的车辆总数。valvehiclesCounts=vehicles.groupBy(window($"timestamp","10minutes","2minutes"))//按照窗口分组.count()//计算每组内的元素数量(即为车辆数)valresult=vehiclesCounts.select("window.start","window.end","count")//抽取3列.orderBy("start")//按照开始时间排序迟到数据与水印Spark由于网络延迟等因素,采用事件时间方式处理流式数据均面临数据迟到问题。Spark内部可以保留中间状态,从而不丢弃迟到数据;但如果一个流式计算运行时间较长(几天、几周甚至更长),系统内各中间状态积累数据量便会持续增加,导致内存等资源不断被占用,系统稳定性下降为了及时释放资源,Spark允许用户通过WarterMark水印的方式来决定保留多长时间的旧数据状态,即决定最大延迟阈值。迟到数据与水印Spark案例:每辆汽车通过卡口时都会产生带有时间戳的数据(样式:“时间戳,卡口ID,监控设备ID,车牌号,车速”),要求统计10分钟内通过的车辆总数。valvehiclesCounts=vehicles.withWatermark("timestamp","2minutes")//添加水印,最大延迟2分钟.groupBy(window($"timestamp","10minutes","2minutes")).count()重复数据的处理Spark可能某些因素导致数据源多次发送相同的数据,或者因为传输链路等因素同一条数据多次到达,这就需要数据去重操作。valvehiclesCounts=vehicles.withWatermark("timestamp","2minutes").dropDuplicates()//完成去重.groupBy(window($"timestamp","10minutes","2minutes")).count()groupBy(window())可以实现基于窗口的聚合操作;采用水印方式处理迟到数据dropDuplicate方法去除重复数据;Spark综合利用本任务中的知识储备,借助窗口、水印,计算各卡口10分钟内的平均通过速度。任务实施相关知识小结Spark项目7
基于StructuredStreaming的智慧交通数据处理Spark任务1统计正常工作的监控设备数找出超速通过卡口的车辆统计车辆通过的平均速度任务2任务3数据处理的结果写入MySQL任务4任务分析Spark读取Socket数据源(9999端口),找出超速车辆(假设城区限速70公里/小时),将超速通过的车辆信息写入到MySQL数据库及本地文件系统(json格式)。FileSink输出数据SparkStructuredStreaming计算结果可以按照CSV、JSON、Parquet等形式,写入本地或HDFS目录中。例如针对卡口的车辆通行信息,将“时间戳、卡口ID、监控设备ID、车牌号、车速”以JSON文件形式保存到本地目录。vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()FileSink输出数据Sparkvalvehicles=lines.as[String]//DataFrame转为Dataset.map(x=>{valarr=x.split(",")//String元素按照逗号切割(arr(0),arr(1),arr(2),arr(3),arr(4).toInt)//生成元组}).toDF("timestamp","checkID","equipmentID","carNO","speed")FileSink输出数据Sparkvalquery=vehicles.writeStream.outputMode("append")//写入文件时,仅支持append模式.format("json")//文件的格式为json.option("path","file:///home/hadoop/data/filesink")//文件保存的目录.option("checkpointLocation","streamingsink")//设置检查点.start()//启动流式计算KafkaSink输出数据SparkStructuredStreaming清洗处理完毕后,也可以写入到Kafka主题中,以供其他程序(系统)使用。valquery=vehicles.writeStream.outputMode("append")
.format("kafka")//写入kafka
.option("kafka.bootstrap.servers","localhost:9092")//设置kafka服务器
.option("topic","kafkasink")//指定kafka主题.option("checkpointLocation","streamingsink")//指定检查点路径.start()foreachBatch、foreach输出数据SparkforeachBatch、foreach具有高度的灵活性,用户可以自由决定数据写入何处、如何写;两者应用场景略有不同,foreachBatch以微批为单位进行任意的处理、输出;foreach则是作用在流式DataFrame的每一行上。valquery=vehicles.writeStream.
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- Fominoben-生命科学试剂-MCE
- 护理伦理与法规
- 2026年洛阳市孟津县小浪底镇政府招聘10人易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河南鹤壁山城区招考拟聘用人员易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河南省直事业单位招考(1324人)易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河南焦作市中站区招聘事业单位人员人员(第三批)易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河南平顶山新华区事业单位招聘易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河南信阳罗山县第二批招聘事业单位人员149人笔试易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河北邢台市内丘县招聘事业单位工作人员103人易考易错模拟试题(共500题)试卷后附参考答案
- 2026年河北省海兴县第二次事业单位招考易考易错模拟试题(共500题)试卷后附参考答案
- 浙江省A9协作体2025-2026学年高二上学期开学联考语文试卷
- 急危重症患者病情评估与分诊
- 镇静药物的使用及注意事项
- 急救常识科普
- 用户运营考试题及答案
- 初一作文成长经历8篇范文
- 摆脱青春烦恼班会课件
- 青浦区2024-2025学年六年级下学期期末考试数学试卷及答案(上海新教材沪教版)
- 2025版心肺复苏培训课件
- 华辰芯光半导体有限公司光通讯和激光雷达激光芯片FAB量产线建设项目环评资料环境影响
- 医学翻眼睑操作规范教学
评论
0/150
提交评论