为InfoSphere Warehouse 提供实时数据的高效解决方案.docx_第1页
为InfoSphere Warehouse 提供实时数据的高效解决方案.docx_第2页
为InfoSphere Warehouse 提供实时数据的高效解决方案.docx_第3页
为InfoSphere Warehouse 提供实时数据的高效解决方案.docx_第4页
为InfoSphere Warehouse 提供实时数据的高效解决方案.docx_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

为 InfoSphere Warehouse 提供实时数据的高效解决方案 WWW.SRCSKY.COM 2010-07-23互联网刘艳字体:大 中 小阅读:1我要评论(0) 数据整合是数据仓库中的关键概念,ETL(数据的提取、转换和加载)过程的设计和实现是数据仓库解决方案中极其重要的一部分。由于传统的ETL过程中数据抽取是需要加载所有源数据库中的数据,这样对于需要经常进行数据集中的案例,将带来无可忍受的低效率。我们将介绍通过结合InfoSphereReplicationServer和InfoSphereDataStage,实现数据仓库的实时更新,并且仅仅需要抽取更新了的数据。简介信息是现代企业的重要资源,是企业运用科学管理、决策分析的基础,于是企业如何通过各种技术手段,并把数据转换为信息、知识,已经成了提高其核心竞争力的主要瓶颈。而 ETL 则是一个主要的技术手段。ETL(数据的提取、转换和加载)过程的设计和实现是数据仓库解决方案中极其重要的一部分。由于传统的 ETL 过程中数据抽取是需要加载所有源数据库中的数据,这样对于需要经常进行数据集中的案例,将带来无可忍受的低效率。例如一个有 50G 数据量的数据库, 如果只有 0.01%(也就是大约 50M)的数据较上次加载有更新,但是为了抽取这部分数据,仍然需要抽取所有 50G 的数据,这将是非常低效的。在这篇文章中,我们将介绍通过结合 InfoSphere Replication Server 和 InfoSphere DataStage, 实现数据仓库的实时更新,并且仅仅需要抽取更新了的数据。ETL 过程简介ETL 过程就是数据流动的过程,从不同的数据源流向不同的目标数据集中地。它是构建数据仓库的重要一环,用户从数据源抽取出所需的数据,经过数据清洗 , 最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中。它包涵三个阶段:E(Extract),T(Transform)和 L(Load)。提取(Extract):从不同的数据库(DB2,oracle,flat file 等)中读取源数据。通过接口提取源数据,例如 ODBC、专用数据库接口和平面文件提取器,并参照元数据来决定数据的提取及其提取方式。转换(Transform):开发者将提取的数据,按照业务需要转换为目标数据结构,并实现汇总。装载(Load):加载经转换和汇总的数据到目标数据仓库中,可实现 SQL 或批量加载。InfoSphere Replication Server 简介IBM InfoSphere Replication Server 是一个高速移动大量数据的企业软件应用程序,用于帮助企业连接分布在全球的业务、对客户进行快速响应以及从影响关键数据库系统的问题中恢复。只所以能够高效的提取数据是因为它用可恢复日志来记录数据库里数据的变化,Capture 程序负责连续读取数据库的恢复日志并捕获对源数据库更改(指对数据的插入、删除和更新操作),Apply 程序负责把这些变化的数据写入到目标数据库中。利用 Replication Server 的这一功能就可从大量的数据量中只提取出较上次更新的数据。Replication Server 和 Event publisher 的架构InfoSphere Replication Server 中提供了两种不同类型的复制:Q 复制和 SQL 复制。InfoSphere Data Event Publisher 捕获“更改的数据”事件并以 WebSphere MQ 消息的形式发布这些事件,其他应用程序可以使用这些消息来驱动后续处理。SQL 复制Capture 捕获数据变化后存储在一个临时中间表(staging tables),apply 程序把这些更新复制到相应的目标表。随着数据量的加大和客户对实时数据复制的要求,Q 复制应运而生。它的架构如图 1 所示:图 1. SQL 复制架构图查看原图(大图)Q 复制一个高吞吐量低延迟的方案,它不用中间表来存储已经提交的事务性数据,而是捕获对源表的更改并将已提交的数据转换为消息,即用 WebShpere MQ 消息队列在源和目标数据库间传送数据。它的架构如图 2 所示:图 2. Q 复制架构图查看原图(大图)Event publisher(EP)不同于 Q 复制,EP 不需要启动 apply 程序,捕获对源表的更改并将已落实的事务性数据转换为“可扩展标记语言”(XML)格式或定界格式(CSV: comma-separated value)的消息,以供用户直接从接受队列读取消息。在本文中,我们将利用 EP 的这个特点和 DataStage 整合为数据仓库提供实时高效的数据。它的构架如图 3 所示:图 3. EP 架构图查看原图(大图)IBM InfoSphere DataStage 简介IBM InfoSphere DataStage 是一款强大的基于图形化界面的 ETL 工具,它可以从多个不同的业务系统,多个平台的数据源中抽取数据、转换数据、装载数据到各种目标系统中。它有如下特点:基于图形化的开发环境,无需手工编码便可快速开发 ETL 作业,实现复杂的数据合并和转换逻辑。并且可以在开发新的作业时快捷的重用已有作业中的逻辑。支持广泛的数据源。DataStage 几乎支持所有的主流的数据库、企业级应用程序、文件作为数据源进行读取或写入数据。例如:DB2、Oracle、SQL Server、UniData、Informix、PeopleSoft、SAP、Siebel、顺序文件(如 CSV)、XML 文件等等。它也支持以多种常用的方式进行数据读取和写入,例如 FTP、SFTP、JMS 等等。强大的并行处理能力,能够对数据通过分割、管道等方式进行处理,提高硬件的使用效率,从而提高作业的性能。支持对数据进行批量和实时处理操作。InfoSphere Replication Server 和 InfoSphere DataStage 的整合DataStage 可以读取在不同数据库中数据,但是没有能力通过读取可恢复日志只捕获较上次更新的数据;另一方面,Replication Server 有能力捕获更新的数据却没有类似 DataStage 转换数据的功能,并且不像 DataStage, 支持对如此多的数据库,企业级应用程序和文件进行读写。所以本文将结合两者的优势,为 Warehouse 提供实时高效的数据,整合原理首先,利用 Replication Server 的 Event Publisher(EP),Q capture 从可恢复日志中捕获更新的数据,并且把数据变化写到 MQ 队列中;接着,MQ 消息通过 MQ 触发器触发了 DataStage 作业;最后,DataStage 的作业从 MQ 队列里直接读取数据进行处理。EP 支持两种类型的 MQ 消息:XML 和 CSV,XML 格式有好的移植性和灵活性而 CSV 有很好的性能,在这里我们将以 CSV 作为样例。DataStage 可以通过使用 MQ Connector stage 读取队列中的消息,然后基于所选的消息格式来解析消息,最后完成必要的转换。具体的架构图如图 4 所示:图 4. 总体架构图查看原图(大图)下面将具体介绍其实现。具体实现所需软件:IBM InfoSphere Replication Server 9.7IBM InfoSphere Information Server 8.1Event Publisher 的配置如果 source 是 Oracle,需要通过 Replication Server Oracle capture feature 来完成对变化数据的提取,请参考“参考资料”部分。在本文中,我们 source 以 DB2 为例:1. 创建 DB2 对象在本文中创建数据库 SOURCE,和表”DEMO”.”CUSTOMER”,并 import 数据清单 1. 创建表及导入数据CREATE TABLE DEMO.CUSTOMER ( CUSTOMER_ID INTEGER NOT NULL , SEX CHAR(1) , BIRTHDAY TIMESTAMP , SSN VARCHAR(30) , CITY VARCHAR(25) , STATE VARCHAR(25) , ZIP VARCHAR(15) , PHONE VARCHAR(15) , PRI_LANGUAGE VARCHAR(15) , LAST_UPDATE TIMESTAMP , FIRST_NAME VARCHAR(20) , MIDDLE_NAME VARCHAR(10) , LAST_NAME VARCHAR(20) ) ; ALTER TABLE DEMO .CUSTOMER ADD PRIMARY KEY (CUSTOMER_ID); DB2 import from customer.ixf of ixf insert into ”DEMO”.”CUSTOMER” 2. 创建 MQ 对象创建 Q manager:crtmqm QManager启动 Q manager:strmqm QManager创建队列 : runmqsc QMamanger mq.in清单 2. 创建 MQ 对象define qlocal (ADMINQ) define qlocal (RESTARTQ) define qlocal (q1) define qmodel (IBMQREP.SPILL.MODELQ) DEFSOPT(shared) MAXDEPTH(500000) MSGDLVSQ(fifo) DEFTYPE(permdyn) define channel(CHANNEL1) chltype(svrconn) trptype(tcp) mcauser(mqm) define listener(listener1) trptype(tcp) control(qmgr) port (2264) start listener (listener1) end 3. setup Event Publisher3.1 创建 control tables:asnclp f cncap.in清单 3. 创建控制表asnclp session set to q replication; set run script now stop on sql error on; set qmanager QManager for capture schema; set server capture to db source; create control tables for capture server using restartq RESTARTQ adminq ADMINQ ; quit; 3.2 创建 pubqmap 和 pub:asnclp f crtqmappub.in清单 4. 创建 pubqmap 及 pubasnclp session set to Q replication; set output capture script qpubmap.sql ; set log qpub.log; set server capture to db source ; set run script now stop on sql error on; set qmanager QManager for capture schema; create pubqmap pubqmap1 using sendq q1 message format delimited message content type T; create pub using pubqmap PUBQMAP1 (pubname eventpub1 DEMO.CUSTOMER) ; quit; 4. 启动 capture:清单 5. 启动 captureasnqcap capture_server=source capture_schema=ASN 2010-04-22-49339 ASN0600I Q Capture : : Initial : Program mqpub 9.7.0 is starting. 2010-04-22-37012 ASN7010I Q Capture : ASN : WorkerThread : The program successfully activated publication or Q subscription EVENTPUB1 (send queue q1, publishing or replication queue map PUBQMAP1) for source table DEMO.CUSTOMER. 2010-04-22-41830 ASN7000I Q Capture : ASN : WorkerThread : 0 subscriptions are active. 0 subscriptions are inactive. 1 subscriptions that were new and were successfully activated. 0 subscriptions that were new could not be activated and are now inactive. 2010-04-22-47328 ASN0572I Q Capture : ASN : WorkerThread : The mqpub 9.7.0 program initialized successfully. 5. 对源表做些插入、删除和更新操作清单 6. 更新源表connect to source; insert into DEMO.CUSTOMER values (9000, F,1960-05-06-00.00.00.000000, 467897085, Boise,Idaho,83701-83733,(71)657-9085,English, 2006-08-18-00000,IRVING,H,STERN); insert into DEMO.CUSTOMER values (9100, F,1960-05-06-00.00.00.000000, 467897085,Boise, Idaho,83701-83733,(71)657-9085,English, 2006-08-18-00000,IRVING,H,STERN); update DEMO.CUSTOMER set CUSTOMER_ID=5000 where CUSTOMER_ID=2075; update DEMO.CUSTOMER set FIRST_NAME=meggy where FIRST_NAME=EILEEN; update DEMO.CUSTOMER set LAST_NAME=Leee where LAST_NAME=JOHNSON; delete from DEMO.CUSTOMER where CUSTOMER_ID=6057; delete from DEMO.CUSTOMER where CUSTOMER_ID=8354; connect reset; DataStage 作业开发通过使用 Event Publisher, 我们可以把数据表里由于 DML 操作发生的数据变化实时放入到 MQ 消息队列中,它将生成如下的消息:清单 7. Event Publisher 生成的 MQ 消息示例10,IBM,2010098,181602875000,DEMO,CUSTOMER, ISRT,0000:0000:0000:0000:5914,0000:0000:0000:04f2:d60f, 2010-04-08-17.16.02,0000,9000,F, 1960-05-06-00.00.00.000000,467897085,Boise,Idaho, 83701-83733,(71)657-9085,English,2006-08-18-00000, IRVING,H,STERN 10,IBM,2010098,181603343000,DEMO,CUSTOMER, REPL,0000:0000:0000:0000:5916,0000:0000:0000:04f2:daaf, 2010-04-08-17.16.02,0000,2075,F,1950-04-16-20.45.00.000000, 205674075,Atlantaa,Georgia,30302-30399,(206)567-6075, English,2007-04-19-20.45.00.000000,SALLY, STERN,5000,F,1950-04-16-20.45.00.000000,205674075, Atlantaa,Georgia,30302-30399,(206)567-6075, English,2007-04-19-20.45.00.000000,SALLY,STERN 10,IBM,2010098,181603343000,DEMO,CUSTOMER, DLET,0000:0000:0000:0000:591a,0000:0000:0000:04f2:f768, 2010-04-08-17.16.02,0000,8354,F,2009-07-06-00.00.00.000000, 34124,Beijing,fadsfs,10083Chinese, 2009-07-28-00.00.00.000000,SHI LIXIAN, 我们可以发现在这些消息里包含了对所变化数据的描述。例如:在第一条消息中,第五个字段记录了数据变化发生表的 schema:“DEMO”;第六个字段记录了表名“CUSTOMER”;第七个字段“ISRT”记录的是对应的 DML 操作 , “ISRT”说明这是一个 Insert 操作引起了数据变化,之后的字段描述了数据变化发生的时间以及数据是如何变化的。该消息表明一条如下的新记录被插入到“DEMO.CUSTOMER”表中:9000,F,1960-05-06-00.00.00.000000,467897085, Boise,Idaho,83701-83733,(71)657-9085,English, 2006-08-18-00000,IRVING,H,STERN 同样,我们可以从第二和第三条消息中看出这里面所包含的数据变化信息:第二条消息描述了一个“update”操作发生在表“DEMO.CUSTOMER”中,第三消息描述了一个“delete”操作发生在表“DEMO.CUSTOMER”中。接下来,我们将开发一个 DataStage 作业,用来读取 MQ 中的消息,并且对消息进行处理,最后把数据放入数据仓库。如下图 5 所示:图 5. DataStage 作业示例查看原图(大图)首先,我们将使用 MQ Connector 从 MQ 队列中读取消息(这些消息如前面所示包含了数据的变更情况)。MQ Connector 支持两种方式访问 MQ Server, 一种是“server”方式,对应 DataStage 是和 MQ Server 在一台机器上的情况;另一种是“client”方式,对应 DataStage 跟 MQ Server 不在同一台机器,DataStage 需要通过 MQ Client 访问 MQ Server。在该例中,我们使用了 Server 的方式访问 MQ Server。如图所示:图 6. MQ Connector 设置查看原图(大图)接着,我们需要使用 DataStage Transform Stage 对 MQ 的消息进行处理。并行实现一些逻辑转换。在 Transform 中,我们先要知道某条消息包括了一个什么的操作,以便把相同的操作应用到数据仓库。如图所示,我们定义了一个约束,针对不同的操作 “DLET”,“REPL”和“ISRT”定义了不同的数据输出。其中 , “operator”是一个变量,我们使用 DataStage 内部函数 Field() 读出每条消息的第五个字段,并且使用 Trim() 函数去掉双引号后赋给该变量。图 7. 添加 Transform Stage 约束查看原图(大图)然后,我们从消息中读取包含的数据信息,并且这些数据信息映射到对应的的输出中。图 8. 映射 Transform Stage 的输入与输出查看原图(大图)在这个过程中,我们还实现了一个字段合并,把源数据中的字段 First Name, Middle Name, 和 Last Name 合并成了一个字段。如图 9 所示:图 9. 在 Transform Stage 中合并字段查看原图(大图)最后,基于不同的数据更新操作(Insert, Updated 和 Delete)生成不同的数据装载逻辑,把数据变更应用到数据仓库。在这个例子中,我们使用了 DB2 API Stage 实现了这个数据转载,如图所示,MQ 消息经过 Transform Stage 处理后,按照不同的数据操作,定义了不同数据流向,每一个操作对应的数据跟新方式链接到一个 DB2 API Stage,再通过 DB2 API Stage 生成对应的 SQL,把更新应用到数据仓库。图 10 所示是用来做 Update 的 DB2 API Stage.图 10. 设置 DB2 API StageDataStage 作业的编译和运行DataStage 作业开发好以后,我们就可以编译和运行作业了。DataStage 支持通过图形界面和命令行两种方式编译和运行作业。如图 11 所示为通过图形界面编译和运行作业。图 11. 图形界面编译与运行 DataStage 作业查看原图(大图)清单 8. 命令行编译 DataStage 作业示例C:IBMInformationServerClientsClassicdscc.exe /h /u /p /d /j 清单 9. 命令行运行 DataStage 作业示例C:IBMInformationServerClientsClassicdsjob.exe -domain -user -password -server -run DataStage 作业的调度以及跟 MQ 触发器相结合实现实时数据更新最后的部分,本文将介绍如何实现 DataStage 作业的调度和实时更新。DataStage作业的调度我们可以在 DataStage Director 为作业制定执行计划,让 DataStage 作业去定期的运行。如图 12 所示:图 12. 制定 DataStage 作业执行计划通常,对于特定时间段业务繁忙的系统,我们可以为作业制定执行计划让其在系统闲时运行,这样将提高硬件资源的使用效率,并且避免 DataStage 作业的运行影响到其它的业务系统。与 MQ触发器相结合实现实时的数据更新如果你希望把源数据的变化实时更新到数据仓库,你可以采用 MQ 触发器和 DataStage 作业相结合的方式。当然,你

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论