阿里云数据集成服务-SDK参考-D_第1页
阿里云数据集成服务-SDK参考-D_第2页
阿里云数据集成服务-SDK参考-D_第3页
阿里云数据集成服务-SDK参考-D_第4页
阿里云数据集成服务-SDK参考-D_第5页
已阅读5页,还剩12页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、数据集成SDK参考手册云解析/SDK手册云解析/SDK手册 PAGE 16 PAGE 16SDK参考手册环境准备- 适用于JDK 6及以上版本安装方式1、下载Java SDK开发包(2016-02-27) 版本号 1.1.2:cdp-sdk-java-1.1.2-SNAPSHOT.jar; SHA1:ce70acffe94c6e01ae82a2cef40769f66a40fe26 MD5:4ed80e40a103f09add9e074ff6ec2a9e2、在项目中导入JAR包针对CDP Java SDK,总体上遵循了以下几个原则:JavaAPI是CDPRESTAPI(对下称RESTAPI)的客

2、户端代理,通过RESTAPI访问CDP。Java API不是REST API接口的一一映射,要有更高的抽象层次,符合对CDP概念的一般理解。用户容易推断各个类的用法,具有较低的学习曲线。例如对于查询接口,若查询没有找到对象而报错,RESTAPI返回错误为404(NotFound),但是考虑到JavaAPI的设计风格,在Java层面不再抛出异常,取代的是返回null。Java API的设计是面向对象的,符合面向对象的设计原则。目前JavaAPI接口设计暂时不提供语法糖衣,所有操作是最小功能的。将来根据需求再逐步完善。JavaAPI对所有用户的入参不允许做任何修改,对于需要修改入参情况,CDP做法

3、是拷贝一份入参,并修改后作为方法返回值返回结果。Java API体现Session、Pipeline、Job等概念模型的层次结构,例如Pipeline.start(Job),用户必须持有Pipeline才能启动Job。完整示例package com.alibaba.cdp.sdk.example; import java.util.concurrent.ExecutorService;以一个用户利用CDP Java SDK完成创建Pipeline、提交Job、最后将其Kill为例,展示SDK详情(该Demo在package com.alibaba.cdp.sdk.example; import

4、 java.util.concurrent.ExecutorService;阿里云物联网套件/服务端SDK手册阿里云物联网套件/服务端SDK手册import java.util.concurrent.Executors; import com.alibaba.cdp.sdk.model.Job;import com.alibaba.cdp.sdk.model.JobStatus; import com.alibaba.cdp.sdk.model.Pipeline; import com.alibaba.cdp.sdk.model.Session;/*样例编码,注意这个样例由于缺乏关键的作业信息

5、实际不能运行!但是它能很好体现如何使用CDPSDK完成我们所需要的业务逻辑* */public class JobExample public static void main(String args) / 登录Session session = new HYPERLINK /api Session(/api, your_accessId, your_accessKey);/ 如果没有project则创建Pipeline pipeline = new Pipeline(); pipeline.setName(your_project_name); pipeline.setDescription

6、(your_project_info); pipeline = session.createPipeline(pipeline);/ 如果已有project,直接使用即可/Project project =session.getPipeline(your_project_name);/ 提交JobJob job = new Job();job.setTraceId(your job trace info); / 自定义用以追踪作业的信息job.setContext(your_job_json); / 详情参看不同的同步方式配置信息final Job starter = pipeline.sta

7、rt(job);/ 启动另外一个线程轮询打印状态ExecutorService service = Executors.newSingleThreadExecutor(); service.execute(new Runnable() Overridepublic void run() JobStatus status; do try Thread.sleep(5000); catch (InterruptedException e) e.printStackTrace(););status = starter.status(); System.out.println(status); whi

8、le (status.isJobAlive();service.shutdown();/ 等待job运行完成try Thread.sleep(60000); catch (InterruptedException e1) e1.printStackTrace();/ 如果没有完成,主线程Kill这个运行实例do try starter.stop(); Thread.sleep(3000); catch (Exception e) e.printStackTrace();e.printStackTrace(); while (starter.status().isJobAlive();CDP异常

9、指CDP(这里包括客户端与服务端)本身异常,针对这类异常,CDP需要告之用户具体的异常信息,如 果涉及到服务端异常信息,还需要告之服务端的request-id。问题排查时用户可将request-id发送给CDP管理 员,便于运维和问题排查。另外,CDP Java SDK涵盖的所有异常均是Java Runtime Exception,具体介绍如下:CDPException:CDPExceptionCDP异常基类,CDP具体的异常类必须继承自CDPException,定义为:publicclass CDPException extends java.lang.RuntimeExceptionCDP

10、ClientExceptionCDPJsonExceptionCDPJsonExceptionJSON格式错误异常CDPIllegalArgumentExceptionCDPIllegalArgumentExceptionPipeline创建重名CDPIllegalStateException 系统状态异常CDPIllegalStateException 异常适配系统状态异常情况,例如Pipeline.submit(Job),但是Pipeline中session失效CDPAuthorizationExceptionCDPAuthorizationExceptionCDPResourceLimi

11、tExceptionCDPResourceLimitException 异常适配系统容量限制失败,例如Pipeline创建超出单个用户最大限制Session主要记录用户的登录信息,具体如下表展示:字段类型说明endPointstringCDP HTTP 服务地址accessIdstring云账号accessIdaccessKeystring云账号accessKeypublicKeystringCDP颁发的公钥,该公钥不定期升级keyVersionstringCDP颁发的公钥版本configurationConfigurationSession 配置信息创建 Session (不指定Config

12、uration)(1)方法签名:反欺诈/sdk反欺诈/sdknew Session(final String endPoint, final StringaccessId, final String accessKey);new Session(final String endPoint, final StringaccessId, final String accessKey);(2)方法说明使用Java.造函数创建 Session,其中Configuration使用系统默认值。(3)方法返回返回新建Session,如果登录错误,将抛出CDPAuthorizationException异常,

13、并在CDPAuthorizationException说明详细错误信息。(4)异常情况参数endPoint、accessId、accessKey不能为空,否则抛出CDPIllegalArgumentException。登录校 验错误,抛出CDPAuthorizationException异常。创建 Session (指定Configuration)newnewSession(finalStringendPoint,finalStringaccessId,finalStringaccessKey,finalConfiguration configuration);(2)方法说明创建 Sessio

14、n,Configuration使用用户指定值。(3)方法返回返回新建Session。(4)异常情况参数endPoint、accessId、accessKey不能为空,否则抛出CDPIllegalArgumentException。登录校 验错误,抛出CDPAuthorizationException异常。Configuration 主要记录连接的参数,包括HTTP超时时间,重试次数等。这块信息由用户自定义,如果用户没有提供则使用系统自定义的Configuration, 字段暂无定义。Configuration默认定义值如下:字段类型说明clientVersionstringCDP Java S

15、DK 版本号userAgentstringJava SDK指定底层的HTTP协议头,默认值为alibaba-cdp- sdk-java-agentsocketTimeoutint客户端与CDP服务端Socket阻塞读取超时时长,单位为毫秒,默认值为120000connectionTimeoutint客户端与CDP服务端建立连接超时时长,单位为毫秒,默认值为30000maxErrorRetryint客户端接受到错误返回时,CDP内部自动尝试重发该请求的次数,默认为3errorRetryIntervalint客户端接受到错误返回时,CDP内部自动尝试重发该请求的次数,默认为3 空间与用户Pipel

16、ine管理Pipeline 对象描述项目空间,字段如下:字段类型说明namestring项目名称(唯一),不允许修改descriptionstring项目描述jobBandwidthint离线同步的带宽,单位是MB/screatorstring项目Owner(创建人云账号),不允许修改createTimedatetime项目创建时间,不允许修改modifierstring项目修改云账号,不允许修改modifyTimedatetime项目修改时间,不允许修改stateint状态 (0: 开启、1: 关闭、2: 停机、254: 删除、255: unknown)sessionSession登录会话状

17、态,不允许修改其中Pipeline命名规范为:长度必须在3-32字符之间必须以A-Z,a-z,0-9和下划线(_)构成必须以字母开头不区分大小写命名由用户提交,系统需要保证全局唯一Pipeline状态定义为:开启:状态代码0,代表该Pipeline开启并处于正常的运行状态。该状态是一个Pipeline创建后的初始状态。关闭:状态代码1,代表该Pipeline被Pipeline的Owner关闭。该Pipeline仍然属于当前Owner的个人资产,该Owner仍然对该Pipeline保留查看、修改、流量Job的功能。当一个Pipeline处于关闭状态下,正 在运行过程的Job将继续运行,但是不允许

18、新的Job提交,否则报错。Pipeline的Owner可以将一个关闭状态下 的Pipeline重新设置为开启状态恢复使用,处于关闭状态下的Pipeline同样视作该用户的Pipeline资源。停机:状态代码2,代表该Pipeline因为用户欠费、违反阿里云使用协议(包括违反政府法律行为)而被瑶池处于 停机状态。处于停机状态下,Pipeline当前运行Job继续运行完成,但是不允许新的Job提交。一旦Pipeline处 于停机状态,该Pipeline不能被Owner修改为开启状态。停机状态下的Pipeline仍然属于该用户资产,用户不 可创建新的Pipeline。删除:状态代码254,代表该Pi

19、peline被CDP官方管理员删除。当一个Pipeline处于删除状态,用户无法查阅浏 览Pipeline,正在运行过程的Job将继续运行,但是不允许新的Job提交。后续需要考虑实现直接Kill下属所有的Job、Stream。一旦Pipeline删除,CDP将该Pipeline标记为删除状态,保留下属所有的Job、Stream运行记录,该Pipeline不 属于用户资产,用户可以创建新的Pipeline。一旦Pipeline删除,该Pipeline不可恢复为正常状态!CDP可能定 期清理删除Pipeline。创建PipelinePipeline Session.createPipeline(f

20、inal PipelinePipeline);方法签名Pipeline Session.createPipeline(final PipelinePipeline);方法说明在当前Session中创建Pipeline。方法返回返回新建Pipeline,返回的Pipeline,包含该Pipeline中的最新Id,Key等信息。方法异常CDPIllegalArgumentException。当前 用户Pipeline个数已经达到上限不允许创建,抛出CDPResourceLimitException。ACE/常见问题ACE/常见问题查询PipelinePipeline Session.getPipe

21、line(final String name);方法签名Pipeline Session.getPipeline(final String name);方法说明在当前Session中查询Pipeline。方法返回返回查询Pipeline,如果没有找到目标Pipeline,返回null。方法异常如果该Pipeline当前用户无权限访问,抛出CDPAuthorizationException异常,并告之用户错误详情。浏览PipelinePage Session.listPipeline();方法签名Page Session.listPipeline();方法说明在当前Session中列出所有Pip

22、eline清单方法返回返回所有Pipeline清单,如果没有找到,返回的Page中List其长度为0。浏览 Pipeline (指定参数)Page Session.listPipeline(finalMap criteria);方法签名Page Session.listPipeline(finalMap criteria);方法说明Map criteria = new HashMap() criteria.put(pageSize, 10);criteria.put(pageIndex, 1);Map criteria = new HashMap() criteria.put(pageSize

23、, 10);criteria.put(pageIndex, 1);Page Pipelines = session.listPipeline(criteria);其中,criteria参数支持多种条件查询:pageSize=XXX 分页大小pageIndex=XXX 分页索引,以1开始方法返回返回所有Pipeline清单,如果没有找到,返回的List其长度为0。修改Pipelinevoid Pipeline.update();方法签名void Pipeline.update();方法说明Pipeline Pipeline =session.getPipeline(cdp); Pipeline.

24、setDescription(bazhen.csy); Pipeline.update();用户在修改Pipeline参数后,进行更新到CDP操作,代码片段:Pipeline Pipeline =session.getPipeline(cdp); Pipeline.setDescription(bazhen.csy); Pipeline.update();方法返回返回更新后的Pipeline。方法异常Pipeline中Session如果为空,抛出CDPIllegalStateException异常,并告之用户错误详情。Pipeline修改前提是用户必须是该Pipeline的Owner,若不满足

25、上述条件,服务端将报错,SDK抛出开放搜索/SDK参考手册开放搜索/SDK参考手册CDPAuthorizationException异常,并告之用户错误详情。注 意 事 项 只允许修改Description信息。关闭Pipelinevoid Pipeline.close();方法签名void Pipeline.close();Pipeline Pipeline =session.getPipeline(cdp); Pipeline.close();方法说明代码片段:Pipeline Pipeline =session.getPipeline(cdp); Pipeline.close();方法返

26、回无返回方法异常Pipeline中Session如果为空,抛出CDPIllegalStateException异常,并告之用户错误详情。Pipeline关闭前提是用户必须是该Pipeline的Owner,若不满足上述条件,服务端将报错,SDK抛出CDPAuthorizationException异常,并告之用户错误详情。注意事项该API遵守幂等性约定。Pipeline进入关闭状态后,已提交的的同步Job将继续完成,但无法启动新 同步Job。关闭的Pipeline仍然属于用户的个人资产,保留用户对该Pipeline进行查看、修改、查找Job等基本操作,CDP对该Pipeline不回收、不修改.开

27、启Pipeline方法签名void Pipeline.open();void Pipeline.open();代码片段:Pipeline Pipeline =session.getPipeline(cdp); Pipeline.open();Pipeline Pipeline =session.getPipeline(cdp); Pipeline.open();无返回方法异常 Pipeline中Session如果为空,抛出CDPIllegalStateException异常,并告之用户错误详情。Pipeline关闭前提是用户必须是该Pipeline的Owner,若不满足上述条件,服务端将报错,

28、SDK抛 出CDPAuthorizationException异常,并告之用户错误详情。该API遵守幂等性约定。Pipeline被开启后,用户恢复对该Pipeline提交Job的功能。作业管理Job描述一个离线同步作业,具体信息有:字段类型注释idbigintJob主键traceIdstring该Job启动外部提交用以追踪的的信息, CDP不负责解释该信息contextstring该Job启动的任务配置上文(json)submitUserstring触发该Job的云账号submitTimedatetime该Job提交的时间startTimedatetime该Job实际启动的时间, 存在资源竞争

29、,作业运行时间比submitTime晚endUserstring发起kill该Job的云账号,如没有kill,留空endTimedatetime该Job结束的时间(包括finish、fail、kill)statusstringjob当前状态,json格式pipelinePipelineJob运行的Pipeline空间需要注意的是,狭义上的Job状态信息(CDP约定称之为state) 是Job的当前运行阶段,定义如下:需要注意的是,狭义上的Job状态信息(CDP约定称之为state) 是Job的当前运行阶段,定义如下:0代号SUCCESS,指代该Job已经结束,并且处于运行成功1SUBMIT,指

30、代CDP才接受到该请求,所有提交的Job初始化均为submit状态2 代号WAIT,指代CDP已经进入Job运行就绪队列,等待运行。该状态可能由于底层同步运行资源不够,可能会持续较长一段时间。3RUN,指代CDP已经启动该Job,该Job已经运行起来。4FAIL,指代该Job启动后运行失败。5KILL,指代该Job启动后被人为kill,CDP会记录发起kill命令的操作人。255 代号UNKNOWN,指代系统出现异常情况,Job进入了unknown状态。广义的Job状态信息(CDP约定称之为status)不仅包括运行阶段,目前包括如下状态信息:state: 4,stage: 0.3,total

31、Records: 10000,totalBytes: 100000,speedRecords: 200,speedBytes: 1000,errorRecords: 20,errorBytes: 200,errorMessage: Wrong password!查询作业方法签名Job Pipeline.getJob(long jobId);Job Pipeline.getJob(long jobId);在当前Pipeline中查询指定id的Job信息。返回查询Job,如果没有找到目标Job,返回null。注意事项 服务端会校验jobId所属Pipeline,当前用户是否有权限访问,若无,抛出权

32、限异常CDPAuthorizationException。查看作业状态方法签名Map Job.status();Map Job.status();查看同步Job的状态信息方法返回 返回同步Job的状态信息, 广义的Job状态信息不仅包括运行状态(run、fail等),目前包括如下字段:state: fail, stagePercent: 30%,totalRecord: 10000,totalByte: 100000,speedRecord: 200,speedByte: 1000,errorRecord: 20,errorByte: 200,errorMessage: Wrong passw

33、ord!该方法自动调用refresh()进行刷新当前Job。刷新作业状态方法签名void Job.refresh();void Job.refresh();方法说明 处于运行期间的Job其内部状态会全部处于变化状态,这里可以调用refresh强制从CDP服务端获取并刷新该Job状态。无方法异常如果JobJob不存在,抛出异常CDPIllegalArgumentException。如果JobJob状态已经结束,抛出异常CDPIllegalStateException。如果Job当前用户无操作权限,抛出异常CDPAuthorizationException。启动作业Job Pipeline.sta

34、rt(final Job job);方法签名Job Pipeline.start(final Job job);方法说明Pipeline Pipeline = session.getPipeline(1L);用户使用该方法提交匿名Job,代码片段Pipeline Pipeline = session.getPipeline(1L);Job job = new Job(); Job job = new Job(); job.setTraceId(bazhen.start); job.setContext(一段json信息); Pipeline.start(job)方法返回提交成功后,返回新的Jo

35、b对象方法异常Pipeline中Session如果为空,抛出CDPIllegalStateException异常,并告之用户错误详情。Pipeline提交Job前提是用户必须是该Pipeline的Owner,若不满足上述条件,服务端将报错,SDK抛出CDPAuthorizationException异常,并告之用户错误详情。注意事项提交Job状态必须合法,合法定义为:context。其他字段用户均不用填写,若填写服务端直接忽略该API不信任用户提交submitUser,服务端负责根据鉴权信息获取真实User信息。浏览JobPage Pipeline.listJob(finalMap crite

36、ria);方法签名Page Pipeline.listJob(finalMap criteria);方法说明Map criteria = newHashMap() criteria.put(submitUser,bazhen.csy)criteria.put(startTime, 1399348117,1399348129) Map criteria = newHashMap() criteria.put(submitUser,bazhen.csy)criteria.put(startTime, 1399348117,1399348129) Page jobs =Pipeline.listJo

37、b(criteria);criteria参数支持多种条件查询:pageSize=XXXpageIndex=XXX1开始traceId=XXX根据traceId进行模糊(like)查询submitUser=XXXsubmitUser进行精确(is)查询submitIp=XXXsubmitIp进行精确(is)查询endUser=XXXendUser进行精确(is)查询submitTime=startTime,endTime 根据submitTime进行范围(between)查询,时间请使用Unix时间戳startTime=startTime,endTime 根据startTime进行范围(betw

38、een)查询,时间请使用Unix时间戳endTime=startTime,endTime 根据endTime进行范围(between)查询,时间请使用Unix时间戳方法返回返回符合搜索条件的Job清单,如无符合条件的Job对象,返回Page,但里面Job集合为空。方法异常Pipeline中Session如果为空,抛出CDPIllegalStateException异常,并告之用户错误详情。Pipeline浏览前提是当前用户必须是该Pipeline的Owner,若不满足上述条件,服务端将报错,SDK抛出CDPAuthorizationException异常,并告之用户错误详情。查询作业日志方法签名String

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论