Spark大数据分析与实战(第二版)项目6 Spark Streaming处理用户行为数据_第1页
Spark大数据分析与实战(第二版)项目6 Spark Streaming处理用户行为数据_第2页
Spark大数据分析与实战(第二版)项目6 Spark Streaming处理用户行为数据_第3页
Spark大数据分析与实战(第二版)项目6 Spark Streaming处理用户行为数据_第4页
Spark大数据分析与实战(第二版)项目6 Spark Streaming处理用户行为数据_第5页
已阅读5页,还剩34页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据分析与实战项目6SparkStreaming处理用户行为数据针对电商平台用户的浏览、收藏、购物车、下单等基本行为,借助成熟的SparkStreaming模块,模拟开展实时分析与处理,为电商平台运维提供参考。党的二十大作出加快建设网络强国、数字中国的重大部署。随着移动商务、舆情监控、传感监控、在线教育等领域的发展,对数据实时处理的需求日渐增强。情境导入Spark项目分解Spark序号任务任务说明1初探广告点击行为使用Necat模拟发出用户点击行为数据,SparkStreaming程序捕获数据,统计各个广告页面的点击量。2识别无效的广告点击识别无效点击,输出这些无效点击用户的ID。3统计一分钟内的订单数量统计1分钟内用户的下单数量。4电商用户的行为分析统计1分钟内用户下单、加购、收藏次数,并将下单行为保存到MySQL数据库表中。能编写无状态转换、基于窗口的有状态转换流式处理程序。了解SparkStreaming原理,能够读取Socket、File、Kafka等数据源创建DStream。能按照要求输出流式计算的结果(如输出到MySQL数据库中。123学习目标Spark项目6

SparkStreaming处理用户行为数据Spark任务1初探广告点击行为识别无效的广告点击统计1分钟内的订单数量任务2任务3电商用户的行为分析任务4任务分析Spark通常,电商平台首页均开设一定的广告服务,入驻商家可以购买广告位进行商品推广。在重点营销时段,需时刻关注广告位的费效。本任务要求每10秒钟,统计一次用户点击量,据此调整营销策略。认识流数据与SparkStreamingSpark数据分类:静态数据、动态数据(流数据)。静态数据是一段较长的时间内相对稳定的数据,一般采用批处理方式进行计算,可以在充裕的时间内对海量数据进行批量处理(即可以容忍较高的时间延迟)。流数据则是以大量、快速、时变的流形式持续到达,因此流数据是不断变化的数据;流数据是时间上无上限的数据集合,不能采用传统的批处理方式,必须实时计算。SparkStreaming的工作原理SparkSparkStreaming接收实时输入的数据流后,将数据流按时间片(通常为秒级)为单位,拆分为一个个小的批次数据;然后这些小批次的数据交给Spark引擎,以类似批处理的方式处理每个时间片数据。初步体验SparkSQLSpark将输入数据按照实际片段(例如1秒钟)分割成一段一段的离散数据流(称之为DStream,DiscretizedStream);每一个片段内的数据都会变成一个RDD,然后将DStream流处理操作转变为针对RDD的操作。编写第1个SparkStreaming程序Spark例子:利用Netcat工具向9999端口发送数据流(文本数据),使用SparkStreaming监听9999端口的数据流,并实时地进行词频统计(每10秒中,计算获取的单词数量)scala>importorg.apache.spark.streaming._//导入包scala>valssc=newStreamingContext(sc,Seconds(10))//创建上下文环境scala>vallines=ssc.socketTextStream("localhost",9999)//创建DStream,监听9999端口数据编写第1个SparkStreaming程序Sparkscala>valwords=lines.flatMap(x=>x.split(""))//按照空格切分,产生新DStreamscala>valpairs=words.map(x=>(x,1))//转换为(word,1)键值对形式scala>valwordsCounts=pairs.reduceByKey(_+_)//计算词频,注意:返回一个新的DStreamscala>wordsCounts.print()//使用DStream的print方法,打印结果scala>ssc.start()//启动上述计算逻辑,等待结束。编写第1个SparkStreaming程序Sparkhadoop@zsz-VirtualBox:~$nc-lk9999#启动Netcat,发送下面的文本数据:IlikeSparkSparkSparkisgoodIlikeSparkSparkispowerful相关知识小结SparkSparkStreaming原理:按照时间间隔,将流数据切分为片段,每个片段为一个RDD;按照RDD的处理方式进行处理。SparkStreaming编程:(1)读取数据源,创建DStream;(2)执行转换操作;(3)输出结果;(4)start开启流计算。根据知识储备的相关知识,读取Socket流数据,每10秒钟,统计一次用户点击量。任务实施项目6

SparkStreaming处理用户行为数据Spark任务1初探广告点击行为识别无效的广告点击统计1分钟内的订单数量任务2任务3电商用户的行为分析任务4任务分析Spark用户通过APP或者浏览器点击广告的过程中,可能出现误操作(例如短时间内连续点击);某些机构出于非法目的,通过机器人点击;这些均是无效的广告点击行为,需要加以去除。若某用户10秒内,点击3次以上的广告,则判定该用户的点击无效),输出这些无效点击用户的ID(每4秒更新1次)。DStream无状态转换操作SparkDStream无状态转换操作是指不记录历史状态信息,每次仅对新的批次数据进行处理;无状态转换操作每一个批次的数据处理都是独立的,处理当前批次数据时,即不依赖之前的数据,也不影响后续的数据。无状态转换示例Spark有一份黑名单文件blacklist.txt,记载了若干列入黑名单的IP地址,内容如下:355采用Netcat模拟用户访问所产生的数据流:用户每访问平台一次,在Netcat中输入一个IP地址。要求采用无状态转换方式,计算10秒内的平台有效访问次数(忽略黑名单IP的访问)无状态转换示例SparkvalblackIPs=sc.textFile(path)//读取本地黑名单文件,创建黑名单RDDvalssc=newStreamingContext(sc,Seconds(10))valIPs=ssc.socketTextStream("localhost",9999)//监听9999端口的数据valwhiteIPs=IPs.transform{rdd=>rdd.subtract(blackIPs)//点击数据(IP地址)中去掉黑名单RDD中的IP}valcount=whiteIPs.count()//统计剩余的IP地址数量count.print()//打印输出结果DStream有状态转换操作Spark有状态转换操作在处理当前批次的数据时,需要用到之前批次的数据或者中间计算结果;有状态转换包括基于滑动窗口的转换和updateStateByKey转换。有状态转换操作示例Spark示例:读取套接字流数据(9999端口),设置批次间隔1秒;窗口长度为3秒,滑动时间间隔为1秒,统计窗口内单词词频后打印输出。

vallinesDS=ssc.socketTextStream("localhost",9999)valwordsDS=linesDS.flatMap(x=>x.split(""))valkvDS=wordsDS.map(x=>(x,1))

//转换为(word,1)形式的键值对//使用reduceByKeyAndWindow方法,计算窗口内单词词频

val

windowWordCount=kvDS.reduceByKeyAndWindow(

(a:Int,b:Int)=>a+b,Seconds(3),Seconds(1))windowWordCount.print()无状态转换:不记录之前状态数据;有状态转换:需要用到之前批次的数据或者中间计算结果;重点学习窗口操作。Spark综合利用本任务中的知识储备,读取用户点击数据,识别出无效的点击行为,并输出相应的结果吧。任务实施相关知识小结Spark项目6

SparkStreaming处理用户行为数据Spark任务1初探广告点击行为识别无效的广告点击统计1分钟内的订单数量任务2任务3电商用户的行为分析任务4任务分析Spark除了监听socket端口方式获取流数据,SparkStreaming还可以读取文件、RDD队列及Kafka数据,生成DStream。获取Kafka中的用户行为数据(数据形式为“用户ID,行为类型”,行为类型”包括:①pv点击,②buy下单,③cart购物车,④fav收藏),统计1分钟内的用户下单数据。由文件流创建DStreamSparkSparkStreaming可以从HDFS文件系统目录、本地系统的文件目录读取数据到DStream中;一旦有目录中有文件加入,则获取文件中的内容,创建DStream。valssc=newStreamingContext(sc,Seconds(10))

valpath="hdfs://localhost:9000/user/hadoop/spark_streaming"

//创建DStream,监听hdfs相关目录vallines=ssc.textFileStream(path)lines.print()

RDD队列流创建DStreamSparkDStream可以看做离散的RDD序列,因此SparkStreaming可以读取RDD组成的数据队列;该方式较少使用。valinputDStream=ssc.queueStream(rddQueue)Kafka的原理SparkKafka是一个分布式、支持分区的(partition)、多副本的(replica)分布式消息系统,它可以实时的处理大量数据以满足多种需求场景,广泛应用于web日志、访问日志等领域。Kafka的安装与体验Spark解压(安装)Kafkasudotar-zxvf/home/hadoop/soft/kafka_2.12-3.6.0.tgz-C/usr/localKafka通常需要Zookeeper的支持,启动Zookeeper/usr/local/kafka/bin/zookeeper-server-start.shconfig/perties打开第二个Linux终端,输入以下命令,启动kafka服务/usr/local/kafkabin/kafka-server-start.shconfig/perties创建消息主题(Topic)/usr/local/kafkabin/kafka-topics.sh

--create--topicmytopic--bootstrap-serverlocalhost:9092Kafka的安装与体验Spark向Kafka主题发出消息/usr/local/kafkabin/kafka-console-producer.sh--broker-listlocalhost:9092--topicmytopic可以自行输入多行文本数据后,消费数据:/usr/local/kafkabin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmytpic--from-beginningKafka作为DStream数据源SparkvalkafkaParas=Map[String,String](//设置Kafka相关参数"bootstrap.servers"->"localhost:9092","key.deserializer"->"mon.serialization.StringDeserializer","value.deserializer"->"mon.serialization.StringDeserializer","group.id"->"use_a_separate_group_id_for_each_stream",)valtopics=Set("mytopic")//将读取的Kafka主题Topic写入到一个Set中Kafka作为DStream数据源SparkvalkafkaInputDS=KafkaUtils.createDirectStream(//创建DStreamssc,PreferConsistent,Subscribe[String,String](topics,kafkaParas))SparkStreaming可以从HDFS文件系统目录、本地系统的文件目录读取数据到DStream中。DStream可以看做离散的RDD序列,因此SparkStreaming可以读取RDD组成的数据队列。SparkStreaming还支持Kafka、Flume等高级数据源。Spark综合利用本任务中的知识储备,完成规定指标的统计分析。任务实施相关知识小结Spark项目6

SparkStreaming处理用户行为数据Spark任务1初探广告点击行为识别无效的广告点击统计1分钟内的订单数量任务2任务3电商用户的行为分析任务4任务分析Spark某电商平台用户行为数据集,包含了十万随机用户的行为数据。定时读取用户行为文件中的行为数据,写入到Kafka的某主题下,SparkStream获取该主题的数据后进行处理,统计1分钟内的各种用户行为(10秒更新一次),过滤出购买行为后写入MySQL数据库。DStream数据保存到文件Spark除了print算子,DStream有若干输出算子;相关说明如下:DStream数据保存到文件示

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论