大数据就业特训营-直播spark编程模型_第1页
大数据就业特训营-直播spark编程模型_第2页
大数据就业特训营-直播spark编程模型_第3页
大数据就业特训营-直播spark编程模型_第4页
大数据就业特训营-直播spark编程模型_第5页
免费预览已结束,剩余20页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

概要依赖管理基本套路输入源转换操作输出操作持久化操作依赖管理依赖<dependency><groupId>

.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version></dependency>Source相关依赖部分source相关依赖现在已经单独打包,需要单独引入基本套路//1、参数处理if

(args.length

<

2)

{System.err.println("Usage:

NetworkWordCount

<hostname>

<port>")System.exit(1)}//2、初始化StreamingContextval

sparkConf

=

new

SparkConf().setAppName("NetworkWordCount")val

ssc

=

new

StreamingContext(sparkConf,

Seconds(1))//3、从source获取数据创建DStreamval

lines

=

ssc.socketTextStream(args(0),args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)//4、对DStream进行val

words

=

lines.flatMap(_.split("

"))val

wordCounts=

words.map(x

=>

(x,

1)).reduceByKey(_+

_)//5、处理计算结果

wordCounts.print()//6、启动Spark

Streamingssc.start()ssc.awaitTermination()输入源Dstream输入源---input

DStreamSpark内置了两类Source:Source分类举例说明Basic

sourcesfile

systems,

socketconnections,

and

AkkaactorsStreamingContext

直接就可以创建,无需引入额外的依赖Advanced

sourcesKafka,

Flume,Kinesis,,

etc需要引入相关依赖,并且需要通过相关的工具类来创建val

lines

=

ssc.socketTextStream(args(0),args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)import

.apache.spark.streaming.kafka._val

kafkaStream

=

KafkaUtils.createStream(streamingContext,

[ZK

quorum],

[consumer

group

id],

[per-topic

number

of

Kafka

partitions

to

consume])Dstream输入源---Receiverinput

Dstream都会关联一个Receiver(除了FileInputDStream)Receiver以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD。Receiver收集到输入数据后会把数据

到另一个执行器进程来保障容错性(默认行为)Receiver会消耗额外的cpu资源,所以要注意分配

的cpu

cores(receiver是一个单独的task,会消耗cpu)local模式下不要“local”or

“local[1]”(需要指定多个,只有一个核会卡住)分布式运行时,分配的cores >

receivers的数量StreamingContext

会周期性地运行Spark

作业来处理这些数据(每接受一批次数据,就会提交作业运行处理),把数据与之前时间区间中的RDD进行整合(如果是时间窗口,需要与其它RDD做运算整合)内置的input

Dstream:Basic

Sources内置input

Dstream–

/apache/spark/tree/v1.6.2/external(高级)文件流val

logData

=

ssc.textFileStream(logDirectory)Spark

支持从任意Hadoop

兼容的文件系统中

数据,

Spark

Streaming

也就支持从任意Hadoop

兼容的文件系统

中的文件创建数据流(InputFormat参数化)ssc.fileStream[LongWritable,

IntWritable,SequenceFileInputFormat[LongWritable,

IntWritable]](inputDirectory).map

{case

(x,

y)

=>

(x.get(),

y.get())}文件必须原子化创建(比如把文件移入Spark

,而不是一条条往已有文件写数据)Akka

actor流(spark

底层使用akka通信)内置的input

Dstream:Advanced

SourcesApache

Kafkadef

main(args:

Array[String])

{if

(args.length

<

4)

{System.err.println("Usage:KafkaWordCount

<zkQuorum><group>

<topics>

<numThreads>")System.exit(1)}#使用Array接受args参数val

Array(zkQuorum,

group,

topics,

numThreads)

=

argsval

sparkConf

=

new

SparkConf().setAppName("KafkaWordCount")val

ssc

=

new

StreamingContext(sparkConf,

Seconds(2))#指定hdfs

,作用:容错ssc.checkpoint("checkpoint")val

topicMap

=

topics.split(",").map((_,

numThreads.toInt)).toMapval

lines

=

KafkaUtils.createStream(ssc,

zkQuorum,

group,

topicMap).map(_._2)val

words

=

lines.flatMap(_.split("

"))val

wordCounts

=

words.map(x

=>

(x,

1L)).reduceByKeyAndWindow(_

+

_,_

-

_,Minutes(10),

Seconds(2),

2)wordCounts.print()ssc.start()ssc.awaitTermination()}/apache/spark/examples/st

/apache/spark/tree/v1.6.2/examples/src/main/scala/

reamingDstream输入源:multiple

input

DStreammultiple

input

streams(same

type

and

same

slideduration)//相同的类型,相同的滑动窗口ssc.union(Seq(stream1,stream2,…))

//合并多个streamstream.union(otherStream)//两个stream进行合并Dstream输入源:Custom

ReceiverCustom

input

Dstream–

.apache.spark.streaming.receiver.Receiver(只需要扩展Receiver)–无状态转换操作和Sparkcore的语义⼀一致无状态转化操作就是把简单的RDD

转化操作应用到每个批次上,也就是转化DStream中的每一个RDD(对Dstream的操作会

到每个批次的RDD上)无状态转换操作不会跨多个batch的RDD去执行(每个批次的RDD结果不能累加)有状态转换操作1-updateStateByKey有时 需要在DStream

中跨所有批次状态(例如用户的会话)。针对这种情况,updateStateByKey()

为 提供了对一个状态变量的 ,用于键值对形式的Dstream使用updateStateByKey需要完成两步工作:定义状态:可以是任意数据类型定义状态更新函数-updateFuncupdate(events,

oldState)events:是在当前批次中收到的事件的列表(可能为空)。–oldState:是一个可选的状态对象,存放在Option

内;如果一个键没有之前的状态,这个值可以空缺。newState:由函数返回,也以Option

形式存在;

可以返回一个空的Option

来表示想要删除该状态。注意:有状态转化操作需要在你的StreamingContext

中打开检查点机制来确保容错性–

ssc.checkpoint("hdfs://...")有状态转换操作2-window基于窗口的操作会在一个比StreamingContext

的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果所有基于窗口的操作都需要两个参数,分别为windowDuration以及slideDuration,两者都必须是StreamContext

的批次间隔的整数倍valaccessLogsWindow

=

accessLogsDStream.window(Seconds(30),

Seconds(10))val

windowCounts

=

accessLogsWindow.count()batchDuration(每个批次的长度)val

ssc

=

new

StreamingContext(sparkConf,

Seconds(10))windowDuration(每次移动,窗口框住的长度(几个批次))长控制每次计算最近的多少个批次的数据(windowDuration/batchDuration)slideDuration(每次移动的距离(

几个批次))默认值与batchDuration相等(默认滑动一个batch)控制多长时间计算一次有状态转换操作2-window操作代码片段val

ssc

=

new

StreamingContext(sparkConf,

Seconds(10))…val

accessLogsWindow=

accessLogsDStream.window(Seconds(30),

Seconds(20))val

windowCounts

=

accessLogsWindow.count()..窗口时长为3个批次,滑动步长为2个批次;每隔2个批次就对前3

个批次的数据进行一次计算有状态转换操作2-window操作—普通规约与增量规约增量规约只考虑新进入窗口的数据和离开窗口的数据,让Spark增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+对应的逆函数为-有状态转换操作2-window操作—理解增量规约DStream输出常见输出操作print每个批次中抓取DStream

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论