|
前言: 为什么要用kettle和kettle java api?
kettle是什么? kettle是一个开源etl工具。kettle提供了基于java的图形化界面,使用很方便;kettle的etl工具集合也比较多,常用的etl工具都包含了。
为什么使用kettle java api: 就像kettle文档所说: kettle java api : program your own kettle transformation。kettle提供了基于java的脚本编写功能,可以灵活地自定义etl过程,使自行定制、批量处理等成为可能。这才是一个程序员需要做的工作,而不仅是像使用word一样操作kettle用户界面。
kettle java api 实战操作记录:
一、 搭建环境: 到 http://www.kettle.be 网站下载kettle的源码包并解压缩,例如解压缩到 d:/kettle 目录。
二、 打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle要使用System.getenv(),该方法只有在jdk1.5.0中才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了。
三、 建一个class: transbuilder.java,可以把 d:/kettle/extra/transbuilder.java 的内容原样拷贝到你的transbuilder.java里。
四、 根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:
import org.eclipse.swt.dnd.Transfer; //这个包被遗漏了,原始位置: kettle根目录/libswt/win32/swt.jar //add by chq(www.chq.name) on 2006.07.20
(后来发现,不必加这个引用,因为编译时不需要)
五、 编译准备: 在eclipse中增加jar包,主要包括(主要依据extra/transbuilder.bat):
/lib/kettle.jar
/libext/cachedb.jar
/libext/sqlbasejdbc.jar
/libext/activation.jar
/libext/db2jcc.jar
/libext/db2jcc_license_c.jar
/libext/edtftpj-1.4.5.jar
/libext/firebirdsql-full.jar
/libext/firebirdsql.jar
/libext/gis-shape.jar
/libext/hsqldb.jar
/libext/ifxjdbc.jar
/libext/javadbf.jar
/libext/jconn2.jar
/libext/js.jar
/libext/jt400.jar
/libext/jtds-1.1.jar
/libext/jxl.jar
/libext/ktable.jar
/libext/log4j-1.2.8.jar
/libext/mail.jar
/libext/mysql-connector-java-3.1.7-bin.jar
/libext/ojdbc14.jar
/libext/orai18n.jar
/libext/pg74.215.jdbc3.jar
/libext/edbc.jar
(注意: 下面这个包被遗漏了,要加上。原始位置: kettle根目录/libswt/win32/swt.jar)
/libswt/win32/swt.jar
六、 编译成功后,准备运行。为使程序不必登录就可以运行,需要设置环境属性文件 kettle.properties,位置在用户目录里,一般在 /documents and settings/用户/.kettle/,主要内容如下:
KETTLE_REPOSITORY=kettle@m80
KETTLE_USER=admin
KETTLE_PASSWORD=passwd
七、 好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。
以下为修改后的程序源码:
++++++++++++++++++++++++++++++++
package name.chq.test;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

// This package was left out of the original sample; it lives in <kettle root>/libswt/win32/swt.jar.
// Added by chq (www.chq.name) on 2006.07.20; left commented out.
//import org.eclipse.swt.dnd.Transfer;

/**
 * Class created to demonstrate the creation of transformations on-the-fly.
 *
 * @author matt
 */
public class transbuilder { public static final string[] databasesxml = { "<?xml version=/"1.0/" encoding=/"utf-8/"?>" + "<connection>" + "<name>target</name>" + "<server>192.168.17.35</server>" + "<type>oracle</type>" + "<access>native</access>" + "<database>test1</database>" + "<port>1521</port>" + "<username>testuser</username>" + "<password>pwd</password>" + "<servername/>" + "<data_tablespace/>" + "<index_tablespace/>" + "<attributes>" + "<attribute><code>extra_option_mysql.defaultfetchsize</code><attribute>500</attribute></attribute>" + "<attribute><code>extra_option_mysql.usecursorfetch</code><attribute>true</attribute></attribute>" + "<attribute><code>port_number</code><attribute>1521</attribute></attribute>" + "</attributes>" + "</connection>" , "<?xml version=/"1.0/" encoding=/"utf-8/"?>" + "<connection>" + "<name>source</name>" + "<server>192.168.16.12</server>" + "<type>oracle</type>" + "<access>native</access>" + "<database>test2</database>" + "<port>1521</port>" + "<username>testuser</username>" + "<password>pwd2</password>" + "<servername/>" + "<data_tablespace/>" + "<index_tablespace/>" + "<attributes>" + "<attribute><code>extra_option_mysql.defaultfetchsize</code><attribute>500</attribute></attribute>" + "<attribute><code>extra_option_mysql.usecursorfetch</code><attribute>true</attribute></attribute>" + "<attribute><code>port_number</code><attribute>1521</attribute></attribute>" + "</attributes>" + "</connection>" };
/** * creates a new transformation using input parameters such as the tablename to read from. * @param transformationname the name of the transformation * @param sourcedatabasename the name of the database to read from * @param sourcetablename the name of the table to read from * @param sourcefields the field names we want to read from the source table * @param targetdatabasename the name of the target database * @param targettablename the name of the target table we want to write to * @param targetfields the names of the fields in the target table (same number of fields as sourcefields) * @return a new transformation * @throws kettleexception in the rare case something goes wrong */ public static final transmeta buildcopytable( string transformationname,string sourcedatabasename, string sourcetablename, string[] sourcefields, string targetdatabasename, string targettablename, string[] targetfields) throws kettleexception { logwriter log = logwriter.getinstance(); envutil.environmentinit(); try { // // create a new transformation... // transmeta transmeta = new transmeta(); transmeta.setname(transformationname); // add the database connections for (int i=0;i<databasesxml.length;i++) { databasemeta databasemeta = new databasemeta(databasesxml[i]); transmeta.adddatabase(databasemeta); } databasemeta sourcedbinfo = transmeta.finddatabase(sourcedatabasename); databasemeta targetdbinfo = transmeta.finddatabase(targetdatabasename); // // add a note // string note = "reads information from table [" + sourcetablename+ "] on database [" + sourcedbinfo + "]" + const.cr; note += "after that, it writes the information to table [" + targettablename + "] on database [" + targetdbinfo + "]"; notepadmeta ni = new notepadmeta(note, 150, 10, -1, -1); transmeta.addnote(ni);
// // create the source step... // string fromstepname = "read from [" + sourcetablename + "]"; tableinputmeta tii = new tableinputmeta(); tii.setdatabasemeta(sourcedbinfo); string selectsql = "select "+const.cr; for (int i=0;i<sourcefields.length;i++) { /* modi by chq(www.chq.name): use * to replace the fields,经分析,以下语句可以处理‘*‘ */ if (i>0) selectsql+=", "; else selectsql+=" "; selectsql+=sourcefields[i]+const.cr; } selectsql+="from "+sourcetablename; tii.setsql(selectsql); steploader steploader = steploader.getinstance(); string fromstepid = steploader.getsteppluginid(tii); stepmeta fromstep = new stepmeta(log, fromstepid, fromstepname, (stepmetainterface) tii); fromstep.setlocation(150, 100); fromstep.setdraw(true); fromstep.setdescription("reads information from table [" + sourcetablename + "] on database [" + sourcedbinfo + "]"); transmeta.addstep(fromstep); // // add logic to rename fields // use metadata logic in selectvalues, use selectvalueinfo... // /* 不必改名或映射 add by chq(www.chq.name) on 2006.07.20 selectvaluesmeta svi = new selectvaluesmeta(); svi.allocate(0, 0, sourcefields.length); for (int i = 0; i < sourcefields.length; i++) { svi.getmetaname()[i] = sourcefields[i]; svi.getmetarename()[i] = targetfields[i]; } string selstepname = "rename field names"; string selstepid = steploader.getsteppluginid(svi); stepmeta selstep = new stepmeta(log, selstepid, selstepname, (stepmetainterface) svi); selstep.setlocation(350, 100); selstep.setdraw(true); selstep.setdescription("rename field names"); transmeta.addstep(selstep); transhopmeta shi = new transhopmeta(fromstep, selstep); transmeta.addtranshop(shi); fromstep = selstep; //设定了新的起点 by chq([link=http://www.chq.name]www.chq.name[/link]) on 2006.07.20 */ // // create the target step... // // // add the tableoutputmeta step... 
// string tostepname = "write to [" + targettablename + "]"; tableoutputmeta toi = new tableoutputmeta(); toi.setdatabase(targetdbinfo); toi.settablename(targettablename); toi.setcommitsize(200); toi.settruncatetable(true); string tostepid = steploader.getsteppluginid(toi); stepmeta tostep = new stepmeta(log, tostepid, tostepname, (stepmetainterface) toi); tostep.setlocation(550, 100); tostep.setdraw(true); tostep.setdescription("write information to table [" + targettablename + "] on database [" + targetdbinfo + "]"); transmeta.addstep(tostep); // // add a hop between the two steps... // transhopmeta hi = new transhopmeta(fromstep, tostep); transmeta.addtranshop(hi);
// ok, if we're still here: overwrite the current transformation... return transmeta; } catch (exception e) { throw new kettleexception("an unexpected error occurred creating the new transformation", e); } } /** * 1) create a new transformation * 2) save the transformation as xml file * 3) generate the sql for the target table * 4) execute the transformation * 5) drop the target table to make this program repeatable * * @param args */ public static void main(string[] args) throws exception { envutil.environmentinit(); // init the logging... logwriter log = logwriter.getinstance("transbuilder.log", true, logwriter.log_level_detailed); // load the kettle steps & plugins steploader stloader = steploader.getinstance(); if (!stloader.read()) { log.logerror("transbuilder", "error loading kettle steps & plugins... stopping now!"); return; } // the parameters we want, optionally this can be string filename = "newtrans.xml"; string transformationname = "test transformation"; string sourcedatabasename = "source"; string sourcetablename = "testuser.source_table"; string sourcefields[] = { "*" }; string targetdatabasename = "target"; string targettablename = "testuser.target_table"; string targetfields[] = { "*" };
// generate the transformation. transmeta transmeta = transbuilder.buildcopytable( transformationname, sourcedatabasename, sourcetablename, sourcefields, targetdatabasename, targettablename, targetfields ); // save it as a file: string xml = transmeta.getxml(); dataoutputstream dos = new dataoutputstream(new fileoutputstream(new file(filename))); dos.write(xml.getbytes("utf-8")); dos.close(); system.out.println("saved transformation to file: "+filename); // ok, what's the sql we need to execute to generate the target table? string sql = transmeta.getsqlstatementsstring(); // execute the sql on the target table: database targetdatabase = new database(transmeta.finddatabase(targetdatabasename)); targetdatabase.connect(); targetdatabase.execstatements(sql); // now execute the transformation... trans trans = new trans(log, transmeta); trans.execute(null); trans.waituntilfinished(); // for testing/repeatability, we drop the target table again /* modi by chq([link=http://www.chq.name]www.chq.name[/link]) on 2006.07.20 不必删表 //targetdatabase.execstatement("drop table "+targettablename); targetdatabase.disconnect(); } }
|