kettle源码分析.docx_第1页
kettle源码分析.docx_第2页
kettle源码分析.docx_第3页
kettle源码分析.docx_第4页
kettle源码分析.docx_第5页
已阅读5页,还剩28页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Kettle源码分析一 获取并编译源码1.1. 获取源码Svn获取svn://svnkettleroot/Kettle/tags网页获取官方网站:/1.2. 编译源码将项目加载到eclipse将kettle项目拷贝到eclipse的workspace目录下,在eclipse中新建java project,项目名称和你拷贝过来的kettle文件夹名称一致 项目导入到eclipse中会出现一个错误,如下图,将这个文件的源码全部注释掉编译打开build.xml, 在右边的。Outline 点击kettle-run as -ant build 第一次编译的时候需要从网上下载几个文件,放在C:Documents and SettingsAdministrator.subfloor,网络不好的话下载会比较慢 ,也可以直接文件放在C:Documents and SettingsAdministrator下。编译完成后将bin目录下的.bat文件拷贝到Kettle目录下点击Spoon.bat运行,运行成功代表编译已近通过用源码运行SpoonKettle源码工程本身可能是在linux64位机器上调试的,swt配置是linux64的库,所以在运行源码前需要修改成win32的swt,步骤如下:工程属性Java Build Pathlibrariesadd jars然后将linux64的SWT库删除最后打开src-uiorg.pentaho.di.ui.spoonSpoon.java, Run As java application二 源码分析2.1. 修改kettle界面修改初始化界面打开package org.pentaho.di.ui.spoon的Spoon.Java,找到main函数,该main函数为Spoon工具的入口,找到如下语句Splash splash = new Splash(display);该语句为spoon初始化显示的界面,跳到定义Splash.java,下面函数canvas.addPaintListener(new PaintListener() public void paintControl(PaintEvent e) String versionText = BaseMessages.getString(PKG, SplashDialog.Version) + + Const.VERSION; /$NON-NLS-1$ /$NON-NLS-2$ StringBuilder sb = new StringBuilder(); String line = null; try BufferedReader reader = new BufferedReader(new InputStreamReader(Splash.class.getClassLoader().getResourceAsStream(org/pentaho/di/ui/core/dialog/license/license.txt); /$NON-NLS-1$ while(line = reader.readLine() != null) sb.append(line + System.getProperty(line.separator); /$NON-NLS-1$ catch (Exception ex) sb.append(); /$NON-NLS-1$ Log.warn(BaseMessages.getString(PKG, SplashDialog.LicenseTextNotFound); /$NON-NLS-1$ String licenseText = sb.toString(); e.gc.drawImage(kettle_image, 0, 0); / If this is a Milestone or RC release, warn the user if (Const.RELEASE.equals(Const.ReleaseType.MILESTONE) versionText = BaseMessages.getString(PKG, SplashDialog.DeveloperRelease) + - + versionText; /$NON-NLS-1$ /$NON-NLS-2$ drawVersionWarning(e); else if (Const.RELEASE.equals(Const.ReleaseType.RELEASE_CANDIDATE) versionText = BaseMessages.getString(PKG, SplashDialog.ReleaseCandidate) + - + versionText; /$NON-NLS-1$/$NON-NLS-2$ else if (Const.RELEASE.equals(Const.ReleaseType.PREVIEW) versionText = BaseMessages.getString(PKG, SplashDialog.PreviewRelease) + - + versionText; /$NON-NLS-1$/$NON-NLS-2$ else if (Const.RELEASE.equals(Const.ReleaseType.GA) versionText = BaseMessages.getString(PKG, SplashDialog.GA) + - + versionText; /$NON-NLS-1$/$NON-NLS-2$ Font verFont = new Font(e.display, Helvetica, 11, SWT.BOLD); /$NON-NLS-1$ e.gc.setFont(verFont); e.gc.drawText(versionText, 290, 205, true); / try using the desired font size for the license text int fontSize = 8; Font licFont = new Font(e.display, Helvetica, fontSize, SWT.NORMAL); /$NON-NLS-1$ e.gc.setFont(licFont); / if the text will not fit the allowed space while (!willLicenseTextFit(licenseText, e.gc) fontSize-; licFont = new Font(e.display, Helvetica, fontSize, SWT.NORMAL); /$NON-NLS-1$ e.gc.setFont(licFont); e.gc.drawText(licenseText, 290, 290, true); );1. 修改背景图片找到ui/image/下面的kettle_splash.png,替换该图片2. 修改版本信息找到e.gc.drawText(versionText, 290, 205, true); 改为e.gc.drawText(海康威视数据交换平台V1.0, 290, 205, true);3. 修改下面的描述性文字找到e.gc.drawText(licenseText, 290, 290, true);改为e.gc.drawText(作者:海康, 290, 290, true);4. 预览效果修改spoon主界面标题找到spoon.java下面的public static final String APP_NAME = BaseMessages.getString(PKG, Spoon.Application.Name);修改为要改成的标题修改spoon界面主菜单栏Ui/spoon.xul下引用menubar.xul代码,menubar.xul为主菜单栏的配置处修改spoon菜单栏Spoon菜单栏配置的地方在spoon.xul下的 修改转换和job插件列表及图标Src/kettle-steps.xml是旁边转换插件列表的配置文件处文件分析如下,这个为表输入的配置i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.TableInputorg.pentaho.di.trans.steps.tableinput.TableInputMetai18n:org.pentaho.di.trans.step:BaseStep.Category.Inputi18n:org.pentaho.di.trans.step:BaseStep.TypeTooltipDesc.TableInputui/images/TIP.pngclassname为对应的源文件, category为属于哪个分类下,例如tableinput属于“输入”组,tooltip提示信息,iconfile显示图片主面板布局在spoon.java下的private void addTree()方法内 2.2. Kettle数据转换流程Kettle数据转换流程是一个生产者消费者模型,每个步骤为一个节点,每个节点是一个单独的线程,节点和节点之间通过一个阻塞队列传递数据,前一个节点一条一条的往阻塞队列写入,后面节点一条一条的从阻塞队列读取,阻塞队列实现方法在src-core/org/pentaho/di/core/BlockingRowSet.java。表输入流程表输入通过JDBC连接数据库,执行SQL语句后返回一个ResultSet(结果集),如果数据库支持PreparedStatement . setFetchSize(每次返回的多少条记录到结果集),分批将数据返回到结果集中,然后逐条从结果集中读取数据写入到阻塞队列中;如果不支持分批次读取,则一次行将数据返回到ResultSet中,然后逐条读取写入阻塞队列,数据库的操作的方法在src-db/org/pentaho/di/core/database/Database.java中实现。插入更新流程2.3. 插件研究kettle转换步骤工作组件 这里有四个类构成了这个kettle 步骤/节点,每一个类都有其特定的目的及所扮演的角色。TemplateStep: 步骤类实现了StepInteface接口,在转换运行时,它的实例将是数据实际处理的位置。每一个执行线程都表示一个此类的实例。TemplateStepData: 数据类用来存储数据,当插件执行时,对于每个执行的线程都是唯一的。执行时里面存储的东西主要包括数据库连接、文件句柄、缓存等等其他东西。TemplateStepMeta: 元数据类实现了StepMetaInterface接口。它的职责是保存和序列化特定步骤实例的配置,在我们这个例子中,它负责保存用户设置的步骤名称和输出字段的名称。TemplateStepDialog:对话框类实现了该步骤与用户交互的界面,它显示一对话框,通过对话框用户可以自己的喜好设定步骤的操作。对话框类与元数据类关系非常紧密,元数据类可以追踪用户的设置。除了上面的代码,还有一个plugin.xml,它设置好了插件的元数据,定义了步骤在kettle图形工作台中的显示效果。为了更好的让大家理解,我将利用这个步骤设计一个转换流程并执行它。对于插件的开发,我们将从plugin.xml配置文件开始讲起,然后讲讲元数据和对话框类,最后再讲讲步骤类和数据类。书写你自己的plugin.xml: 下面plugin.xml是我们这个插件里面的内容,它的功能是告诉kettle插件的元数据类,插件的名称及描叙,还有需要加载的jar包。想要了解细节,可以查看文章:plug-in loading ID:在kettle插件中必须全局唯一,因为被kettle序列化了,所以不要随便改变Iconfile: kettle中插件显示的图片,必须是png图片Description:插件描叙,显示在树形菜单里面。Tooltip:树形菜单中,鼠标滑过的时候显示的提示信息Category:插件显示的父目录Classname:元数据类Library:指明了插件需要加载所依赖的jar包插件主要包含类介绍一、元数据类: 下面显示了元数据的几个关键的方法,注意元数据类里面用私有成员变量outputField 存储了下一个步骤的输出字段。/ keep track of the step settingspublic String getOutputField()public void setOutputField()public void setDefault()/ serialize the step settings to and from xmlpublic String getXML()public void loadXML()/ serialize the step settings to and from a kettle repositorypublic void readRep()public void saveRep()/ provide information about how the step affects the field structure of processed rowspublic void getFields()/ perform extended validation checks for the steppublic void check()/ provide instances of the step, data and dialog classes to Kettlepublic StepInterface getStep()public StepDataInterface getStepData()public StepDialogInterface getDialog()TemplateStepMeta元数据类其实还有很多方面,不过大多被他的父类BaseStepMeta给默认实现了,这些默认的实现足以使我们的元数据类工作良好。想要了解更多,大家可以查查关于StepMetaInteface和BaseStepMeta的kettle官方文档。二、对话框类: TemeplateStepDialog为步骤实现了对话框的设置,kettle的用户界面部件是使用的eclipse的swt框架,如果要开发比较复杂的对话框,你还必须熟悉大部分swt代码。 Swt文档大家可以从eclipse上的帮助菜单点击在线获取。在开发过程中,一个对话框对象拥有一个元数据对象,它记录了应该从哪里读取配置?应该把设置好的配置保存在哪里? 它仅仅设置了输出字段的名称在我们这个模板步骤里面。一个继承自BaseStepDialog特定的对话框类必须提供open()方法,这个方法必须返回这个步骤的名称(发生改变时)或NULL(对话框被取消时)三、步骤类: 步骤类是实际的处理和转换工作的地方。因为大部分样本代码已经由父类BaseStep提供了,大多数插件仅仅关注下面几个特定的方法就行。/ initialization and teardownpublic booleaninit()public voiddispose(.)/ processing rowspublic voidrun()public booleanprocessRow(.)Init()方法在转换执行前被kettle调用,转换必须在所有步骤初始化成功时才真正执行。我们这个模板步骤没有做任何事情,这里仅仅是拿出来让大家了解了解。dispose()方法是在步骤执行完之后执行(非转换执行完哈),它完成资源的关闭,像文件句柄、缓存等等。run()方法在实际处理记录集的时候调用。里面其实是个调用processRow()方法处理记录的小循环,当此步骤再没有数据处理或转换被停止时退出循环。processRow()方法在处理单条记录的时候被调用。这个方法通常通过调用getRow()来获取需要处理的单条记录。 这个方法如果有需要将会被阻塞,例如当此步骤希望放慢脚步处理数据时。processRow()随后的流程将执行转换工作并调用putRow()方法将处理过的记录放到它的下游步骤。注意:你的步骤可能会变记录的结构,为了安全起见,一定要多熟悉包org.pentaho.di.core.row,特别是类RowMetaInterface和RowDataUtil。基类BaseStep对处理的记录提供了第一次访问的标识,在某些代码只执行一次的时候可能非常有用,例如某个费时的查找,其实这就是缓存。四、数据类: 大多数步骤都需要临时的缓冲或者临时的数据。数据类就是这些数据合适的存放位置。每一个执行线程将得到其拥有的数据类实例,所以它能在独立的空间里面运行。TemplateStepData继承自BaseStepData,作为一个经验法则,不要将non-constant字段放置BaseStepData类里面,如果你必须,请将它最好放置TemplateStepData数据类里面.我们的步骤仅仅使用了一个数据对象来存储记录集输出的结构,没有用到其他的存储介质,例如文件等等。开发插件实例1. 在kettle-steps.xml下添加如下节点MyTestmytest.MyTestMeta插件测试测试ui/images/TIP.png2. 创建插件类MyTest类代码package mytest;import org.pentaho.di.trans.Trans;import org.pentaho.di.trans.TransMeta;import org.pentaho.di.trans.step.BaseStep;import org.pentaho.di.trans.step.StepDataInterface;import org.pentaho.di.trans.step.StepInterface;import org.pentaho.di.trans.step.StepMeta;public class MyTest extends BaseStep implements StepInterface public MyTest(StepMeta stepMeta, StepDataInterface stepDataInterface,int copyNr, TransMeta transMeta, Trans trans) super(stepMeta, stepDataInterface, copyNr, transMeta, trans);/ TODO Auto-generated constructor stubMyTestData类代码package mytest;import org.pentaho.di.trans.step.BaseStepData;import org.pentaho.di.trans.step.StepDataInterface;public class MyTestData extends BaseStepData implements StepDataInterface MyTestMeta类代码package mytest;import java.util.List;import java.util.Map;import org.pentaho.di.core.CheckResultInterface;import org.pentaho.di.core.Counter;import org.pentaho.di.core.database.DatabaseMeta;import org.pentaho.di.core.exception.KettleException;import org.pentaho.di.core.exception.KettleXMLException;import org.pentaho.di.core.row.RowMetaInterface;import org.pentaho.di.repository.ObjectId;import org.pentaho.di.repository.Repository;import org.pentaho.di.trans.Trans;import org.pentaho.di.trans.TransMeta;import org.pentaho.di.trans.step.BaseStepMeta;import org.pentaho.di.trans.step.StepDataInterface;import org.pentaho.di.trans.step.StepInterface;import org.pentaho.di.trans.step.StepMeta;import org.pentaho.di.trans.step.StepMetaInterface;import org.w3c.dom.Node;public class MyTestMeta extends BaseStepMeta implements StepMetaInterface public MyTestMeta()super();Overridepublic void setDefault() / TODO Auto-generated method stubOverridepublic void loadXML(Node stepnode, List databases,Map counters) throws KettleXMLException / TODO Auto-generated method stubOverridepublic void saveRep(Repository rep, ObjectId id_transformation,ObjectId id_step) throws KettleException / TODO Auto-generated method stubOverridepublic void readRep(Repository rep, ObjectId id_step,List databases, Map counters)throws KettleException / TODO Auto-generated method stubOverridepublic void check(List remarks, TransMeta transMeta,StepMeta stepMeta, RowMetaInterface prev, String input,String output, RowMetaInterface info) / TODO Auto-generated method stubOverridepublic StepInterface getStep(StepMeta stepMeta,StepDataInterface stepDataInterface, int copyNr,TransMeta transMeta, Trans trans) / TODO Auto-generated method stubreturn null;Overridepublic StepDataInterface getStepData() / TODO Auto-generated method stubreturn null;MyTestDialog类代码package mytest;import org.eclipse.swt.SWT;import org.eclipse.swt.widgets.Display;import org.eclipse.swt.widgets.Shell;import org.pentaho.di.trans.TransMeta;import org.pentaho.di.trans.step.BaseStepMeta;import org.pentaho.di.trans.step.StepDialogInterface;import org.pentaho.di.ui.trans.step.BaseStepDialog;public class MyTestDialog extends BaseStepDialog implements StepDialogInterface public MyTestDialog(Shell parent, Object in,TransMeta transMeta, String stepname) super(parent, (BaseStepMeta)in, transMeta, stepname);/ TODO Auto-generated constructor stubOverridepublic String open() Shell parent = getParent();Display display = parent.getDisplay();shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN); props.setLook(shell); shell.open(); shell.setSize(200, 200); shell.setText(hello);return null;除TestDialog类外的3个类的代码都是在加入继承基类和接口后更加eclipse插件提示自动生成的,也是我们自己需要实现的方法,TestDialog类中的MyTestDialog方法参数是固定的,根据生成的需要修改下,open函数是要我们自己实现的,运行效果如下双击MyTest后弹出一个空白的窗口调用use define java class 插件以kettle中自带的samplestransformationsUser Defined Java Class - Calculate the date of Easter.ktr为例public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException Object r=getRow();/从阻塞队列中获取一个数据对象(一行数据记录) if (r=null)/如果没有可获取的数据,代表以处理完成 setOutputDone();/设置处理完成标志return false;/退出循环 if (first) /第一次进入循环 /初始化动作 first=false;/设置为非第一次循环 /*处理函数*/ logBasic(r0.toString();/打印 putRow(data.outputRowMeta, r);/输出到阻塞队列 return true;use define java class 插件其实就是一个空插件,然后我们自己来实现processRow函数体,kettle通过while循环来调用processRow函数一行一行的处理数据,流程如下所示2.4. 如何快速从源码中定位功能我们要查找一个插件中的某个小功能,我们可以根据插件名称先找到插件源码文件夹,比如我们要找“表输入”,表输入属于transformation,如果我们要找实现的源码,我们可以先快速找到src目录下的src/org/pentaho/di/trans/steps/,然后在该命名空间下找到tableinput包,包下面一般都有X.java, XDate.java, XMeta.java, 比如“表输入”为tableinput.java, tableinputDate.java, tableinputMeta.java,我们要找处理流程,我们可以在public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) 函数下打断点,每个插件都有这样一个函数。如果我们要找某个控件的功能,我们可以到XMeta.java下根据该控件描述找到一个值,然后在get这个值的函数打断点,比如表输入的“允许延迟转换”,我们可以在tableinputMeta.java下找到一个标志值,“lazyConversionActive”, 这个值为blean行,我们可以找到public boolean isLazyConversionActive() return lazyConversionActive;设置断点。三 Kettle接口调用Kettle编译后的库都放在lib目录下,主要包含如下几个库 我们主要用到的是kettle-core.jar, kettle-db.jar, kettle-engine.jar, 剩下的2个kettle-test.jar(测试用的), kettle-ui-swt.jar(kettle spoon界面)。在通过代码调用transformation, job时需要加载前3个库,和libext目录下的第三方库。3.1 源码调用transformation接口3.1.1 通过ktr文件执行transformationimport org.pentaho.di.core.KettleEnvironment;import org.pentaho.di.core.exception.KettleException;import org.pentaho.di.trans.Trans;import org.pentaho.di.trans.TransMeta;public class ExcuteTransByFile public static void main(String args) String filename = ExcelMysql.ktr;/ktr文件名try KettleEnvironment.init();TransMeta transMeta = new TransMeta(filename);Trans trans = new Trans(transMeta);trans.execute(null); / You can pass arguments instead of null.trans.waitUntilFinished();/等transformation执行结束if (trans.getErrors() 0) throw new RuntimeException(There were errors during transformation execution.); catch (KettleException e) / TODO Put your exception-handling code here./ System.out.println(filename);System.out.println(e);3.1.2 通过资源库文件执行transformationimport org.pentaho.di.core.KettleEnvironment;import org.pentaho.di.core.database.DatabaseMeta;import org.pentaho.di.core.exception.KettleException;import org.pentaho.di.repository.RepositoryDirectory;import org.pentaho.di.repository.RepositoryDirectoryInterface;import org.pentaho.di.repository.kdr.KettleDatabaseRepository;import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;import org.pentaho.di.trans.Trans;import org.pentaho.di.trans.TransMeta;public class ExcutTransByRep public static void main(String args) String transName = del;try KettleEnvironment.init();DatabaseMeta dataMeta = new DatabaseMeta(KettleDBRep, MySQL,Native, , rep, 3306, root, );KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta();repInfo.setConnection(dataMeta);KettleDatabaseRepository rep = new KettleDatabaseRepository();rep.init(repInfo);rep.connect(admin, admin);RepositoryDirectoryInterface dir = new RepositoryDirectory();dir = rep.loadRepositoryDirectoryTree();/获取根目录TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null);/读取根目录下的del转换Trans trans = new Trans(transMeta);trans.execute(null); / You can pass arguments instead of null.trans.waitUntilFinished();if (trans.getErrors() 0) throw new RuntimeException(There were errors during transformation execution.); catch (KettleException e) / TODO Put your exception-handling code here./ System.out.println(filename);System.out.println(e);上面只是根目录下的转换,要读取其他目录下的转换,如何遍历目录参照/kettle4.1/src-core/org/pentaho/di/repository/RepositoryDirectory.java3.1.3 通过资源库transformation ID执行transformation

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论