版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
道科技园社区高新中二道5号生产力一种基于SparkStreaming的实时数据更新本发明提出了一种基于SparkStreaming的该实时数据存储及管理方法在新增任务只需要同步元数据信息获取数据新增和变化,把Kafka数据一一解析对应hive的数据类型,数据写入2读取元数据信息库中的元数据信息,包括读取所述元数据信息库中的所述Kafka数据读取所述Kafka数据,对读取到的批次的所述Kafka数据进行分区,按若所述当前系统时间的分钟数除以10余数大于1且所述当前系统时间减去所述前次hiveschema更新时间大于10分钟则执行hivesch获取当前最新的元数据信息库中的修改hive数据结构描述信息表中任务相关的sql语更新Kafka数据描述信息表中的对应表的字段信息和更新hive表字段描述信息表中的获取所述Kafka数据描述信息表和所述hive表字段描述信息表中清除Sparkbroadcast的数据,然后重新对Sparkbroadcast将所述分区的数据进行Json解析得到hash类型数据,并按照hive所述分区的数据生成的RDD[Row]和structschema信息构建成Spark的DataFrame数读取hive数据,然后获取该数据的schema信息,将该schema信息写入元读取所述hive表字段描述信息表中数据库名等于和表名等于执行所述配置资源的参3循环遍历每一条数据,根据对应的fieldType类型做匹配,根据匹配规则组建单个和,判断所述hive数据是否具备更新条件,当所述hive数据字段信息一样长的array数组;将该分区的数据进行Json解析得到hash类型数据,并按照从Hive表字段描述信息表中获取对应表的所有字段的数据结构,组合成一个hashmap将Kafka数据描述信息表中记录的hive建表语句的按顺序字段信息构建数组记为key对应的value,将hashmap中获取到的值转化为hashmapSchema中获取的数据类型的数每个分区数据处理完毕后判断所述批次的所述Kafka数据是7.一种实时数据更新及管理方法系统,用于执行权利要求1-5任一项所述的实时数据Kafka数据批次处理模块:用于读取所述Kafka数据及相关信息,45[0001]本发明涉及互联网技术领域,尤其涉及一种基于SparkStreaming的实时数据更[0003]现有技术中建立的实时数仓主要是对流量数据做实时ETL(Extraction-负责计算指标并通过WebServer配合前端完[0004]上述实时数仓存在新增Kafkatopic需要新增代码重新部署上线的问题,而新增6[0016]读取所述hive表字段描述信息表中数据库名等于和表名等于执行所述配置资源[0022]若所述当前系统时间的分钟数除以10余数大于1且所述当前系统时间减去所述前次hiveschema更新时间大于10分钟则执行hives[0023]获取当前最新的元数据信息库中的修改hive数据结构描述信息表中该任务相关[0024]更新Kafka数据描述信息表中的对应表的字段信息和更新hive表字段描述信息表[0034]所述分区的数据生成的RDD[Row]和structschema信息构建成Spark的DataFrame7[0037]从Hive表字段描述信息表中获取对应表的所有字段的数据结构,组合成一个[0038]将Kafka数据描述信息表中记录的hive建表语句的按顺序字段信息构建数组记为[0055]综上所述,本发明的实时数据存储及管理方法在新增字段、新增或删除kafka对应数据的offset信息。通过简单配置Kafka数据和hive数据映射信息可快速实现测试、8[0057]图1为本发明的基于SparkStreaming的实时数据更新及管理方法的元数据更新[0058]图2为本发明的基于SparkStreaming的实时数据更新及管理方法的数据处理流[0059]图3为运用本发明的基于SparkStreaming的实时数据更新及管理方法的数据处理流程的实时数据存储及管理方法系统的结构示[0060]图4为本发明的基于SparkStreaming的实时数据更新及管理方法应用到一种计[0064]本发明提供了一种基于SparkStreaming的实时数据更新及管理方法,其系统包括数据采集模块和数据处理模块两大主要模块。数据采集模块,即读取Kafka数据的多个名键0√90√0√0√[0073]sdx_partition_offset记录Spark消费Kafka的topic的offset的偏移量,其数据名键0√0√0√00√0√0√00[0078]本实施例提供了一种基于SparkStreaming的实时数据更新及管理方法的数据处[0085]配置信息保存时会在元数据信息表sdx_Kafka_hive_eventname同步插入一条信息,此时fieldnames字段信息为空。例如:INSERTINTO`sage_task_metadata`.`sdx_kafka_hive_eventname`(`eventname`,`db`,`tb`,`topicname`,`partitions`,`isevent`)VALUES('app_installation','ods__sdx__safe','ods__sdx_app_installation','sdb_sdx_app_[0086]创建hive表。例:createtableods_sdx_safe.ods[0087]参见说明书附图2,为本实施例的实时数据存储及管理方法数据处理流程描述示[0097]其中,SparkStreamingsecond:定义多长时间处理一次数据流创建一个体执行流程为:自动获取sdx_Kafka_hive_eventname中isevent为1且eventname为app_[0102]S312:根据hive中对应库表的结构信息重新追加写入sdx_hive_table_schema_[0103]INSERTINTO`sage_task_metadata`.`sdx_hive_table_schema_dec`(`db`,`tb[0104]VALUES('ods_sdx_s[0105]INSERTINTO`sage_task_metadata`.`sdx_hive_table_schema_dec`(`db`,`tb[0106]VALUES('ods__sdx__safe','ods__sdx_app_installation','name','[0107]INSERTINTO`sage_task_metadata`.`sdx_hive_table_schema_dec`(`db`,`tb[0108]VALUES('ods_sdx_safe','ods_sdx_app_installation','subregion','[0109]S313:更新sdx_kafka_hive_eventname中的fieldnames值,此时该值变为:id,[0110]S32:读取更新后的Kafka数据描述信息,即读取最新状态的sdx_Kafka_hive_为ods_sdx_app_installati[0114]则会触发在hive数据结构描述信息表(sdx_hive_table_add_columns)插入一条[0115]即:INSERTI`tb`,`sql`)VALUES('ods_sdx_safe','ods_sdx_app_installation','altertableods_[0117]当hive元数据具备更新条件(十分钟一次且sdx_hive_table_add_columns表不据信息库中的元数据信息。此时数据库中ods_sdx_app_installation会触发执行该sql语息、hive表字段组合成的fields字符串信息(该信息和hive中建表语句中字段顺序一致)、hive表字段信息按建表顺序构建的Struct[0129]将分区数据生成的RDD[Row]和structschema信息构建成Spark的DataFrame数等于和表名(tb)等于执行配置资源的参数配置时的数据库名会生成StructField(name,[0139]本实施例提供了一种KafkaJson构建DateFrame的方案,用于实现将Kafka数据[0142](2)从hive表字段描述信息表(sdx_hive_table_schema_dec)中可获取对应表的[0143](3)Kafka数据描述信息表(sdx_Kafka_hive_eventname)中记录有hive建表语句[0145](5)循环array中的每一个值、获取hashmap中的key对应的value(此处的key值和array中循环的值一样)、获取hashmapSchema中的key对应的value(此处的key值和array中循环的值一样)将hashmap中获取到的值转化为hashmapSchema中获取的数据类型的数[0152](2)若当前系统时间的分钟数除以10余数大于1且系统当前时间减去前次更新时[0153](3)获取当前最新的元数据信息库中的修改hive数据结构描述信息表(sdx_hive_table_add_columns)中该任务相关的sql语句[0154](4)更新Kafka数据描述信息表(sdx_Kafka_hive_eventname)中的对应表的字段信息和更新hive表字段描述信息表(sdx_hive_table_schema_dec)中的对应表的字段类型[0155](5)获取sdx_Kafka_hive_eventname和sdx_hive_table_schema_dec中更新后的[0159]本实施例提供了一种运用实施例1所提供的基于SparkStreaming的实时数据更[0165]读取元数据信息库中的Kafka数据的描述信息,和更新元数据信息库中对应元数[0171]图4为本实施例提供的一种计算机设备的结构示意图。图4显示的计算机设备1仅储器12可以包括易失性存储器12形式的计算机系[0180]修改hive数据信息。包括:修改属性信息并在hive数据结构描述信息表(sdx_据信息具备更新条件时执行数据更新操作,当hive元数据信息不具备更新条件时读取[0184]本实施例将一种基于SparkStreaming的实时数据更新及管理方法的数据处理流理器执行时实现如本发明任意实施例所提供的基于SparkStreaming的实时数据更新及[0193]修改hive数据信息。包括:修改属性信息并在hive数据结构描述信息表(sdx_据信息具备更新条件时执行数据更新操作,当hive元数据信息不具备更新条件时读取[0198]本实施例将基于SparkStreaming的实时数据更新及管理方法的数据处理流程应[0199]综上所述,本发明提供的基于SparkStreaming的实时数据更
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年PE泡沫行业分析报告及未来发展趋势报告
- 2026福建福州市江北智慧城市建设运营有限公司招聘1人笔试模拟试题及答案解析
- 2026年口腔义齿行业分析报告及未来发展趋势报告
- 珙县英才汇人力资源服务有限公司2026年5月招聘考试备考试题及答案解析
- 2026年DIN导轨式开关电源行业分析报告及未来发展趋势报告
- 2026年大理市城管协管人员招聘考试备考试题及答案详解
- 2026年三七片行业分析报告及未来发展趋势报告
- 2026年毯子行业分析报告及未来发展趋势报告
- 2026年苄草丹行业分析报告及未来发展趋势报告
- 2026江苏南京大学YJ202601841电子科学与工程学院博士后招聘1人考试备考试题及答案解析
- 上海会展展览行业劳动合同模板
- 循环流化床锅炉启动调试导则
- 下基层调研工作制度
- JJG 621-2012 液压千斤顶行业标准
- T-GDWCA 0035-2018 HDMI 连接线标准规范
- 小升初语文文言文阅读历年真题50题(含答案解析)
- 头晕教学讲解课件
- 电气化铁路有关人员电气安全规则2023年新版
- GB/T 23853-2022卤水碳酸锂
- GB/T 16823.3-2010紧固件扭矩-夹紧力试验
- 2023年深圳市南山区事业单位招聘笔试题库及答案解析
评论
0/150
提交评论