开发技术:KETTLE+JAVA+API+开发实战记录
为什么要用Kettle和KETTLE JAVA API?
Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便,kettle的ETL工具集合也比较多,常用的ETL工具都包含了。
为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅是象使用word一样操作kettle用户界面。
KETTLE JAVA API 实战操作记录:
一、 搭建环境 :到kettle.be网站下载kettle的源码包,加压缩,例如解压缩到d:\\kettle目录
二、 打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle的要使用System.getenv(),只有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了。
三、 建一个class : TransBuilder.java,可以把d:\\kettle\\ extra\\TransBuilder.java的内容原样拷贝到你的TransBuilder.java里。
四、 根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:
import org.eclipse.swt.dnd.Transfer;
//这个包被遗漏了,原始位置kettle根目录 \\libswt\\win32\\swt.jar
//add by chq(chq.name) on 20XX.07.20
(后来发现,不必加这个引用,因为编译时不需要)
五、 编译准备,在eclipse中增加jar包,主要包括(主要依据extra\\TransBuilder.bat):
\\lib\\kettle.jar
\\libext\\CacheDB.jar
\\libext\\SQLBaseJDBC.jar \\libext\\activation.jar \\libext\\db2jcc.jar
\\libext\\db2jcc_license_c.jar \\libext\\edtftpj-1.4.5.jar
\\libext\\firebirdsql-full.jar \\libext\\firebirdsql.jar \\libext\\gis-shape.jar \\libext\\hsqldb.jar \\libext\\ifxjdbc.jar \\libext\\javadbf.jar \\libext\\jconn2.jar \\libext\\js.jar \\libext\\jt400.jar \\libext\\jtds-1.1.jar \\libext\\jxl.jar \\libext\\ktable.jar \\libext\\log4j-1.2.8.jar \\libext\\mail.jar
\\libext\\mysql-connector-java-3.1.7-bin.jar \\libext\\ojdbc14.jar \\libext\\orai18n.jar
\\libext\\pg74.215.jdbc3.jar \\libext\\edbc.jar
(注意 :下面这个包被遗漏了,要加上。原始位置kettle根目录\\libswt\\win32\\swt.jar)
\\libswt\\win32\\swt.jar
六、 编译成功后,准备运行
为使程序不必登陆就可以运行,需要设置环境署文件:kettle.properties,位置在用户目录里,一般在 \\Documents and Settings\\用户\\.kettle\\,主要内容如下:
KETTLE_REPOSITORY=kettle@m80
KETTLE_USER=admin
KETTLE_PASSWORD=passwd
七、 好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。
以下为修改后的程序源码:
++++++++++++++++++++++++++++++++
package name.chq.test;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
//这个包被遗漏了,原始位置kettle根目录\\libswt\\win32\\swt.jar
//add by chq([link=chq.name]chq.name[/link]) on 20XX.07.20
//import org.eclipse.swt.dnd.Transfer; /**
* Class created to demonstrate the creation of transformations on-the-fly. *
* @author Matt * */
public class TransBuilder {
public static final String[] databasesXML = {
\"\" +
\" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \"EXTRA_OPTION_MYSQL.defaultFetchSize
EXTRA_OPTION_MYSQL.useCursorFetch
PORT_NUMBER
\"\" +
\" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \"EXTRA_OPTION_MYSQL.defaultFetchSize
EXTRA_OPTION_MYSQL.useCursorFetch
PORT_NUMBER
};
前言:
为什么要用Kettle和KETTLE JAVA API?
Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便,kettle的ETL工具集合也比较多,常用的ETL工具都包含了。
为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅
是象使用word一样操作kettle用户界面。
KETTLE JAVA API 实战操作记录:
一、 搭建环境 :到kettle.be网站下载kettle的源码包,加压缩,例如解压缩到d:\\kettle目录
二、 打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle的要使用System.getenv(),只有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了。
三、 建一个class : TransBuilder.java,可以把d:\\kettle\\ extra\\TransBuilder.java的内容原样拷贝到你的TransBuilder.java里。
四、 根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:
import org.eclipse.swt.dnd.Transfer;
//这个包被遗漏了,原始位置kettle根目录 \\libswt\\win32\\swt.jar
//add by chq(chq.name) on 20XX.07.20
(后来发现,不必加这个引用,因为编译时不需要)
五、 编译准备,在eclipse中增加jar包,主要包括(主要依据extra\\TransBuilder.bat):
\\lib\\kettle.jar
\\libext\\CacheDB.jar
\\libext\\SQLBaseJDBC.jar \\libext\\activation.jar \\libext\\db2jcc.jar
\\libext\\db2jcc_license_c.jar \\libext\\edtftpj-1.4.5.jar \\libext\\firebirdsql-full.jar \\libext\\firebirdsql.jar \\libext\\gis-shape.jar \\libext\\hsqldb.jar \\libext\\ifxjdbc.jar \\libext\\javadbf.jar \\libext\\jconn2.jar \\libext\\js.jar \\libext\\jt400.jar \\libext\\jtds-1.1.jar
\\libext\\jxl.jar \\libext\\ktable.jar \\libext\\log4j-1.2.8.jar \\libext\\mail.jar
\\libext\\mysql-connector-java-3.1.7-bin.jar \\libext\\ojdbc14.jar \\libext\\orai18n.jar
\\libext\\pg74.215.jdbc3.jar \\libext\\edbc.jar
(注意 :下面这个包被遗漏了,要加上。原始位置kettle根目录\\libswt\\win32\\swt.jar)
\\libswt\\win32\\swt.jar
六、 编译成功后,准备运行
为使程序不必登陆就可以运行,需要设置环境署文件:kettle.properties,位置在用户目录里,一般在 \\Documents and Settings\\用户\\.kettle\\,主要内容如下:
KETTLE_REPOSITORY=kettle@m80
KETTLE_USER=admin
KETTLE_PASSWORD=passwd
七、 好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。
以下为修改后的程序源码:
++++++++++++++++++++++++++++++++
package name.chq.test;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
//这个包被遗漏了,原始位置kettle根目录\\libswt\\win32\\swt.jar
//add by chq([link=chq.name]chq.name[/link]) on 20XX.07.20
//import org.eclipse.swt.dnd.Transfer;
/**
* Class created to demonstrate the creation of transformations on-the-fly. *
* @author Matt * */
public class TransBuilder {
public static final String[] databasesXML = {
\"\" +
\" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \"EXTRA_OPTION_MYSQL.defaultFetchSize
EXTRA_OPTION_MYSQL.useCursorFetch
PORT_NUMBER
\"\" +
\" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \" \"EXTRA_OPTION_MYSQL.defaultFetchSize
EXTRA_OPTION_MYSQL.useCursorFetch
PORT_NUMBER
}; //
// create the source step...
//
String fromstepname = \"read from [\" + sourceTableName + \"]\";
TableInputMeta tii = new TableInputMeta();
tii.setDatabaseMeta(sourceDBInfo);
String selectSQL = \"SELECT \"+Const.CR;
for (int i=0;i /* modi by chq(chq.name): use * to replace the fields,经分析,以下语句可以处理‘*‘ */ if (i>0) selectSQL+=\ else selectSQL+=\" \"; selectSQL+=sourceFields[i]+Const.CR; } selectSQL+=\"FROM \"+sourceTableName; tii.setSQL(selectSQL); StepLoader steploader = StepLoader.getInstance(); String fromstepid = steploader.getStepPluginID(tii); StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii); fromstep.setLocation(150, 100); fromstep.setDraw(true); fromstep.setDescription(\"Reads information from table [\" + sourceTableName + \"] on database [\" + sourceDBInfo + \"]\"); transMeta.addStep(fromstep); // // add logic to rename fields // Use metadata logic in SelectValues, use SelectValueInfo... // /* 不必改名或映射 add by chq(chq.name) on 20XX.07.20 SelectValuesMeta svi = new SelectValuesMeta(); svi.allocate(0, 0, sourceFields.length); for (int i = 0; i < sourceFields.length; i++) { svi.getMetaName()[i] = sourceFields[i]; svi.getMetaRename()[i] = targetFields[i]; } String selstepname = \"Rename field names\"; String selstepid = steploader.getStepPluginID(svi); StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi); selstep.setLocation(350, 100); selstep.setDraw(true); selstep.setDescription(\"Rename field names\"); transMeta.addStep(selstep); TransHopMeta shi = new TransHopMeta(fromstep, selstep); transMeta.addTransHop(shi); fromstep = selstep; //设定了新的起点 by chq([link=chq.name]chq.name[/link]) on 20XX.07.20 */ // // Create the target step... // // // Add the TableOutputMeta step... // String tostepname = \"write to [\" + targetTableName + \"]\"; TableOutputMeta toi = new TableOutputMeta(); toi.setDatabase(targetDBInfo); toi.setTablename(targetTableName); toi.setmitSize(200); toi.setTruncateTable(true); String tostepid = steploader.getStepPluginID(toi); StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi); tostep.setLocation(550, 100); tostep.setDraw(true); tostep.setDescription(\"Write information to table [\" + targetTableName + \"] on database [\" + targetDBInfo + \"]\"); transMeta.addStep(tostep); // // Add a hop between the two steps... // TransHopMeta hi = new TransHopMeta(fromstep, tostep); transMeta.addTransHop(hi); // OK, if we're still here: overwrite the current transformation... return transMeta; } catch (Exception e) { throw new KettleException(\"An unexpected error occurred creating the new transformation\ } } /** * 1) create a new transformation * 2) save the transformation as XML file * 3) generate the SQL for the target table * 4) Execute the transformation * 5) drop the target table to make this program repeatable * * @param args */ public static void main(String[] args) throws Exception { EnvUtil.environmentInit(); // Init the logging... LogWriter log = LogWriter.getInstance(\"TransBuilder.logrue, LogWriter.LOG_LEVEL_DETAILED); // Load the Kettle steps & plugins StepLoader stloader = StepLoader.getInstance(); if (!stloader.read()) { log.logError(\"TransBuilder\ \"Error loading Kettle steps & plugins... stopping now!\"); return; } // The parameters we want, optionally this can be String fileName = \"NewTrans.xml\"; String transformationName = \"Test Transformation\"; String sourceDatabaseName = \"source\"; String sourceTableName = \"testuser.source_table\"; String sourceFields[] = { \"*\" }; String targetDatabaseName = \"target\"; String targetTableName = \"testuser.target_table\"; String targetFields[] = { \"*\" }; // Generate the transformation. TransMeta transMeta = TransBuilder.buildCopyTable( transformationName, sourceDatabaseName, sourceTableName, sourceFields, targetDatabaseName, targetTableName, targetFields ); // Save it as a file: String xml = transMeta.getXML(); DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName))); dos.write(xml.getBytes(\"UTF-8\")); dos.close(); System.out.println(\"Saved transformation to file: \"+fileName); // OK, What's the SQL we need to execute to generate the target table? String sql = transMeta.getSQLStatementsString(); // Execute the SQL on the target table: Database targetDatabase = Database(transMeta.findDatabase(targetDatabaseName)); targetDatabase.connect(); targetDatabase.execStatements(sql); // Now execute the transformation... Trans trans = new Trans(log, transMeta); trans.execute(null); trans.waitUntilFinished(); // For testing/repeatability, we drop the target table again /* modi by chq([link=chq.name]chq.name[/link]) on 20XX.07.20 不必删表 //targetDatabase.execStatement(\"drop table \"+targetTableName); new targetDatabase.disconnect(); } } 因篇幅问题不能全部显示,请点此查看更多更全内容