阿里云数据集成服务-使用手册-D_第1页
阿里云数据集成服务-使用手册-D_第2页
阿里云数据集成服务-使用手册-D_第3页
阿里云数据集成服务-使用手册-D_第4页
阿里云数据集成服务-使用手册-D_第5页
已阅读5页,还剩97页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、数据集成使用手册消息队列/用户手册消息队列/用户手册 PAGE 101 PAGE 101使用手册作业配置说明Job配置设计约定type: job,type: job,traceId: bazhen_001, version: 1.0, configuration: setting: , reader: plugin: ,parameter: ,writer: plugin: ,parameter: 其中:type=job,代表底层使用批量离线的数据同步。traceId,用户用来此traceId跟踪作业。version,版本号,公测版本CDP目前仅支持1.0版本格式。configuration=

2、,是DataX本身进行数据传输的描述。具体而言,描述的是源、目的、转换器(暂不支持)。所有的配置信息定义,均按照驼峰命名法进行配置key命名,且大小写敏感 。用户必须保证配置必须严格按照文档定义格式进行填写。由于JSON本身不支持注释, 用户在参考CDP提供的样例配置必须剔除掉注释信息 ,否则JSON解析将会报错。阿里云物联网套件/详细手册阿里云物联网套件/详细手册Job主体配置type: job, /用户提交同步类型,包括Job/Stream traceId: bazhen_001, /用户用来此traceId跟踪作业version: 1.0, /版本号,公测版本CDP目前仅支持1.0版本格

3、式。configuration: settting: key: value,type: job, /用户提交同步类型,包括Job/Stream traceId: bazhen_001, /用户用来此traceId跟踪作业version: 1.0, /版本号,公测版本CDP目前仅支持1.0版本格式。configuration: settting: key: value,reader: plugin: mysql, /这里填写源头数据存储类型的名称parameter: key: value,writer: plugin: odps, /这里填写目标端数据存储类型的名称parameter: key:

4、 value其中:type指定本次提交同步任务是Job、Stream。traceId是由用户指定,并且建议用户按照自己业务逻辑生成唯一ID,以方便 后期的运维工作。CDP只提供触发式数据同步API,CDP本身不提供Job存储、Job定期调度、Job的DAG依赖触发等各类涉及工 作流调度功能。因此同一份Job如果用户多次(非并行)进行提交,CDP将为每次提交的Job分配单个CDP集群(例 如杭州集群)唯一的运行ID,该ID为Job的主键ID,由CDP服务生成并分配,CDP保证全局唯一性。如果用户希 望对于每个数据同步任务配置使用同一个ID进行追踪,那么请为每个数据同步作业分配单一的traceID

5、。例如阿里云公有云产品采云间系统,其底层数据集成功能通过CDP完成。采云间使用dpc+采云间作业ID作 为traceId向CDP提交Job。这样采云间可以方便的使用采云间作业ID在CDP中获取并追踪该采云间作业近期所 有的运行状况。version目前所有Job仅支持版本号1.0,用户只能填写版本号为1.0。二、Job Setting配置type: job,traceId: bazhen_0001, type: job,traceId: bazhen_0001, version: 1.0, configuration: setting: errorLimit: ,speed: ,column:

6、- configuration.setting.column(类型转换)datetimeFormatdatetime类型和string类型的转换format。timeFormat: 设定time类型和string类型的转换format。dateFormat: 设定date类型和string类型的转换format。type: job,traceId: bazhen_0001, version: 1.0, configuration: setting: type: job,traceId: bazhen_0001, version: 1.0, configuration: setting: col

7、umn: datetimeFormat: yyyy-MM-dd HH:mm:ss, timeFormat: HH:mm:ss,dateFormat: yyyy-MM-dd,encoding: utf-8- configuration.setting.errorLimit(脏数据控制)type: job,version: 1.0, configuration:type: job,version: 1.0, configuration: setting:errorLimit: record: 1024上述配置用户指定了errorLimit上限为1024条record,当Job在传输过程中出现脏数据

8、记录数大于 ,Job报错退 出:- configuration.setting.speed (流量控制)type: job, configuration: setting: speed: type: job, configuration: setting: speed: mbps: 1 /代表1MB/s的传输带宽Reader插件MySQLReader快速介绍读取数据。在底层实现上,MySQLReader通过JDBC连接远程Mysql数据 库,并执行相应的sql语句将数据从mysql库中SELECT出来。实现原理简而言之,MySQLReader通过JDBC连接器连接到远程的Mysql数据库,并根

9、据用户配置的信息生成查询SELECT SQL语句并发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。对于用户配置Table、Column、Where的信息,MySQLReader将其拼接为SQL语句发送到Mysql数据库;对 于用户配置querySql信息,Mysql直接将其发送到Mysql数据库。功能说明配置样例- 使用instance配置一个从Mysql数据库同步抽取数据到ODPS的作业:type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version:

10、1.0,configuration: reader: plugin: mysql, parameter: instanceName: datasync001, database: database,table: table,splitPk: db_id, username: datasync001, password: xxxxxx, column: *,where: Date(add_time) = 2014-06-01,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx, project: pr

11、oject, table: table, column: *,partition: pt=20150101- 使用JDBC方式配置一个从Mysql数据库同步抽取数据到ODPS的作业:云数据库 OceanBase/SQL语法参考云数据库 OceanBase/SQL语法参考type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration: reader: plugin: mysql, parameter: jdbcUrl: jdbc:mysql:/ip:port/database, table: table

12、,splitPk: db_id, username: datasync001, password: xxxxxx, column: *,where: Date(add_time) = 2014-06-01,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx, project: project, table: table, column: *,partition: pt=20150101- 使用多个instance配置一个从Mysql数据库同步抽取分库分表数据到ODPS的作业:type: job,tr

13、aceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration: reader: plugin: mysql, parameter: connection: table: tbl1,tbl2, tbl3,instanceName: inst1,database: dbdatabase: db,table: tbl4,tbl5, tbl6,instanceName: inst2, database: db,splitPk: db_id, username: datasync001, password: xxxxxx, colu

14、mn: *,where: Date(add_time) = 2014-06-01,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx, project: project, table: table, column: *,partition: pt=20150101- 使用多个jdbc配置一个从Mysql数据库同步抽取分库分表数据到ODPS的作业:type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration

15、: reader: plugin: mysql, parameter: connection: table: tbl1,tbl2, tbl3,jdbcUrl: jdbc:mysql:/ip:port/database1,table: tbl4,tbl5, tbl6,jdbcUrl: jdbc:mysql:/ip:port/database2,splitPk: db_id, username: datasync001, password: xxxxxx, column: *,where: Date(add_time) = 2014-06-01,writer: plugin: odps, para

16、meter: accessId: bazhen.csy, accessKey: xxxxxxx, project: project, table: table, column: *,partition: pt=20150101参数说明instanceName描述: 阿里云RDS实例名称(Instance名称)。用户使用该配置指定RDS的Instance名称,CDP将翻译为底层执行的jdbcUrl连接串连接。instanceName指定的是RDS实例。发给类似Mysql实例,指定了数据源的IP+Port,需要 和database配合使用。例如,在RDS WebConsole页面中点击【基本信息】

17、,其右侧详情的名称即是用户拥有RDS的instanceName。ACE/操作指南ACE/操作指南必选:是默认值:无jdbcUrl描述:对于CDP私有云场景、数据库迁移场景等,其本身数据源是普通Mysql数据库不是RDS。对于这类使用场景,用户可以指定jdbc信息直连。两类信息概念上是等同的,因此只能配置其一。如果两 者均配置,CDP默认将使用jdbcUrl信息。考虑到公有云上用户RDS DNS名称、内网等信息可能存在更改的情况,强烈建议公有云用户不使用jdbcUrl配置项。必选:是默认值:无databaseRDS实例下的数据库名称。必选:是默认值:无username描述:数据源的用户名必选:是

18、默认值:无password描述:数据源的密码必选:是默认值:无table描述:所选取的需要同步的表名称,一个CDPJob只能同步一张表。必选:是默认值:无column描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用星号 代表默认使用所有列配置,例如*。支持列裁剪,即列可以挑选部分列进行导出。支持列换序,即列可以不按照表schema信息进行导出。支持常量配置,用户需要按照Mysql SQL语法格式:id, table, 1, bazhen.csy, null, to_char(a + 1), 2.3 , trueid为普通列名,table为包含保留在的列名,1为整形

19、数字常量,bazhen.csy为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。column必须用户显示指定同步的列集合,不允许为空! 必选:是默认值:无splitPk描述:MySQLReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的 字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步 的效能。推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容 易出现数据热点。目前splitPk仅支持整形、字符串型数据切分,不支持浮点、日期等其他类

20、型。如果用户指定 其他非支持类型,MySQLReader将报错!如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该 表数据。必选:否默认值:无where描述:筛选条件。例如在做测试时,可以将where条件指定为limit10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create $bizdate;where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。必选:否默认值:无querySql描述:在有些业务场景下,wh

21、ere这一配置项不足以描述所筛选的条件,用户可以通过该配 置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行 多表join后同步数据,使用selecta,bfromtable_ajointable_bontable_a.id必选:否默认值:无类型转换DataX 内部类型Mysql 数据类型LongDataX 内部类型Mysql 数据类型Longint, tinyint, smallint, mediumint, int, bigintDoublefloat, double, decimalStringvarch

22、ar,char,tinytext,text,mediumtext, longtextDatedate, datetime, timestamp, time, yearBooleanbit, boolBytestinyblob,mediumblob,blob,longblob, varbinary请注意:除上述罗列字段类型外,其他类型均不支持。MySQLReader将tinyint(1)视作整形。性能报告环境准备数据特征建表语句:CREATE TABLE tc_biz_vertical_test_0000 (CREATE TABLE tc_biz_vertical_test_0000 (biz_

23、order_id bigint(20) NOT NULL COMMENT id,key_value varchar(4000) NOT NULL COMMENT Key-value的内容,gmt_create datetime NOT NULL COMMENT 创建时间,gmt_modified datetime NOT NULL COMMENT 修改时间,attribute_cc int(11) DEFAULT NULL COMMENT 防止并发修改的标志,value_type int(11) NOT NULL DEFAULT 0 COMMENT 类型,buyer_id bigint(20)

24、 DEFAULT NULL COMMENT buyerid,seller_id bigint(20) DEFAULT NULL COMMENT seller_id, PRIMARY KEY (biz_order_id,value_type),KEY idx_biz_vertical_gmtmodified (gmt_modified)biz_order_id: 888888888key_value: ;orderIds:20148888888,2014888888813800; gmt_create: 2011-09-24 11:07:20biz_order_id: 888888888key_

25、value: ;orderIds:20148888888,2014888888813800; gmt_create: 2011-09-24 11:07:20gmt_modified: 2011-10-24 17:56:34value_type: 3buyer_id: 8888888value_type: 3buyer_id: 8888888seller_id: 1机器参数执行DataX的机器参数为:cpu24Intel(RXeon(RCPUE5-263002.30GHzmem:48GBnetdiscDataXMysql数据库机器参数为:cpu32Intel(RXeon(RCPUE5-2650v

26、22.60GHzmem:256GBnetdisc: BTWL419303E2800RGN INTEL SSDSC2BB800G4D2010370DataXjvm-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError测试报告单表测试报告通道数是否按DataX速DataX机DataX机DB网卡DB运行容器服务/使用手册容器服务/使用手册照主键切分度(Rec/s)器网卡进入流量(MB/s)器运行负载流出流量(MB/s)负载1否183185290.6310.61是183185290.6310.64否183185290.6310.64是329733580.

27、8600.768否183185290.6310.68是5495561151.461200.78说明:1. 这里的单表,主键类型为 bigint(20),范围为:190247559466810-570722244711460,从主键范围划分看,数据分布均匀。2. 对单表如果没有安装主键切分,那么配置通道个数不会提升速度,效果与1个通道一样。分表测试报告(2个分库,每个分库16张分表,共计32张分表)通道数DataX速度(Rec/s)DataX机器网卡进入流量(MB/s)DataX机器运行负载DB网卡流出流量(MB/s)DB运行负载120224131.51.032193.

28、11323.6810744051975.52055.1161227892229.28.12337.3约束限制主备同步数据恢复问题主备同步问题指Mysql使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定的 时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致从 备库同步的数据不是一份当前时间的完整镜像。CDP如果同步的是阿里云提供RDS,是直接从主库读取数据,不存在数据恢复问题。但是会引入主库负载问题,请注意流控配置。一致性约束系统,对外可以提供强一致性数据查询接口。例如当一次同步任务启动运 行过程中,当该库存在其他数据写

29、入方写入数据时,MySQLReader完全不会获取到写入更新数据,这是由于 数据库本身的快照特性决定的。关于数据库快照特性,请参看MVCCWikipedia操作审计/产品使用手册操作审计/产品使用手册上述是在MySQLReader单线程模型下数据同步一致性的特性,由于MySQLReader可以根据用户配置信息使用 了并发数据抽取,因此不能严格保证数据一致性:当MySQLReader根据splitPk进行数据切分后,会先后启动 多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间 隔。因此这份数据并不是完整的、一致的数据快照信息。针对多线程的一致性快照

30、需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们 提供几个解决思路给用户,用户可以自行选择:使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响 在线业务。数据库编码问题MySQLReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此MySQLReader不需用户指定编码,可以自动获取编码并转码。对于Mysql底层写入编码和其设定的编码不一致的混乱情况,MySQLReader对此无法识别,对此也无法提供解 决方案,

31、对于这类情况,导出有可能为乱码。增量数据同步MySQLReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT.WHERE.进行增量数据抽取,方式有多种:数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对 于这类应用,MySQLReader只需要WHERE条件跟上一同步阶段时间戳即可。对于新增流水型数据,MySQLReader可以WHERE条件后跟上一阶段最大自增ID即可。对于业务上无字段区分新增、修改数据情况,MySQLReader也无法进行增量数据同步,只能同步全量数据。SQL安全性SELECT抽取语句,MySQLR

32、eader本身对querySql不做任 何安全性校验。这块交由DataX用户方自己保证。FAQmysql -u -p -h -D -e select * from Q: MySQLReader同步报错,报错信息为XXX A: 网络或者权限问题,请使用mysql命令行测试:mysql -u -p -h -D -e select * from 如果上述命令也报错,那可以证实是环境问题,请联系你的DBA。Q: 我想同步Mysql增量数据,怎么配置?where: Date(add_time) = 2014-06-01A: MySQLReader必须业务支持增量字段DataX才能同步增量,例如在淘宝大部

33、分业务表中,通过gmt_modified字段表征这条记录的最新修改时间,那么DataX MySQLReader只需要配置where条件为where: Date(add_time) = 2014-06-01Q: 上述bizdate代表什么意思? 我每天需要同步的gmt_modified值肯定不一样的,如何做到每天使用不同变量值A: CDP支持自定义变量,请参考CDP Console 中有关自定义变量章节说明。 Q:我有1亿条数据需要同步,大概需要同步多长时间A: 和数据库配置、DataX机器负载相关,请参考上述性能章节 Q: 为什么不推荐使用默认列*, 会有问题吗A: 如果你限定了DataX同步

34、a,b,c三列,那么数据库在添加列,修改列,删除列情况,DataX可以做到要么立即报错,要么兼容老情况。如果直接配置星号,上游的数据库变量会立刻影响下游数据!而且可能会在下游任务 运行到一段时间才报错,可能已经引入了大量脏数据。Q: MySQLReader同步出现乱码,怎么处理A: 通常情况下是你同步的数据库没有按照规范配置编码,比如数据库配置的编码是UTF8,但是底层物理文件实际存放的编码是GBK,DataX按照UTF-8读取数据就会出现乱码。此时你应该寻求你的DBA修改库的编码格式。SQLServerReader快速介绍SQLServerReader插件实现了从SqlServer读取数据。

35、在底层实现上,SQLServerReader通过JDBC连接远程SqlServer数据库,并执行相应的sql语句将数据从SqlServer库中SELECT出来。实现原理简而言之,SQLServerReader通过JDBC连接器连接到远程的SqlServer数据库,并根据用户配置的信息生成查 询SELECT SQL语句并发送到远程SqlServer数据库,并将该SQL执行返回结果使用CDP自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。对于用户配置Table、Column、Where的信息,SQLServerReader将其拼接为SQL语句发送到SqlServer数据 库;对于

36、用户配置querySql信息,SqlServer直接将其发送到SqlServer数据库。功能说明配置样例type: job,type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration: reader: plugin: sqlserver, parameter: instanceName: , username: username, password: xxx,splitPk: id, fetchSize: 512, table: table_01, column: a,b,c,where: 1 =

37、 1,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxx, project: project, table: table, column: *,partition: pt=20140501使用JDBC配置一个从SqlServer数据库同步抽取数据到ODPS的作业:type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration: reader: plugin: sqlserver, parameter: jdbcUr

38、l: jdbc:sqlserver:/localhost:3433;DatabaseName=dbname, username: username,password: xxx,splitPk: id,fetchSize: 512, table: table_01, column:fetchSize: 512, table: table_01, column:a,b,c, where: 1 =1,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxx, project: project, table: table

39、,column: *,partition: pt=20140501参数说明instanceName描述: 阿里云RDS实例名称(Instance名称)。用户使用该配置指定RDS的Instance名称,CDP将翻译为底层执行的jdbc连接串连接。instanceName指定的是RDS实例,类似Mysql实例,需要和database配合使用。 必选:是默认值:无jdbcUrl描述:对于CDP部分私有云场景,数据库迁移场景等,其本身数据源是普通SqlServer数据 库不是RDS,对于这类场景,用户可以指定jdbc信息直连。两类信息概念上是等同的,因此只能配置其一。如果两 者均配置,CDP默认将使用

40、jdbc信息。必选:是默认值:无username描述:数据源的用户名必选:是默认值:无password描述:数据源指定用户名的密码必选:是默认值:无table描述:所选取的需要同步的表,一个作业只能支持一个表同步必选:是默认值:无column描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如。支持列裁剪,即列可以挑选部分列进行导出。支持列换序,即列可以不按照表schema信息进行导出。支持常量配置,用户需要按照JSON格式:id, table, 1, bazhen.csy, null, COUNT(*), 2.3 , true,1为整形数

41、字常量,bazhen.csy为字符串常 量,null为空指针,to_char(a1)为表达式,2.3为浮点数,true为布尔值。column必须用户显示指定同步的列集合,不允许为空! 必选:是默认值:无splitPk描述:SQLServerReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代 表的字段进行数据分片,CDP因此会启动并发任务进行数据同步,这样可以大大提供数据同 步的效能。推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容 易出现数据热点。目前splitPk仅支持整形、字符串型数据切分,不支持浮点、日期等其他类型

42、。如果用户指定 其他非支持类型,SQLServerReader将报错!必选:否默认值:空where描述:筛选条件,SQLServerReader根据指定的column、table、where条件拼接SQL,并 根据这个SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create$bizdate; where条件可以有效地进行业务增量同步。如果该值为空,代表同步全表所有的信息。必选:否默认值:无querySql描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可

43、以通过该配 置型来自定义筛选SQL。当用户配置了这一项之后,CDP系统就会忽略table,column这些 配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用selecta,bfromtable_ajointable_bontable_a.idtable_b.id必选:否默认值:无fetchSize描述:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了CDP和服 务器端的网络交互次数,能够较大的提升数据抽取性能。注意,该值过大(2048)可能造成CDP进程OOM。必选:否默认值:1024类型转换目前SQLServerReader支持大部分

44、SqlServer类型,但也存在部分个别类型没有支持的情况,请注意检查你的 类型。下面列出SQLServerReader针对SqlServer类型转换列表:CDP 内部类型SQLServer 数据类型Longbigint, int, smallint, tinyintDoublefloat, decimal, real, numericStringchar,nchar,ntext,nvarchar,text,varchar,nvarchar (MAX),varchar(MAX)Datedate, datetime, timeBooleanbitBytesbinary,varbinary,var

45、binary(MAX),timestamp请注意:除上述罗列字段类型外,其他类型均不支持。timestamp类型作为二进制类型。约束限制主备同步数据恢复问题主备同步问题指SqlServer使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定 的时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致 从备库同步的数据不是一份当前时间的完整镜像。CDP如果同步的是阿里云提供RDS,是直接从主库读取数据,不存在数据恢复问题。但是会引入主库负载问题,请注意流控配置。一致性约束SqlServer在数据存储划分中属于RDBMS系统,对外可

46、以提供强一致性数据查询接口。例如当一次同步任务启 动运行过程中,当该库存在其他数据写入方写入数据时,SQLServerReader完全不会获取到写入更新数据,这 是由于数据库本身的快照特性决定的。关于数据库快照特性,请参看MVCCWikipedia上述是在SQLServerReader单线程模型下数据同步一致性的特性,由于SQLServerReader可以根据用户配置信 息使用了并发数据抽取,因此不能严格保证数据一致性:当SQLServerReader根据splitPk进行数据切分后,会 先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存 在时间间

47、隔。因此这份数据并不是完整的、一致的数据快照信息。针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们 提供几个解决思路给用户,用户可以自行选择:使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响 在线业务。数据库编码问题SQLServerReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此SQLServerReader不需用户指定编码,可以自动识别编码并转码。增量数据同步SQLServerR

48、eader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT.WHERE.进行增量数据抽取,方式有多种:数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对 于这类应用,SQLServerReader只需要WHERE条件跟上一同步阶段时间戳即可。对于新增流水型数据,SQLServerReader可以WHERE条件后跟上一阶段最大自增ID即可。对于业务上无字段区分新增、修改数据情况,SQLServerReader也无法进行增量数据同步,只能同步全量数据。Sql安全性SQLServerReader提供querySql语句交给用户自己

49、实现SELECT抽取语句,SQLServerReader本身对querySql不做任何安全性校验。这块交由CDP用户方自己保证。PostgreSQLReader快速介绍读取数据。在底层实现上,PostgreSQLReader通过JDBC连接 远程PostgreSQL数据库,并执行相应的sql语句将数据从PostgreSQL库中SELECT出来。公有云上RDS提供PostgreSQL存储引擎。实现原理简而言之,PostgreSQLReader通过JDBC连接器连接到远程的PostgreSQL数据库,并根据用户配置的信息生 成查询SELECT SQL语句并发送到远程PostgreSQL数据库,并将

50、该SQL执行返回结果使用CDP自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。对于用户配置Table、Column、Where的信息,PostgreSQLReader将其拼接为SQL语句发送到PostgreSQL数据库;对于用户配置querySql信息,PostgreSQL直接将其发送到PostgreSQL数据库。功能说明配置样例- 使用Instance方式配置一个从PostgreSQL数据库同步到抽取数据作业type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID,version: 1.0, configuration: vers

51、ion: 1.0, configuration: reader: plugin: postgresql, parameter: instanceName: datasync001, table: table,username: xxx, password: xxxxxx, column: *, fetchSize: 512, splitPk: pk,where: 1 = 1,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx, project: project, table: table,colum

52、n: *, partition: pt=2014, truncate: true- 使用JDBC方式配置一个从PostgreSQL数据库同步抽取数据作业:type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID, version: 1.0,configuration: reader: plugin: postgresql, parameter: jdbcUrl: jdbc:postgresql:/HOST_NAME:PORT/DATABASE_NAME, table: table,username: xxx, password: xxxxxx, co

53、lumn: *, fetchSize: 512, splitPk: pk,where: 1 = 1,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx,project: project, table: table,project: project, table: table,column: *, partition: pt=2014, truncate: true- 配置一个自定义SQL的数据库同步任务到ODPS的作业:type: job,traceId: 您可以在这里填写您作业的追踪ID,建议使用

54、业务名+您的作业ID, version: 1.0,configuration: reader: plugin: postgresql, parameter: jdbcUrl: jdbc:postgresql:/HOST_NAME:PORT/DATABASE_NAME, username: xxx,password: xxxxxx,querySql: SELECT * from dual, fetchSize: 512,writer: plugin: odps, parameter: accessId: bazhen.csy, accessKey: xxxxxxx, project: proje

55、ct, table: table, column: *,partition: pt=2014, truncate: true参数说明instanceName描述: 阿里云RDS实例名称(Instance名称)。用户使用该配置指定RDS的Instance名称,CDP将翻译为底层执行的jdbcUrl连接串连接。instanceName指定的是RDS实例。发给类似Mysql实例,指定了数据源的IP+Port,需要和database配 合使用。例如,在RDSinstanceName指定的是RDS实例。发给类似Mysql实例,指定了数据源的IP+Port,需要和database配 合使用。例如,在RDS

56、 WebConsole页面中点击【基本信息】,其右侧详情的名称即是用户拥有RDS的instanceName。!RDS HYPERLINK /kf/HTB1wAz9GpXXXXc9XXXXq6xXFXXXl.jpg) instanceName图解(/kf/HTB1wAz9GpXXXXc9XXXXq6xXFXXXl.jpg)必选:是默认值:无jdbcUrl描述:描述的是到对端数据库的JDBC连接信息,jdbcUrl按照PostgreSQL官方规范,并可 以填写连接附件控制信息。必选:是默认值:无username描述:数据源的用户名。必选:是默认值:无password描述:数据源指定用户名的密码。必

57、选:是默认值:无table描述:所选取的需要同步的表名。必选:是默认值:无column描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如。支持列裁剪,即列可以挑选部分列进行导出。支持列换序,即列可以不按照表schema信息进行导出。支持常量配置,用户需要按照JSON格式:id, 1, bazhen.csy, null, to_char(a + 1), 2.3 , trueid为普通列名,1为整形数字常量,bazhen.csy为字符串常量,null为空指针,to_char(a+ 1)为表达式,2.3为浮点数,true为布尔值。Column必

58、须显示填写,不允许为空! 必选:是默认值:无splitPk描述:PostgreSQLReader进行数据抽取时,如果指定splitPk,表示用户希望使用 因此会启动并发任务进行数据同步,这样可以大大提 供数据同步的效能。推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容 易出现数据热点。目前splitPk仅支持整形、字符串型数据切分,不支持浮点、日期等其他类型。如果用户指定 其他非支持类型,PostgreSQLReader将报错!,PostgreSQLReader使用单通道同步全 量数据。必选:否默认值:空where描述:筛选条件,PostgreSQLRe

59、ader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create $bizdate ;where条件可以有效地进行业务增量同步。where条件不配置或者为空,视作全表同步数据。必选:否默认值:无querySql描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配 置型来自定义筛选SQL。当用户配置了这一项之后,CDP系统就会忽略table,column这些 配置型,直接使用这

60、个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id当用户配置querySql时,PostgreSQLReader直接忽略table、column、where条件的配置。必选:否默认值:无fetchSize描述:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了CDP和服 务器端的网络交互次数,能够较大的提升数据抽取性能。注意,该值过大(2048)可能造成CDP进程OOM。必选:否默认值:512类型转换目前PostgreSQLReade

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论