




下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
概要依赖管理基本套路输入源转换操作输出操作持久化操作依赖管理依赖<dependency><groupId>
.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version></dependency>Source相关依赖部分source相关依赖现在已经单独打包,需要单独引入基本套路//1、参数处理if
(args.length
<
2)
{System.err.println("Usage:
NetworkWordCount
<hostname>
<port>")System.exit(1)}//2、初始化StreamingContextval
sparkConf
=
new
SparkConf().setAppName("NetworkWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(1))//3、从source获取数据创建DStreamval
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)//4、对DStream进行val
words
=
lines.flatMap(_.split("
"))val
wordCounts=
words.map(x
=>
(x,
1)).reduceByKey(_+
_)//5、处理计算结果
wordCounts.print()//6、启动Spark
Streamingssc.start()ssc.awaitTermination()输入源Dstream输入源---input
DStreamSpark内置了两类Source:Source分类举例说明Basic
sourcesfile
systems,
socketconnections,
and
AkkaactorsStreamingContext
直接就可以创建,无需引入额外的依赖Advanced
sourcesKafka,
Flume,Kinesis,,
etc需要引入相关依赖,并且需要通过相关的工具类来创建val
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)import
.apache.spark.streaming.kafka._val
kafkaStream
=
KafkaUtils.createStream(streamingContext,
[ZK
quorum],
[consumer
group
id],
[per-topic
number
of
Kafka
partitions
to
consume])Dstream输入源---Receiverinput
Dstream都会关联一个Receiver(除了FileInputDStream)Receiver以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD。Receiver收集到输入数据后会把数据
到另一个执行器进程来保障容错性(默认行为)Receiver会消耗额外的cpu资源,所以要注意分配
的cpu
cores(receiver是一个单独的task,会消耗cpu)local模式下不要“local”or
“local[1]”(需要指定多个,只有一个核会卡住)分布式运行时,分配的cores >
receivers的数量StreamingContext
会周期性地运行Spark
作业来处理这些数据(每接受一批次数据,就会提交作业运行处理),把数据与之前时间区间中的RDD进行整合(如果是时间窗口,需要与其它RDD做运算整合)内置的input
Dstream:Basic
Sources内置input
Dstream–
/apache/spark/tree/v1.6.2/external(高级)文件流val
logData
=
ssc.textFileStream(logDirectory)Spark
支持从任意Hadoop
兼容的文件系统中
数据,
Spark
Streaming
也就支持从任意Hadoop
兼容的文件系统
中的文件创建数据流(InputFormat参数化)ssc.fileStream[LongWritable,
IntWritable,SequenceFileInputFormat[LongWritable,
IntWritable]](inputDirectory).map
{case
(x,
y)
=>
(x.get(),
y.get())}文件必须原子化创建(比如把文件移入Spark
的
,而不是一条条往已有文件写数据)Akka
actor流(spark
底层使用akka通信)内置的input
Dstream:Advanced
SourcesApache
Kafkadef
main(args:
Array[String])
{if
(args.length
<
4)
{System.err.println("Usage:KafkaWordCount
<zkQuorum><group>
<topics>
<numThreads>")System.exit(1)}#使用Array接受args参数val
Array(zkQuorum,
group,
topics,
numThreads)
=
argsval
sparkConf
=
new
SparkConf().setAppName("KafkaWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(2))#指定hdfs
,作用:容错ssc.checkpoint("checkpoint")val
topicMap
=
topics.split(",").map((_,
numThreads.toInt)).toMapval
lines
=
KafkaUtils.createStream(ssc,
zkQuorum,
group,
topicMap).map(_._2)val
words
=
lines.flatMap(_.split("
"))val
wordCounts
=
words.map(x
=>
(x,
1L)).reduceByKeyAndWindow(_
+
_,_
-
_,Minutes(10),
Seconds(2),
2)wordCounts.print()ssc.start()ssc.awaitTermination()}/apache/spark/examples/st
/apache/spark/tree/v1.6.2/examples/src/main/scala/
reamingDstream输入源:multiple
input
DStreammultiple
input
streams(same
type
and
same
slideduration)//相同的类型,相同的滑动窗口ssc.union(Seq(stream1,stream2,…))
//合并多个streamstream.union(otherStream)//两个stream进行合并Dstream输入源:Custom
ReceiverCustom
input
Dstream–
.apache.spark.streaming.receiver.Receiver(只需要扩展Receiver)–无状态转换操作和Sparkcore的语义⼀一致无状态转化操作就是把简单的RDD
转化操作应用到每个批次上,也就是转化DStream中的每一个RDD(对Dstream的操作会
到每个批次的RDD上)无状态转换操作不会跨多个batch的RDD去执行(每个批次的RDD结果不能累加)有状态转换操作1-updateStateByKey有时 需要在DStream
中跨所有批次状态(例如用户的会话)。针对这种情况,updateStateByKey()
为 提供了对一个状态变量的 ,用于键值对形式的Dstream使用updateStateByKey需要完成两步工作:定义状态:可以是任意数据类型定义状态更新函数-updateFuncupdate(events,
oldState)events:是在当前批次中收到的事件的列表(可能为空)。–oldState:是一个可选的状态对象,存放在Option
内;如果一个键没有之前的状态,这个值可以空缺。newState:由函数返回,也以Option
形式存在;
可以返回一个空的Option
来表示想要删除该状态。注意:有状态转化操作需要在你的StreamingContext
中打开检查点机制来确保容错性–
ssc.checkpoint("hdfs://...")有状态转换操作2-window基于窗口的操作会在一个比StreamingContext
的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果所有基于窗口的操作都需要两个参数,分别为windowDuration以及slideDuration,两者都必须是StreamContext
的批次间隔的整数倍valaccessLogsWindow
=
accessLogsDStream.window(Seconds(30),
Seconds(10))val
windowCounts
=
accessLogsWindow.count()batchDuration(每个批次的长度)val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))windowDuration(每次移动,窗口框住的长度(几个批次))长控制每次计算最近的多少个批次的数据(windowDuration/batchDuration)slideDuration(每次移动的距离(
几个批次))默认值与batchDuration相等(默认滑动一个batch)控制多长时间计算一次有状态转换操作2-window操作代码片段val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))…val
accessLogsWindow=
accessLogsDStream.window(Seconds(30),
Seconds(20))val
windowCounts
=
accessLogsWindow.count()..窗口时长为3个批次,滑动步长为2个批次;每隔2个批次就对前3
个批次的数据进行一次计算有状态转换操作2-window操作—普通规约与增量规约增量规约只考虑新进入窗口的数据和离开窗口的数据,让Spark增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+对应的逆函数为-有状态转换操作2-window操作—理解增量规约DStream输出常见输出操作print每个批次中抓取DStream
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
评论
0/150
提交评论