您的当前位置:首页正文

开发技术:KETTLE+JAVA+API+开发实战记录

来源:化拓教育网
前言:

为什么要用Kettle和KETTLE JAVA API?

Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便,kettle的ETL工具集合也比较多,常用的ETL工具都包含了。

为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅是象使用word一样操作kettle用户界面。

KETTLE JAVA API 实战操作记录:

一、 搭建环境 :到kettle.be网站下载kettle的源码包,加压缩,例如解压缩到d:\\kettle目录

二、 打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle的要使用System.getenv(),只有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了。

三、 建一个class : TransBuilder.java,可以把d:\\kettle\\ extra\\TransBuilder.java的内容原样拷贝到你的TransBuilder.java里。

四、 根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:

import org.eclipse.swt.dnd.Transfer;

//这个包被遗漏了,原始位置kettle根目录 \\libswt\\win32\\swt.jar

//add by chq(chq.name) on 20XX.07.20

(后来发现,不必加这个引用,因为编译时不需要)

五、 编译准备,在eclipse中增加jar包,主要包括(主要依据extra\\TransBuilder.bat):

\\lib\\kettle.jar

\\libext\\CacheDB.jar

\\libext\\SQLBaseJDBC.jar \\libext\\activation.jar \\libext\\db2jcc.jar

\\libext\\db2jcc_license_c.jar \\libext\\edtftpj-1.4.5.jar

\\libext\\firebirdsql-full.jar \\libext\\firebirdsql.jar \\libext\\gis-shape.jar \\libext\\hsqldb.jar \\libext\\ifxjdbc.jar \\libext\\javadbf.jar \\libext\\jconn2.jar \\libext\\js.jar \\libext\\jt400.jar \\libext\\jtds-1.1.jar \\libext\\jxl.jar \\libext\\ktable.jar \\libext\\log4j-1.2.8.jar \\libext\\mail.jar

\\libext\\mysql-connector-java-3.1.7-bin.jar \\libext\\ojdbc14.jar \\libext\\orai18n.jar

\\libext\\pg74.215.jdbc3.jar \\libext\\edbc.jar

(注意 :下面这个包被遗漏了,要加上。原始位置kettle根目录\\libswt\\win32\\swt.jar)

\\libswt\\win32\\swt.jar

六、 编译成功后,准备运行

为使程序不必登陆就可以运行,需要设置环境署文件:kettle.properties,位置在用户目录里,一般在 \\Documents and Settings\\用户\\.kettle\\,主要内容如下:

KETTLE_REPOSITORY=kettle@m80

KETTLE_USER=admin

KETTLE_PASSWORD=passwd

七、 好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。

以下为修改后的程序源码:

++++++++++++++++++++++++++++++++

package name.chq.test;

import java.io.DataOutputStream;

import java.io.File;

import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;

import be.ibridge.kettle.core.LogWriter;

import be.ibridge.kettle.core.NotePadMeta;

import be.ibridge.kettle.core.database.Database;

import be.ibridge.kettle.core.database.DatabaseMeta;

import be.ibridge.kettle.core.exception.KettleException;

import be.ibridge.kettle.core.util.EnvUtil;

import be.ibridge.kettle.trans.StepLoader;

import be.ibridge.kettle.trans.Trans;

import be.ibridge.kettle.trans.TransHopMeta;

import be.ibridge.kettle.trans.TransMeta;

import be.ibridge.kettle.trans.step.StepMeta;

import be.ibridge.kettle.trans.step.StepMetaInterface;

import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;

import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;

import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

//这个包被遗漏了,原始位置kettle根目录\\libswt\\win32\\swt.jar

//add by chq([link=chq.name]chq.name[/link]) on 20XX.07.20

//import org.eclipse.swt.dnd.Transfer; /**

* Class created to demonstrate the creation of transformations on-the-fly. *

* @author Matt * */

public class TransBuilder {

public static final String[] databasesXML = {

\"\" +

\"\" +

\"target\" +

\"192.168.17.35\" +

\"ORACLE\" +

\"Native\" +

\"test1\" +

\"1521\" +

\"testuser\" +

\"pwd\" +

\"\" +

\"\" +

\"\" +

\"\" +

\"EXTRA_OPTION_MYSQL.defaultFetchSize500\" +

\"EXTRA_OPTION_MYSQL.useCursorFetchtrue\" +

\"PORT_NUMBER1521\" +

\"\" +

\"\" ,

\"\" +

\"\" +

\"source\" +

\"192.168.16.12\" +

\"ORACLE\" +

\"Native\" +

\"test2\" +

\"1521\" +

\"testuser\" +

\"pwd2\" +

\"\" +

\"\" +

\"\" +

\"\" +

\"EXTRA_OPTION_MYSQL.defaultFetchSize500\" +

\"EXTRA_OPTION_MYSQL.useCursorFetchtrue\" +

\"PORT_NUMBER1521\" +

\"\" +

\"\"

};

前言:

为什么要用Kettle和KETTLE JAVA API?

Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便,kettle的ETL工具集合也比较多,常用的ETL工具都包含了。

为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅

是象使用word一样操作kettle用户界面。

KETTLE JAVA API 实战操作记录:

一、 搭建环境 :到kettle.be网站下载kettle的源码包,加压缩,例如解压缩到d:\\kettle目录

二、 打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle的要使用System.getenv(),只有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了。

三、 建一个class : TransBuilder.java,可以把d:\\kettle\\ extra\\TransBuilder.java的内容原样拷贝到你的TransBuilder.java里。

四、 根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:

import org.eclipse.swt.dnd.Transfer;

//这个包被遗漏了,原始位置kettle根目录 \\libswt\\win32\\swt.jar

//add by chq(chq.name) on 20XX.07.20

(后来发现,不必加这个引用,因为编译时不需要)

五、 编译准备,在eclipse中增加jar包,主要包括(主要依据extra\\TransBuilder.bat):

\\lib\\kettle.jar

\\libext\\CacheDB.jar

\\libext\\SQLBaseJDBC.jar \\libext\\activation.jar \\libext\\db2jcc.jar

\\libext\\db2jcc_license_c.jar \\libext\\edtftpj-1.4.5.jar \\libext\\firebirdsql-full.jar \\libext\\firebirdsql.jar \\libext\\gis-shape.jar \\libext\\hsqldb.jar \\libext\\ifxjdbc.jar \\libext\\javadbf.jar \\libext\\jconn2.jar \\libext\\js.jar \\libext\\jt400.jar \\libext\\jtds-1.1.jar

\\libext\\jxl.jar \\libext\\ktable.jar \\libext\\log4j-1.2.8.jar \\libext\\mail.jar

\\libext\\mysql-connector-java-3.1.7-bin.jar \\libext\\ojdbc14.jar \\libext\\orai18n.jar

\\libext\\pg74.215.jdbc3.jar \\libext\\edbc.jar

(注意 :下面这个包被遗漏了,要加上。原始位置kettle根目录\\libswt\\win32\\swt.jar)

\\libswt\\win32\\swt.jar

六、 编译成功后,准备运行

为使程序不必登陆就可以运行,需要设置环境署文件:kettle.properties,位置在用户目录里,一般在 \\Documents and Settings\\用户\\.kettle\\,主要内容如下:

KETTLE_REPOSITORY=kettle@m80

KETTLE_USER=admin

KETTLE_PASSWORD=passwd

七、 好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。

以下为修改后的程序源码:

++++++++++++++++++++++++++++++++

package name.chq.test;

import java.io.DataOutputStream;

import java.io.File;

import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;

import be.ibridge.kettle.core.LogWriter;

import be.ibridge.kettle.core.NotePadMeta;

import be.ibridge.kettle.core.database.Database;

import be.ibridge.kettle.core.database.DatabaseMeta;

import be.ibridge.kettle.core.exception.KettleException;

import be.ibridge.kettle.core.util.EnvUtil;

import be.ibridge.kettle.trans.StepLoader;

import be.ibridge.kettle.trans.Trans;

import be.ibridge.kettle.trans.TransHopMeta;

import be.ibridge.kettle.trans.TransMeta;

import be.ibridge.kettle.trans.step.StepMeta;

import be.ibridge.kettle.trans.step.StepMetaInterface;

import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;

import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;

import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

//这个包被遗漏了,原始位置kettle根目录\\libswt\\win32\\swt.jar

//add by chq([link=chq.name]chq.name[/link]) on 20XX.07.20

//import org.eclipse.swt.dnd.Transfer;

/**

* Class created to demonstrate the creation of transformations on-the-fly. *

* @author Matt * */

public class TransBuilder {

public static final String[] databasesXML = {

\"\" +

\"\" +

\"target\" +

\"192.168.17.35\" +

\"ORACLE\" +

\"Native\" +

\"test1\" +

\"1521\" +

\"testuser\" +

\"pwd\" +

\"\" +

\"\" +

\"\" +

\"\" +

\"EXTRA_OPTION_MYSQL.defaultFetchSize500\" +

\"EXTRA_OPTION_MYSQL.useCursorFetchtrue\" +

\"PORT_NUMBER1521\" +

\"\" +

\"\" ,

\"\" +

\"\" +

\"source\" +

\"192.168.16.12\" +

\"ORACLE\" +

\"Native\" +

\"test2\" +

\"1521\" +

\"testuser\" +

\"pwd2\" +

\"\" +

\"\" +

\"\" +

\"\" +

\"EXTRA_OPTION_MYSQL.defaultFetchSize500\" +

\"EXTRA_OPTION_MYSQL.useCursorFetchtrue\" +

\"PORT_NUMBER1521\" +

\"\" +

\"\"

}; //

// create the source step...

//

String fromstepname = \"read from [\" + sourceTableName + \"]\";

TableInputMeta tii = new TableInputMeta();

tii.setDatabaseMeta(sourceDBInfo);

String selectSQL = \"SELECT \"+Const.CR;

for (int i=0;i{

/* modi by chq(chq.name): use * to replace the fields,经分析,以下语句可以处理‘*‘ */

if (i>0)

selectSQL+=\

else selectSQL+=\" \";

selectSQL+=sourceFields[i]+Const.CR;

}

selectSQL+=\"FROM \"+sourceTableName;

tii.setSQL(selectSQL);

StepLoader steploader = StepLoader.getInstance();

String fromstepid = steploader.getStepPluginID(tii);

StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);

fromstep.setLocation(150, 100);

fromstep.setDraw(true);

fromstep.setDescription(\"Reads information from table [\" + sourceTableName

+ \"] on database [\" + sourceDBInfo + \"]\");

transMeta.addStep(fromstep);

//

// add logic to rename fields

// Use metadata logic in SelectValues, use SelectValueInfo...

//

/* 不必改名或映射 add by chq(chq.name) on 20XX.07.20

SelectValuesMeta svi = new SelectValuesMeta();

svi.allocate(0, 0, sourceFields.length);

for (int i = 0; i < sourceFields.length; i++)

{

svi.getMetaName()[i] = sourceFields[i];

svi.getMetaRename()[i] = targetFields[i];

}

String selstepname = \"Rename field names\";

String selstepid = steploader.getStepPluginID(svi);

StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);

selstep.setLocation(350, 100);

selstep.setDraw(true);

selstep.setDescription(\"Rename field names\");

transMeta.addStep(selstep);

TransHopMeta shi = new TransHopMeta(fromstep, selstep);

transMeta.addTransHop(shi);

fromstep = selstep; //设定了新的起点 by chq([link=chq.name]chq.name[/link]) on

20XX.07.20

*/

//

// Create the target step...

//

//

// Add the TableOutputMeta step...

//

String tostepname = \"write to [\" + targetTableName + \"]\";

TableOutputMeta toi = new TableOutputMeta();

toi.setDatabase(targetDBInfo);

toi.setTablename(targetTableName);

toi.setmitSize(200);

toi.setTruncateTable(true);

String tostepid = steploader.getStepPluginID(toi);

StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);

tostep.setLocation(550, 100);

tostep.setDraw(true);

tostep.setDescription(\"Write information to table [\" + targetTableName + \"] on database [\" + targetDBInfo + \"]\");

transMeta.addStep(tostep);

//

// Add a hop between the two steps...

//

TransHopMeta hi = new TransHopMeta(fromstep, tostep);

transMeta.addTransHop(hi);

// OK, if we're still here: overwrite the current transformation...

return transMeta;

}

catch (Exception e)

{

throw new KettleException(\"An unexpected error occurred creating the new transformation\

}

}

/**

* 1) create a new transformation

* 2) save the transformation as XML file

* 3) generate the SQL for the target table

* 4) Execute the transformation

* 5) drop the target table to make this program repeatable

*

* @param args

*/

public static void main(String[] args) throws Exception

{

EnvUtil.environmentInit();

// Init the logging...

LogWriter log = LogWriter.getInstance(\"TransBuilder.logrue, LogWriter.LOG_LEVEL_DETAILED);

// Load the Kettle steps & plugins

StepLoader stloader = StepLoader.getInstance();

if (!stloader.read())

{

log.logError(\"TransBuilder\ \"Error loading Kettle steps & plugins... stopping now!\");

return;

}

// The parameters we want, optionally this can be

String fileName = \"NewTrans.xml\";

String transformationName = \"Test Transformation\";

String sourceDatabaseName = \"source\";

String sourceTableName = \"testuser.source_table\";

String sourceFields[] = {

\"*\"

};

String targetDatabaseName = \"target\";

String targetTableName = \"testuser.target_table\";

String targetFields[] = {

\"*\"

};

// Generate the transformation.

TransMeta transMeta = TransBuilder.buildCopyTable(

transformationName,

sourceDatabaseName,

sourceTableName,

sourceFields,

targetDatabaseName,

targetTableName,

targetFields

);

// Save it as a file:

String xml = transMeta.getXML();

DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));

dos.write(xml.getBytes(\"UTF-8\"));

dos.close();

System.out.println(\"Saved transformation to file: \"+fileName);

// OK, What's the SQL we need to execute to generate the target table?

String sql = transMeta.getSQLStatementsString();

// Execute the SQL on the target table:

Database targetDatabase = Database(transMeta.findDatabase(targetDatabaseName));

targetDatabase.connect();

targetDatabase.execStatements(sql);

// Now execute the transformation...

Trans trans = new Trans(log, transMeta);

trans.execute(null);

trans.waitUntilFinished();

// For testing/repeatability, we drop the target table again

/* modi by chq([link=chq.name]chq.name[/link]) on 20XX.07.20 不必删表

//targetDatabase.execStatement(\"drop table \"+targetTableName);

new

targetDatabase.disconnect();

} }

因篇幅问题不能全部显示,请点此查看更多更全内容