已阅读5页,还剩3页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第 83 课 用 Scala 和 Java 二种方式实战 Spark Streaming 开发 编写人 李林贵 姜伟 一 Java 方式开发 1 开发前准备 假定您以搭建好了 Spark 集群 2 开发环境采用 eclipse maven 工程 需要添加 Spark Streaming 依赖 3 Spark streaming 基于 Spark Core 进行计算 需要注意事项 设置本地 master 如果指定 local 的话 必须配置至少二条线程 也可通过 sparkconf 来设置 因为 Spark Streaming 应用程序在运行的时候 至少有一条线程用 于不断的循环接收数据 并且至少有一条线程用于处理接收的数据 否则的话无法有 线程用于处理数据 随着时间的推移 内存和磁盘都会不堪重负 温馨提示 对于集群而言 每隔 Executor 一般肯定不只一个 Thread 那对于处理 Spark Streaming 应用程序而言 每个 Executor 一般分配多少 core 比较合适 根据我们过去 的经验 5 个左右的 core 是最佳的 段子 分配为奇数个 core 的表现最佳 例如 分 配 3 个 5 个 7 个 core 等 接下来 让我们开始动手写 Java 代码吧 第一步 配置 SparkConf 第一步 配置 SparkConf 1 至少 2 条线程 因为 Spark Streaming 应用程序在运行的时候 至少有一条 线程用于不断的循环接收数据 并且至少有一条线程用于处理接受的数据 否则的话无法 有线程用于处理数据 随着时间的推移 内存和磁盘都会不堪重负 2 对于集群而言 每个 Executor 一般肯定不止一个 Thread 那对于处理 Spark Streaming 的 应用程序而言 每个 Executor 一般分配多少 Core比较合适 根据我们过去的经验 5 个左右的 Core是最佳的 一个段子分配为奇数个 Core表现最佳 例如 3 个 5 个 7 个 Core等 SparkConf conf new SparkConf setMaster local 2 setAppName WordCountOnline SparkConf conf new SparkConf setMaster spark Master 7077 setAppName WordCountOnline 第二步 创建 SparkStreamingContext 我们采用基于配置文件的方式创建 SparkStreamingContext 对象 第二步 创建 SparkStreamingContext 1 这个是 SparkStreaming 应用程序所有功能的起始点和程序调度的核心 SparkStreamingContext 的构建可以基于 SparkConf参数 也可基于持久化的 SparkStreamingContext 的 内容来恢复过来 典型的场景是 Driver 崩溃后重新启动 由于 Spark Streaming具有连续 7 24 小时不 间断运行的特征所有需要在 Driver 重新启动后继续上衣系的状态 此时的状态恢复需要基于曾经的 Checkpoint 2 在一个 Spark Streaming 应用程序中可以创建若干个 SparkStreamingContext 对象 使用下一个 SparkStreamingContext 之前需要把前面正在运行的 SparkStreamingContext 对象关闭掉 由此 我们获得一个重大的启发 SparkStreaming框架也只是 Spark Core 上的一个应用程序而已 只不过 Spark Streaming框架箱运行的话需要 Spark工程师写业务逻辑处理代码 JavaStreamingContext jsc new JavaStreamingContext conf Durations seconds 5 第三步 创建 Spark Streaming 输入数据来源 我们将数据来源配置为本地端口 9999 注意 端口要求没有被占用 第三步 创建 Spark Streaming 输入数据来源 input Stream 1 数据输入来源可以基于 File HDFS Flume Kafka Socket 等 2 在这里我们指定数据来源于网络 Socket 端口 Spark Streaming 连接上该端口并在运行的时候一直监听 该端口的数据 当然该端口服务首先必须存在 并且在后续会根据业务需要不断的有数据产生 当然对于 Spark Streaming应用程序的运行而言 有无数据其处理流程都是一样的 3 如果经常在每间隔 5 秒钟没有数据的话不断的启动空的 Job 其实是会造成调度资源的浪费 因为并没有数据需要发生计算 所以 实例的企业级生成环境的代码在具体提交 Job 前会判断是否有数据 如果没有的话就不再提交 Job JavaReceiverInputDStream lines jsc socketTextStream Master 9999 第四步 我们就像对 RDD 编程一样 基于 DStream 进行编程 原因是 DStream 是 RDD 产生的模板 在 Spark Streaming 发生计算前 其实质是把每个 Batch 的 DStream 的操作翻译成为了 RDD 操作 1 flatMap 操作 2 mapToPair 操作 3 reduceByKey 操作 4 print 等操作 第四步 接下来就像对于 RDD 编程一样基于 DStream进行编程 原因是 DStream是 RDD产生的模 板 或者说类 在 Spark Streaming 具体发生计算前 其实质是把每个 Batch 的 DStream的操作翻译成 为对 RDD的操作 对初始的 DStream进行 Transformation 级别的处理 例如 map filter 等高阶函数等的编程 来进行具体 的数据计算 第 4 1 步 讲每一行的字符串拆分成单个的单词 JavaDStream words lines flatMap new FlatMapFunction 如果是 Scala 由于 SAM 转换 所以可以写成 val words lines flatMap line line split Override public Iterable call String line throws Exception return Arrays asList line split 第四步 对初始的 DStream 进行 Transformation 级别的处理 例如 map filter 等高阶函数等的编程 来 进行具体的数据计算 第 4 2 步 在单词拆分的基础上对每个单词实例计数为 1 也就是 word word 1 JavaPairDStream pairs words mapToPair new PairFunction Override public Tuple2 call String word throws Exception return new Tuple2 word 1 第四步 对初始的 DStream 进行 Transformation 级别的处理 例如 map filter 等高阶函数等的编程 来 进行具体的数据计算 第 4 3 步 在每个单词实例计数为 1 基础之上统计每个单词在文件中出现的总次数 JavaPairDStream wordsCount pairs reduceByKey new Function2 对相同的 Key 进行 Value 的累计 包括 Local 和 Reducer 级别同时 Reduce Override public Integer call Integer v1 Integer v2 throws Exception return v1 v2 此处的 print 并不会直接出发 Job 的执行 因为现在的一切都是在 Spark Streaming框架的控制之下的 对 于 Spark Streaming 而言具体是否触发真正的 Job 运行是基于设置的 Duration时间间隔的 诸位一定要注意的是 Spark Streaming应用程序要想执行具体的 Job 对 Dtream就必须有 output Stream操 作 output Stream有很多类型的函数触发 类 print saveAsTextFile saveAsHadoopFiles 等 最为重要的一个 方法是 foraeachRDD 因为 Spark Streaming 处理的结果一般都会放在 Redis DB DashBoard 等上面 foreachRDD 主要就是用用来完成这些功能的 而且可以随意的自定义具体数据到底放在哪里 wordsCount print Spark Streaming 执行引擎也就是 Driver 开始运行 Driver 启动的时候是位于一条新的线程中的 当然其 内部有消息循环体 用于 接受应用程序本身或者 Executor 中的消息 jsc start jsc awaitTermination jsc close 温馨提示 除了 print 方法将处理后的数据输出之外 还有其他的方法也非常重要 在开 发中需要重点掌握 比如 SaveAsTextFile SaveAsHadoopFile 等 最为重要的是 foreachRDD 方法 这个方法可以将数据写入 Redis DB DashBoard 等 甚至可以随 意的定义数据放在哪里 功能非常强大 二 Scala 方式开发 接下来我们看看如何用非常简洁的 Scala 代码实现上面 Java 代码同样的功能 第一步 接收数据源 第二
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年天津仁爱学院单招职业倾向性考试题库及答案解析(名师系列)
- 2026年内蒙古电子信息职业技术学院单招职业适应性测试题库及答案解析(名师系列)
- 2026年娄底职业技术学院单招职业倾向性考试必刷测试卷附答案解析
- 城市网络空间安全监测
- 房屋打包协议书范本
- 房屋界定协议书范本
- 房屋管道修改协议书
- 房屋苗木交接协议书
- 房屋解封担保协议书
- 房屋质保金合同范本
- 《眩晕诊断与治疗》课件
- 从“小众运动”到“全民热潮”解码中国网球人群与市场机遇
- TSZUAVIA 001-2021 低慢小无人机探测反制系统要求
- 2025高速公路建设指挥部工作总结暨下年度计划
- 品管圈PDCA改善案例-呼吸内科静提高吸入装置正确使用率
- 2024年保育师考试测试题库及答案
- 2024年山东省济南市中考语文试题卷(含答案)
- 新版APQP附件模板A0-A8
- CJT511-2017 铸铁检查井盖
- 质量工程师简历模板
- 天然气场站安全知识培训
评论
0/150
提交评论