在Hive上实现SCD.doc_第1页
在Hive上实现SCD.doc_第2页
在Hive上实现SCD.doc_第3页
在Hive上实现SCD.doc_第4页
在Hive上实现SCD.doc_第5页
已阅读5页,还剩3页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

在Hive上实现SCD一、问题提出 官方一直称Hive是Hadoop数据仓库解决方案。既然是数据仓库就离不开多维、CDC、SCD这些概念,于是尝试了一把在Hive上实现SCD1和SCD2。这有两个关键点,一个是行级更新,一个是生成代理键。行级更新hive本身就是支持的,但需要一些配置,还有一些限制。生成代理键在RDBMS上一般都用自增序列。Hive也有一些对自增序列的支持,本实验分别使用了窗口函数ROW_NUMBER()和hive自带的UDFRowSequence实现生成代理键。二、软件版本Hadoop 2.7.2Hive 2.0.0三、实验步骤1. 准备初始数据文件a.txt,内容如下:plain view plain copy 在CODE上查看代码片派生到我的代码片1,张三,US,CA 2,李四,US,CB 3,王五,CA,BB 4,赵六,CA,BC 5,老刘,AA,AA 2. 用ROW_NUMBER()方法实现初始装载和定期装载(1)建立初始装载脚本init_row_number.sql,内容如下:sql view plain copy 在CODE上查看代码片派生到我的代码片USE test; - 建立过渡表 DROP TABLE IF EXISTS tbl_stg; CREATE TABLE tbl_stg ( id INT, name STRING, cty STRING, st STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ,; - 建立维度表 DROP TABLE IF EXISTS tbl_dim; CREATE TABLE tbl_dim ( sk INT, id INT, name STRING, cty STRING, st STRING, version INT, effective_date DATE, expiry_date DATE) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES (transactional=true); - 向过渡表加载初始数据 LOAD DATA LOCAL INPATH /home/grid/BigDataDWTest/a.txt INTO TABLE tbl_stg; - 向维度表装载初始数据 INSERT INTO tbl_dim SELECT ROW_NUMBER() OVER (ORDER BY tbl_stg.id) + t2.sk_max, tbl_stg.*, 1, CAST(1900-01-01 AS DATE), CAST(2200-01-01 AS DATE) from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2; (2)执行初始装载plain view plain copy 在CODE上查看代码片派生到我的代码片hive -S -f /home/grid/BigDataDWTest/init_row_number.sql (3)修改数据文件a.txt,内容如下:plain view plain copy 在CODE上查看代码片派生到我的代码片1,张,U,C 3,王五,CA,BB 4,赵六,AC,CB 5,刘,AA,AA 6,老杨,DD,DD 说明:1. 新增了第6条数据2. 删除了第2条数据3. 修改了第1条数据的name列、cty列和st列(name列按SCD2处理,cty列和st列按SCD1处理)4. 修改了第4条数据的cty列和st列(按SCD1处理)5. 修改了第5条数据的name列(按SCD2处理)(4)建立定期装载脚本scd_row_number.sql,内容如下:sql view plain copy 在CODE上查看代码片派生到我的代码片USE test; - 设置日期变量 SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1); SET hivevar:max_date = CAST(2200-01-01 AS DATE); - 向过渡表加载更新后的数据 LOAD DATA LOCAL INPATH /home/grid/BigDataDWTest/a.txt OVERWRITE INTO TABLE tbl_stg; - 向维度表装载更新后的数据 - 设置已删除记录和SCD2的过期 UPDATE tbl_dim SET expiry_date = $hivevar:pre_date WHERE sk IN (SELECT a.sk FROM ( SELECT sk,id,name FROM tbl_dim WHERE expiry_date = $hivevar:max_date) a LEFT JOIN tbl_stg b ON a.id=b.id WHERE b.id IS NULL OR ); - 处理SCD2新增行 INSERT INTO tbl_dim SELECT ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max, t1.id, , t1.cty, t1.st, t1.version, t1.effective_date, t1.expiry_date FROM ( SELECT t2.id id, name, t2.cty cty, t2.st st, t1.version + 1 version, $hivevar:pre_date effective_date, $hivevar:max_date expiry_date FROM tbl_dim t1 INNER JOIN tbl_stg t2 ON t1.id=t2.id AND AND t1.expiry_date = $hivevar:pre_date LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = $hivevar:max_date WHERE t3.sk IS NULL) t1 CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2; - 处理SCD1 - 因为hive的update还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update - 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有cty或st改变的记录,而不是仅仅更新当前版本的记录 DROP TABLE IF EXISTS tmp; CREATE TABLE tmp AS SELECT a.sk,a.id,,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b WHERE a.id=b.id AND (a.cty b.cty OR a.st b.st); DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp); INSERT INTO tbl_dim SELECT * FROM tmp; - 处理新增记录 INSERT INTO tbl_dim SELECT ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max, t1.id, , t1.cty, t1.st, 1, $hivevar:pre_date, $hivevar:max_date FROM ( SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id WHERE t2.sk IS NULL) t1 CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2; (5)执行定期装载plain view plain copy 在CODE上查看代码片派生到我的代码片hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql 查询维度表结果如图1所示。sql view plain copy 在CODE上查看代码片派生到我的代码片select * from tbl_dim order by id,version; (6)再次执行定期装载,维度表的数据没有变化plain view plain copy 在CODE上查看代码片派生到我的代码片hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql 2. 用UDFRowSequence方法实现初始装载和定期装载实验过程和ROW_NUMBER()方法基本一样,只是先要将hive-contrib-2.0.0.jar传到HDFS上,否则会报错。plain view in copy 在CODE上查看代码片派生到我的代码片hadoop dfs -put /home/grid/hive/lib/hive-contrib-2.0.0.jar /user 初始装载脚本init_UDFRowSequence.sql,内容如下:sql view plain copy 在CODE上查看代码片派生到我的代码片USE test; ADD JAR hdfs:/user/hive-contrib-2.0.0.jar; CREATE TEMPORARY FUNCTION row_sequence as org.apache.hadoop.hive.contrib.udf.UDFRowSequence; - 建立过渡表 DROP TABLE IF EXISTS tbl_stg; CREATE TABLE tbl_stg ( id INT, name STRING, cty STRING, st STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ,; - 建立维度表 DROP TABLE IF EXISTS tbl_dim; CREATE TABLE tbl_dim ( sk INT, id INT, name STRING, cty STRING, st STRING, version INT, effective_date DATE, expiry_date DATE) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES (transactional=true); - 向过渡表加载初始数据 LOAD DATA LOCAL INPATH /home/grid/BigDataDWTest/a.txt INTO TABLE tbl_stg; - 向维度表装载初始数据 INSERT INTO tbl_dim SELECT t2.sk_max + row_sequence(), tbl_stg.*, 1, CAST(1900-01-01 AS DATE), CAST(2200-01-01 AS DATE) from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2; 定期装载脚本scd_UDFRowSequence.sql,内容如下:sql view plain copy 在CODE上查看代码片派生到我的代码片USE test; ADD JAR hdfs:/user/hive-contrib-2.0.0.jar; CREATE TEMPORARY FUNCTION row_sequence as org.apache.hadoop.hive.contrib.udf.UDFRowSequence; - 设置日期变量 SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1); SET hivevar:max_date = CAST(2200-01-01 AS DATE); - 向过渡表加载更新后的数据 LOAD DATA LOCAL INPATH /home/grid/BigDataDWTest/a.txt OVERWRITE INTO TABLE tbl_stg; - 向维度表装载更新后的数据 - 设置已删除记录和SCD2的过期 UPDATE tbl_dim SET expiry_date = $hivevar:pre_date WHERE sk IN (SELECT a.sk FROM ( SELECT sk,id,name FROM tbl_dim WHERE expiry_date = $hivevar:max_date) a LEFT JOIN tbl_stg b ON a.id=b.id WHERE b.id IS NULL OR ); - 处理SCD2新增行 INSERT INTO tbl_dim SELECT t2.sk_max + row_sequence(), t1.id, , t1.cty, t1.st, t1.version, t1.effective_date, t1.expiry_date FROM ( SELECT t2.id id, name, t2.cty cty, t2.st st, t1.version + 1 version, $hivevar:pre_date effective_date, $hivevar:max_date expiry_date FROM tbl_dim t1 INNER JOIN tbl_stg t2 ON t1.id=t2.id AND AND t1.expiry_date = $hivevar:pre_date LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = $hivevar:max_date WHERE t3.sk IS NULL) t1 CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2; - 处理SCD1 - 因为hive的update还不支持子查询,所以这里使用了一个

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论