Spark编程基础(Scala版第2版)课件 第7章 Spark Streaming_第1页
Spark编程基础(Scala版第2版)课件 第7章 Spark Streaming_第2页
Spark编程基础(Scala版第2版)课件 第7章 Spark Streaming_第3页
Spark编程基础(Scala版第2版)课件 第7章 Spark Streaming_第4页
Spark编程基础(Scala版第2版)课件 第7章 Spark Streaming_第5页
已阅读5页,还剩223页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

·SparkStreaming第7章目

录01流计算概述02SparkStreaming03DStream操作概述04基本输入源05高级数据源06转换操作07输出操作流计算概述7.1

流计算概述静态数据和流数据批量计算和实时计算流计算概念流计算框架目录CONTENT流计算处理流程7.1.1静态数据和流数据静态数据流数据7.1.1静态数据和流数据静态数据数据仓库7.1.1静态数据和流数据流数据应用越来越多7.1.1静态数据和流数据网络监控Web应用传感监测7.1.1静态数据和流数据每时每刻都在采集数据对数据进行实时的分析摄像头视频7.1.1静态数据和流数据大量快速时变持续到达流数据7.1.1静态数据和流数据PM2.5检测电子商务网站用户点击流7.1.1静态数据和流数据数据快速持续到达

潜在大小也许无穷无尽数据来源众多,格式复杂数据量大注重数据的整体价值数据顺序颠倒,或不完整流数据特征7.1.1静态数据和流数据静态数据流数据两种典型数据7.1.1静态数据和流数据实时计算处理逻辑静态数据价值丢弃流数据(动态数据)价值处理逻辑批量计算7.1.2

批量计算和实时计算MapReduce无法满足秒级响应处理大规模静态数据处理逻辑静态数据价值批量计算7.1.2

批量计算和实时计算实时获取不同数据源的海量数据获得有价值的信息经过实时分析处理流计算7.1.2

批量计算和实时计算数据采集实时分析处理结果反馈7.1.2

批量计算和实时计算数据的价值时间的流逝流计算基本理念数据过了时间点就没有价值7.1.3流计算概念流计算基本理念在互联网应用产品中尤其典型7.1.3流计算概念用户点击流7.1.3流计算概念您可能感兴趣7.1.3流计算概念您感兴趣的过了十几分钟用户离开网站7.1.3流计算概念数据的价值时间的流逝7.1.3流计算概念低延迟高可靠处理引擎可扩展7.1.3流计算概念高性能每秒处理几十万条数据流计算框架7.1.3流计算概念海量式支持TB级、PB级的数据规模流计算框架7.1.3流计算概念实时性低延迟,达到秒级别、毫秒级别流计算框架7.1.3流计算概念分布式支持大数据基本架构,平滑扩展流计算框架7.1.3流计算概念易用性快速进行开发和部署流计算框架7.1.3流计算概念可靠性可靠地处理流数据流计算框架7.1.3流计算概念开源流计算框架商业级的流计算平台公司为支持自身业务开发的流计算框架三类流计算框架和平台7.1.4流计算框架IBMStreamBaseIBMInfoSphereStreams商业级流计算平台7.1.4流计算框架TwitterStormYahoo!S4开源流计算框架7.1.4流计算框架TwitterStorm7.1.4流计算框架TwitterStorm免费开源简单高效可靠分布式实时计算系统处理大量的流数据7.1.4流计算框架Yahoo!S47.1.4流计算框架通用分布式可扩展分区容错可插拔流式系统Yahoo!S47.1.4流计算框架公司为支持自身业务开发的流计算框架Dstream银河流计算处理平台7.1.4流计算框架Puma概述数据实时采集数据实时计算实时查询服务7.1.5 流计算处理流程存储的数据是旧的不具备时效性传统的数据处理流程概述需要用户主动发出查询来获取结果7.1.5 流计算处理流程流计算处理流程示意图概述7.1.5 流计算处理流程数据实时采集采集多个数据源的海量数据实时性低延迟稳定可靠7.1.5 流计算处理流程日志数据数据实时采集7.1.5 流计算处理流程开源分布式日志采集系统每秒数百MB的数据采集每秒数百MB的数据传输数据实时采集7.1.5 流计算处理流程Scribe数据实时采集7.1.5 流计算处理流程数据实时采集Kafka7.1.5 流计算处理流程数据实时采集TimeTunnel7.1.5 流计算处理流程数据实时采集ChukwaFlume7.1.5 流计算处理流程流计算处理流程示意图数据实时计算7.1.5 流计算处理流程数据实时计算数据实时计算流程7.1.5 流计算处理流程流计算处理流程示意图实时查询服务7.1.5 流计算处理流程结果流计算框架实时查询展示储存实时查询服务7.1.5 流计算处理流程传统计算方式主动发起查询用户数据库7.1.5 流计算处理流程流数据处理系统实时查询服务实时数据7.1.5 流计算处理流程和结果推送更新结果传统的数据处理系统定时查询过去某一时刻实时查询服务7.1.5 流计算处理流程流处理系统传统的数据处理系统数据实时的数据预先存储好的静态数据结果实时结果过去某一时刻的结果用户得到结果的方式主动将实时结果推送给用户用户主动发出查询7.1.5 流计算处理流程SparkStreaming7.2

SparkStreaming从“Hadoop+Storm”架构转Spark架构SparkStreaming与Storm的对比SparkStreaming设计SparkStreamingSparkStreamingKafkaFlumeHDFSTCPsocketHDFSDatabasesDashboards图SparkStreaming支持的输入、输出数据源7.2.1SparkStreaming设计SparkStreaming图

SparkStreaming执行流程inputdatastreambatchesofinputdatabatchesofprocesseddataSparkEngine7.2.1SparkStreaming设计连续不断的数据流模仿流计算7.2.1SparkStreaming设计Spark是以线程级别并行实时响应级别高可以实现秒级响应变相实现高效的流计算…批处理批处理批处理批处理批处理模仿流计算交给Spark核心引擎处理7.2.1SparkStreaming设计图

DStream操作示意图LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result47.2.1SparkStreaming设计SparkCoreRDD数据抽象SparkSQLDataFrame数据抽象SparkStreamingDStream数据抽象7.2.1SparkStreaming设计图

DStream操作示意图LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result4第一秒切出的数据第二秒切出的数据7.2.1SparkStreaming设计图

DStream操作示意图LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result4DStream这个时间段内获得的语句7.2.1SparkStreaming设计SparkStreamingStorm毫秒级响应无法实现可以实现实时计算可用于实时计算可实时计算容错处理RDD数据集更容易、更高效的容错处理高度容错计算方式兼容批量和实时数据处理实时流计算7.2.2SparkStreaming与Storm的对比企业批处理需求流处理需求7.2.3从“Hadoop+Storm”架构转向Spark架构图

采用Hadoop+Storm部署方式的一个案例7.2.3从“Hadoop+Storm”架构转向Spark架构图

采用Hadoop+Storm部署方式的一个案例7.2.3从“Hadoop+Storm”架构转向Spark架构图

采用Hadoop+Storm部署方式的一个案例批量处理实时流计算7.2.3从“Hadoop+Storm”架构转向Spark架构图

用Spark架构满足批处理和流处理需求批处理7.2.3从“Hadoop+Storm”架构转向Spark架构图

用Spark架构满足批处理和流处理需求实时计算7.2.3从“Hadoop+Storm”架构转向Spark架构图

用Spark架构满足批处理和流处理需求查询7.2.3从“Hadoop+Storm”架构转向Spark架构Spark架构Hadoop+Storm架构vs7.2.3从“Hadoop+Storm”架构转向Spark架构Spark架构优点一键安装和布置7.2.3从“Hadoop+Storm”架构转向Spark架构硬件集群软件维护任务监控应用开发难度Spark架构优点7.2.3从“Hadoop+Storm”架构转向Spark架构统一的硬件、计算平台资源池Spark架构优点7.2.3从“Hadoop+Storm”架构转向Spark架构DStream操作概述7.3DStream操作概述创建StreamingContext对象SparkStreaming工作机制SparkStreaming程序的基本步骤0201DStream03图

DStream操作示意图LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result47.3.1SparkStreaming工作机制ExecutorExecutorDriverProgramSparkContextClusterManagerWorkerNodeCacheTaskTask(Receiver)WorkerNodeCacheTaskTask(Receiver)HDFS、HBaseInputDStreamInputDStream7.3.1SparkStreaming工作机制InputDStream套接字流从Kafka中读取的输入流文件流7.3.1SparkStreaming工作机制InputDStreamreceiver组件接收数据输送数据挂接7.3.1SparkStreaming工作机制1创建输入DStream输入源定义编写SparkStreaming程序有固定的步骤7.3.2SparkStreaming程序的基本步骤数据源头对文件进行监控文件流通过Kafka抛数据Kafka数据流构建一个RDD队列RDD队列流7.3.2SparkStreaming程序的基本步骤2DStream应用转换操作和输出操作流计算定义7.3.2SparkStreaming程序的基本步骤3streamingContext.start()开始接收数据和处理流程7.3.2SparkStreaming程序的基本步骤4等待处理结束streamingContext.awaitTermination()7.3.2SparkStreaming程序的基本步骤5手动结束流计算进程streamingContext.stop()7.3.2SparkStreaming程序的基本步骤怎么能够创建StreamingContext对象????为什么创建StreamingContext对象?7.3.3创建StreamingContext对象进行SparkCore的RDD编程7.3.3创建StreamingContext对象进行SparkCore的RDD编程SparkContext对象创建7.3.3创建StreamingContext对象spark-shell7.3.3创建StreamingContext对象对象变量名称叫scSparkContext对象7.3.3创建StreamingContext对象有SparkSession对象SparkSQL编程7.3.3创建StreamingContext对象spark-shell变量名称:spark7.3.3创建StreamingContext对象scala>importorg.apache.spark.streaming._scala>valssc=newStreamingContext(sc,Seconds(1))导入包7.3.3创建StreamingContext对象

创建StreamingContext对象importorg.apache.spark._importorg.apache.spark.streaming._valconf=newSparkConf().setAppName("TestDStream").setMaster("local[2]")valssc=newStreamingContext(conf,Seconds(1))7.3.3创建StreamingContext对象

创建StreamingContext对象基本输入源7.4基本输入源基本输入源7.4.1文件流7.4.3RDD队列流7.4.2套接字流7.4.1文件流$cd/usr/local/spark/mycode$mkdirstreaming$cdstreaming$mkdirlogfile$cdlogfile

1.

在spark-shell中创建文件流7.4.1文件流scala>importorg.apache.spark.streaming._scala>valssc=newStreamingContext(sc,Seconds(20))scala>vallines=ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")scala>valwords=lines.flatMap(_.split(""))scala>valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)scala>wordCounts.print()scala>ssc.start()scala>ssc.awaitTermination()

进入spark-shell创建文件流,另外打开一个终端窗口,启动spark-shell7.4.1文件流//这里省略若干屏幕信息-------------------------------------------Time:1479431100000ms-------------------------------------------//这里省略若干屏幕信息-------------------------------------------Time:1479431120000ms-------------------------------------------//这里省略若干屏幕信息-------------------------------------------Time:1479431140000ms-------------------------------------------输入ssc.start()以后,程序就开始自动进入循环监听状态在“/usr/local/spark/mycode/streaming/logfile”目录新建一个log.txt,可在监听窗口中显示词频统计结果7.4.1文件流$cd/usr/local/spark/mycode$mkdirstreaming$cdstreaming$mkdir-psrc/main/scala$cdsrc/main/scala$vimTestStreaming.scala

2.

采用独立应用程序方式创建文件流7.4.1文件流importorg.apache.spark._importorg.apache.spark.streaming._objectWordCountStreaming{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据

valssc=newStreamingContext(sparkConf,Seconds(2))//时间间隔为2秒vallines=ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")//这里采用本地文件,当然你也可以采用HDFS文件valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}

用vim编辑器新建一个TestStreaming.scala代码文件7.4.1文件流$cd/usr/local/spark/mycode/streaming$vimsimple.sbt

用vim编辑器新建一个TestStreaming.scala代码文件name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"libraryDependencies+="org.apache.spark"%"spark-streaming_2.12"%"3.2.0"%"provided"

在simple.sbt文件中输入以下代码7.4.1文件流$cd/usr/local/spark/mycode/streaming$/usr/local/sbt/sbtpackage

执行sbt打包编译的命令如下$cd/usr/local/spark/mycode/streaming$/usr/local/spark/bin/spark-submit--class"WordCountStreaming“/usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.12-1.0.jar

打包成功以后,就可以输入以下命令启动这个程序7.4.1文件流Shell窗口"/usr/local/spark/mycode/streaming/logfile"log2.txtHellowordHellowordHelloword保存好文件退出vim编辑器7.4.1文件流监听窗口20秒后按Ctrl+C或Ctrl+D停止监听程序7.4.1文件流监听窗口打印出单词统计信息ABA:拨号B:接起任一方挂掉电话Socket编程原理7.4.2套接字流1Socket工作原理STEP17.4.2套接字流socket()socket()connect()write()read()close()bind()listen()accept()read()write()read()close()阻塞直到有客户端连接回应数据结束连接请求数据建立连接处理请求TCP客户端TCP服务器端7.4.2套接字流2使用套接字流作为数据源实现SparkStreaming编程STEP27.4.2套接字流$cd/usr/local/spark/mycode$mkdirstreaming#如果已经存在该目录,则不用创建$cdstreaming$mkdirsocket$cdsocket$mkdir-psrc/main/scala#如果已经存在该目录,则不用创建$cd/usr/local/spark/mycode/streaming/socket/src/main/scala$vimNetworkWordCount.scala#这里使用vim编辑器创建文件

使用套接字流作为数据源7.4.2套接字流SparkStreaming程序socket()connect()write()read()close()TCP客户端相当于7.4.2套接字流socket()connect()write()read()close()TCP客户端数据发送TCP服务端发起请求7.4.2套接字流数据发送TCP服务端SparkStreaming组件进行词频统计7.4.2套接字流socket()connect()write()read()close()TCP客户端构建TCP客户端7.4.2套接字流importorg.apache.spark._importorg.apache.spark.streaming._objectNetworkWordCount{defmain(args:Array[String]){if(args.length<2){System.err.println("Usage:NetworkWordCount<hostname><port>")System.exit(1)}valsparkConf=new

请在NetworkWordCount.scala文件中输入如下内容7.4.2套接字流SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}

请在NetworkWordCount.scala文件中输入如下内容7.4.2套接字流<hostname><port>客户端向服务端发起连接告诉它向哪个主机哪个端口发起连接7.4.2套接字流StreamingExamplesStreamingExample.scala代码文件定义在定义在org.apache.spark.examples.streaming7.4.2套接字流StreamingExamples不需要实例化直接用它的静态方法单例对象7.4.2套接字流StreamingExamples.setStreamingLogLevels()StreamingExamples.scala代码文件定义设置日志log4j的级别格式7.4.2套接字流name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%"spark-streaming_2.12"%"3.2.0"%"provided"

在“/usr/local/spark/mycode/streaming/socket”目录创建simple.sbt文件7.4.2套接字流$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"NetworkWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost9999

在“/usr/local/spark/mycode/streaming/socket”目录创建simple.sbt文件7.4.2套接字流“NetworkWordCount”客户端7.4.2套接字流启动TCP服务端netcat程序打开终端7.4.2套接字流nc程序$nc-lk9999输入命令7.4.2套接字流$nc-lk9999启动监听连续监听输入命令7.4.2套接字流向客户端发数据输入命令$nc-lk99997.4.2套接字流$nc-lk99997.4.2套接字流Helloword$nc-lk99997.4.2套接字流监听窗口-------------------------------------------Time:1479431100000ms-------------------------------------------(hello,1)(world,1)-------------------------------------------Time:1479431120000ms-------------------------------------------(hadoop,1)-------------------------------------------Time:1479431140000ms-------------------------------------------(spark,1)7.4.2套接字流7.4.2套接字流能不能自己去编写一个TCP服务端?7.4.2套接字流自己编写程序$cd/usr/local/spark/mycode/streaming/socket/src/main/scala$vimDataSourceSocket.scala

采用自己编写的程序产生Socket数据源7.4.2套接字流importjava.io.{PrintWriter}import.ServerSocketimportscala.io.Source7.4.2套接字流

采用自己编写的程序产生Socket数据源objectDataSourceSocket{defindex(length:Int)={//返回位于0到length-1之间的一个随机数valrdm=newjava.util.Randomrdm.nextInt(length)}defmain(args:Array[String]){if(args.length!=3){System.err.println("Usage:<filename><port><millisecond>")System.exit(1)}valfileName=args(0)//获取文件路径vallines=Source.fromFile(fileName).getLines.toList//读取文件中的所有行的内容valrowCount=lines.length//计算出文件的行数

vallistener=newServerSocket(args(1).toInt)//创建监听特定端口的7.4.2套接字流ServerSocket对象while(true){valsocket=listener.accept()newThread(){overridedefrun={println("Gotclientconnectedfrom:"+socket.getInetAddress)valout=newPrintWriter(socket.getOutputStream(),true)while(true){Thread.sleep(args(2).toLong)//每隔多长时间发送一次数据valcontent=lines(index(rowCount))//从lines列表中取出一个元素println(content)out.write(content+'\n')//写入要发送给客户端的数据out.flush()//发送数据给客户端}socket.close()}}.start()}}}$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage

执行sbt打包编译的命令如下7.4.2套接字流7.4.2套接字流创建文本/usr/local/spark/mycode/streaming/word.txtHellowordHellowordHellowordword.txt$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage

执行sbt打包编译的命令如下$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"DataSourceSocket"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>./word.txt99991000

打包成功以后,启动DataSourceSocket程序7.4.2套接字流$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"NetworkWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost9999

下面就可以启动客户端,即NetworkWordCount程序7.4.2套接字流启动成功后,你就会看到,屏幕上不断打印出词频统计信息RDD队列输入的数据RDDRDDRDDSparkStreaming7.4.3

RDD队列流TestRDDQueueStream.scalaRDD1秒1秒RDD1秒……RDD队列7.4.3

RDD队列流1秒……RDDRDDRDD队列7.4.3

RDD队列流StreamingRDDRDDRDD队列每隔2秒对数据进行处理7.4.3

RDD队列流

新建一个TestRDDQueueStream.scala代码文件packageorg.apache.spark.examples.streamingimportorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.streaming.{Seconds,StreamingContext}7.4.3

RDD队列流

新建一个TestRDDQueueStream.scala代码文件objectQueueStream{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("TestRDDQueue").setMaster("local[2]")valssc=newStreamingContext(sparkConf,Seconds(2))valrddQueue=newscala.collection.mutable.SynchronizedQueue[RDD[Int]]()valqueueStream=ssc.queueStream(rddQueue)valmappedStream=queueStream.map(r=>(r%10,1))valreducedStream=mappedStream.reduceByKey(_+_)reducedStream.print()ssc.start()7.4.3

RDD队列流

新建一个TestRDDQueueStream.scala代码文件for(i<-1to10){rddQueue+=ssc.sparkContext.makeRDD(1to100,2)Thread.sleep(1000)}ssc.stop()}}7.4.3

RDD队列流

sbt打包成功后,执行下面命令运行程序$cd/usr/local/spark/mycode/streaming/rddqueue$/usr/local/spark/bin/spark-submit\>--class"org.apache.spark.examples.streaming.QueueStream"\>./target/scala-2.12/simple-project_2.12-1.0.jar7.4.3

RDD队列流

执行上面命令以后,程序就开始运行,就可以看到类似下面的结果-------------------------------------------Time:1479522100000ms-------------------------------------------(4,10)(0,10)(6,10)(8,10)(2,10)(1,10)(3,10)(7,10)(9,10)(5,10)7.4.3

RDD队列流高级数据源提纲123Kafka准备工作Spark准备工作Kafka简介4编写SparkStreaming程序使用Kafka数据源高吞吐量的分布式发布订阅消息系统订阅消息分发消息7.5.1Kafka

简介Kafka:消息分发系统

起到信息传递中枢的作用7.5.1Kafka

简介Kafka是一个分布式的消息分发系统7.5.1Kafka

简介Kafka作为一个信息传递的枢纽7.5.1Kafka

简介Kafka集群............BrokerBroker1Broker2Broker37.5.1Kafka

简介TopicTopic7.5.1Kafka

简介Topic1Broker1PartitionPartitionBroker2PartitionPartition.........……Partition7.5.1Kafka

简介ProducerPdoducerBroker2ConsumerBroker2Consumer消息消费者向Kafkabroker读取消息的客户端SparkStreaming7.5.1Kafka

简介ConsumerGroup每个Consumer只属于某个ConsumerGroup若不指定groupname则属于默认的groupConsumerGroup1Comsumer1Comsumer2.........ConsumerGroupZhidingComsumer37.5.1Kafka

简介逻辑概念7.5.1Kafka

简介7.5.2Kafka准备工作Kafka准备工作安装Kafka测试Kafka是否正常工作启动Kafka/blog/1096-2/“/usr/local/kafka”安装目录:安装Kafka7.5.2Kafka准备工作安装Kafka$cd~/Downloads$sudotar-zxfkafka_2.12-2.6.0.tgz-C/usr/local$cd/usr/local$sudomvkafka_2.12-2.6.0kafka$sudochown-Rhadoop./kafka7.5.2Kafka准备工作启动Kafka$cd/usr/local/kafka$./bin/zookeeper-server-start.shconfig/perties不能关闭这个终端窗口关闭这个窗口,会使Zookeeper服务停止7.5.2Kafka准备工作打开第二个终端,输入下面命令启动Kafka服务不能关闭这个终端窗口关闭这个窗口,会使kafka服务停止$cd/usr/local/kafka$bin/kafka-server-start.shconfig/perties7.5.2Kafka准备工作打开第三个终端$cd/usr/local/kafka$./bin/kafka-topics.sh--create--zookeeperlocalhost:2181\>--replication-factor1--partitions1\>--topicwordsender然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:$./bin/kafka-topics.sh--list--zookeeperlocalhost:21817.5.2Kafka准备工作再新开一个终端(记作“监控输入终端”)$cd/usr/local/kafka$bin/kafka-console-consumer.sh\>--bootstrap-serverlocalhost:9092--topicwordsender7.5.2Kafka准备工作jar包启动spark-shell7.5.3Spark准备工作高级数据源......需要独立的库默认不存在jar文件7.5.3Spark准备工作spark-streaming-kafka-0-10_2.12-3.2.0.jar

spark-token-provider-kafka-0-10_2.12-3.2.0.jar7.5.3Spark准备工作复制/usr/local/spark/jarsspark-streaming-kafka-0-10_2.12-3.2.0.jar

spark-token-provider-kafka-0-10_2.12-3.2.0.jar7.5.3Spark准备工作Kafka安装目录的libs目录下的所有jar文件7.5.4编写SparkStreaming程序使用Kafka数据源编写消费者程序编写生产者程序编译打包程序运行程序010203047.5.4编写SparkStreaming程序使用Kafka数据源

1.

编写生产者(producer)程序:编写KafkaWordProducer程序$cd/usr/local/spark/mycode$mkdirkafka$cdkafka$mkdir-psrc/main/scala$cdsrc/main/scala$vimKafkaWordProducer.scala7.5.4编写SparkStreaming程序使用Kafka数据源

在KafkaWordProducer.scala中输入以下代码importjava.util.HashMapimportducer.{KafkaProducer,ProducerConfig,ProducerRecord}importorg.apache.spark.SparkConfimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka010._7.5.4编写SparkStreaming程序使用Kafka数据源objectKafkaWordProducer{defmain(args:Array[String]){if(args.length<4){System.err.println("Usage:KafkaWordProducer<metadataBrokerList><topic>"+"<messagesPerSec><wordsPerMessage>")System.exit(1)}valArray(brokers,topic,messagesPerSec,wordsPerMessage)=args//Zookeeperconnectionpropertiesvalprops=newHashMap[String,Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")valproducer=newKafkaProducer[String,String](props)7.5.4编写SparkStreaming程序使用Kafka数据源//Sendsomemessageswhile(true){(1tomessagesPerSec.toInt).foreach{messageNum=>valstr=(1towordsPerMessage.toInt).map(x=>scala.util.Random.nextInt(10).toString).mkString("")print(str)println()valmessage=newProducerRecord[String,String](topic,null,str)producer.send(message)}Thread.sleep(1000)}}}7.5.4编写SparkStreaming程序使用Kafka数据源

2.

编写消费者程序:在当前目录下创建KafkaWordCount.scala代码文件importorg.apache.spark._importorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.streaming.kafka010.KafkaUtilsimportmon.serialization.StringDeserializerimportorg.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimportorg.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe7.5.4编写SparkStreaming程序使用Kafka数据源objectKafkaWordCount{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("KafkaWordCount").setMaster("local[2]")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")valssc=newStreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")//设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动HadoopvalkafkaParams=Map[String,Object]("bootstrap.servers"->"localhost:9092","key.deserializer"->classOf[StringDeserializer],"value.deserializer"->classOf[StringDeserializer],"group.id"->"use_a_separate_group_id_for_each_stream","auto.offset.reset"->"latest","mit"->(true:java.lang.Boolean))7.5.4编写SparkStreaming程序使用Kafka数据源valtopics=Array("wordsender")valstream=KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParams))stream.foreachRDD(rdd=>{valoffsetRange=rdd.asInstanceOf[HasOffsetRanges].offsetRangesvalmaped:RDD[(String,String)]=rdd.map(record=>(record.key,record.value))vallines=maped.map(_._2)valwords=lines.flatMap(_.split(""))valpair=words.map(x=>(x,1))valwordCounts=pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}}7.5.4编写SparkStreaming程序使用Kafka数据源

3.

编译打包程序:创建simple.sbt文件$cd/usr/local/spark/mycode/kafka/$vimsimple.sbt7.5.4编写SparkStreaming程序使用Kafka数据源

在simple.sbt中输入以下代码name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"libraryDependencies+="org.apache.spark"%%"spark-streaming"%"3.2.0"%"provided"libraryDependencies+="org.apache.spark"%%"spark-streaming-kafka-0-10"%"3.2.0"libraryDependencies+="org.apache.kafka"%"kafka-clients"%"2.6.0"7.5.4编写SparkStreaming程序使用Kafka数据源3.编译打包程序:进行打包编译$cd/usr/local/spark/mycode/kafka/$/usr/local/sbt/sbtpackage7.5.4编写SparkStreaming程序使用Kafka数据源

4.

运行程序:打开一个终端,运行“KafkaWordProducer”程序$cd/usr/local/spark/mycode/kafka/$/usr/local/spark/bin/spark-submit\>--class"KafkaWordProducer"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost:9092wordsender357.5.4编写SparkStreaming程序使用Kafka数据源

执行上面命令后,屏幕上会不断滚动出现新的单词,如下75073282130129280909900686616583677……不要关闭这个终端窗口让它一直不断发送单词7.5.4编写SparkStreaming程序使用Kafka数据源

新打开一个终端,执行下面命令,运行KafkaWordCount程序$cd/usr/local/spark/mycode/kafka/$/usr/local/spark/bin/spark-submit\>--class"KafkaWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar7.5.4编写SparkStreaming程序使用Kafka数据源屏幕显示(4,5)(8,12)(6,14)(0,19)(2,11)(7,20)(5,10)(9,9)(3,9)(1,11)转换操作提纲12DStream有状态转换操作DStream无状态转换操作???为什么叫无状态转换操作?7.6.1DStream无状态转换操作t/s当前批次只针对当前一个批次进行统计102030405060707.6.1DStream无状态转换操作10203040506070t/st/s7.6.1DStream无状态转换操作t/s进行新的统计与前一批次无关不会进行累计102030405060707.6.1DStream无状态转换操作7.6.2DStream有状态转换操作滑动窗口转换操作updateStateByKey操作0102DStreamtime1time2time3time4time5windowattime1windowattime3windowattime5OriginalDstreamWindowedDstream滑动窗口转换操作7.6.2DStream有状态转换操作time1time2time3time4time5windowattime1windowattime3windowattime5OriginalDstreamWindowedDstream滑动窗口转换操作7.6.2DStream有状态转换操作设定滑动窗口大小设定滑动窗口时间间隔大小两个参数7.6.2DStream有状态转换操作设定滑动窗口时间间隔大小每隔多长时间执行一次计算7.6.2DStream有状态转换操作time1time2time3time4time5函数func进行聚合数值time6time7OriginalDstream7.6.2DStream有状态转换操作time1time2time3time4time5函数func进行聚合数值time6time7OriginalDstream数值7.6.2DStream有状态转换操作time1time2time3time4time5函数func进行聚合数值time6time7OriginalDstream数值数值7.6.2DStream有状态转换操作time1time2time3time4time5数值time6time7OriginalDstream数值数值单元素流7.6.2DStream有状态转换操作7.6.2DStream有状态转换操作updateStateByKey操作在跨批次之间维护状态怎么进行跨批次维护呢????updateStateByKey操作7.6.2DStream有状态转换操作有状态地进行维护NetworkWordCountStateful.scala7.6.2DStream有状态转换操作objectNetworkWordCountStateful{defmain(args:Array[String]){//定义状态更新函数valupdateFunc=(values:Seq[Int],state:Option[Int])=>{valcurrentCount=values.foldLeft(0)(_+_)valpreviousCount=state.getOrElse(0)Some(currentCount+previousCount)}

valsparkConf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")7.6.2DStream有状态转换操作valssc=newStreamingContext(sc,Seconds(5))ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")//设置检查点,检查点具有容错机制vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valwordDstream=words.map(x=>(x,1))valstateDstream=wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()ssc.start()ssc.awaitTermination()}}7.6.2DStream有状态转换操作输出操作提纲12把DStream写入到MySQL数据库中把DStream输出到文本文件中7.7.1把DStream输出到文本文件中importorg.apache.spark._importorg.apache.spark.streaming._objectNetworkWordCountStatef

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论