  Development techniques: Kettle Java API hands-on development record
  Author: unknown  Source: 水木森林
  Entered by: administrator, 2007-11-17
 

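  Preface:

  Why use Kettle and the Kettle Java API?

  What is Kettle? Kettle is an open-source ETL tool. It provides a Java-based graphical interface that is easy to use, and it ships with a fairly complete set of ETL steps that covers the commonly used ones.

  Why use the Kettle Java API? As the Kettle documentation puts it: "Kettle Java API: program your own Kettle transformation". Kettle lets you script transformations in Java, so an ETL process can be defined flexibly in code, which makes custom and batch processing possible. That is the work a programmer should be doing, rather than only operating the Kettle user interface the way one operates Word.

  Kettle Java API hands-on record: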

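  1. Set up the environment: download the Kettle source package from http://www.kettle.be and unzip it, for example to d:/kettle.

  2. Open Eclipse and create a new project. Use JDK 1.5.0, because Kettle calls System.getenv(), which is only supported from JDK 1.5.0 on. getenv() has had a checkered history: it was deprecated for a time and is supported again in JDK 1.5.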
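  A quick way to confirm that the JDK selected for the project supports environment lookup is a small stand-alone check. This is only a sketch: the class name EnvCheck and the helper envOrDefault are illustrative and are not part of Kettle.

```java
import java.util.Map;

// Hypothetical helper class for illustration; not part of Kettle.
class EnvCheck {

    // Returns the value of an environment variable, or the fallback when it is unset.
    static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name); // usable again from JDK 1.5 on
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // The no-argument form returns all variables as a read-only map.
        Map<String, String> env = System.getenv();
        System.out.println("Environment variables visible: " + env.size());
        System.out.println("PATH = " + envOrDefault("PATH", "(not set)"));
    }
}
```

  If this compiles and runs under the project's JDK, the getenv() requirement mentioned above is satisfied.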

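  3. Create a class, TransBuilder.java. You can copy the contents of d:/kettle/extra/TransBuilder.java into it unchanged.

  4. Edit the source as needed. The original program also needs the following change; add at the top:

  import org.eclipse.swt.dnd.Transfer;

  // This package was left out; it lives in libswt/win32/swt.jar under the Kettle root directory

  // added by chq (www.chq.name) on 2006.07.20

  (It later turned out that this import is unnecessary, because it is not needed at compile time.)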

  5. Prepare to compile: add the jar files to the Eclipse build path, mainly the following (based on extra/TransBuilder.bat):

  /lib/kettle.jar
  /libext/cachedb.jar
  /libext/sqlbasejdbc.jar
  /libext/activation.jar
  /libext/db2jcc.jar
  /libext/db2jcc_license_c.jar
  /libext/edtftpj-1.4.5.jar
  /libext/firebirdsql-full.jar
  /libext/firebirdsql.jar
  /libext/gis-shape.jar
  /libext/hsqldb.jar
  /libext/ifxjdbc.jar
  /libext/javadbf.jar
  /libext/jconn2.jar
  /libext/js.jar
  /libext/jt400.jar
  /libext/jtds-1.1.jar
  /libext/jxl.jar
  /libext/ktable.jar
  /libext/log4j-1.2.8.jar
  /libext/mail.jar
  /libext/mysql-connector-java-3.1.7-bin.jar
  /libext/ojdbc14.jar
  /libext/orai18n.jar
  /libext/pg74.215.jdbc3.jar
  /libext/edbc.jar

  (Note: the following jar was left out of that list and must be added. It is located at libswt/win32/swt.jar under the Kettle root directory.)

  /libswt/win32/swt.jar
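  Before running the program, it can help to confirm that the build path actually exposes the key classes. The sketch below is an illustration only: ClasspathCheck and isOnClasspath are made-up names, and the two class names checked are taken from the imports used in this article.

```java
// Hypothetical helper for illustration; not part of Kettle.
class ClasspathCheck {

    // Returns true when the named class can be located by this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            // the class was found but one of its own dependencies is missing or broken
            return false;
        }
    }

    public static void main(String[] args) {
        String[] expected = {
            "be.ibridge.kettle.trans.TransMeta", // expected in lib/kettle.jar
            "org.eclipse.swt.dnd.Transfer"       // expected in libswt/win32/swt.jar
        };
        for (String name : expected) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

  A MISSING line points at a jar that still has to be added to the build path.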

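  6. After a successful compile, prepare to run

  So that the program can run without logging in, set up the environment properties file kettle.properties. It lives in the user's home directory, usually /Documents and Settings/<user>/.kettle/. Its main contents are:

  KETTLE_REPOSITORY=kettle@m80

  KETTLE_USER=admin

  KETTLE_PASSWORD=passwd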
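  To verify that the file is where the program will look for it, a plain java.util.Properties check is enough. This is a sketch under assumptions: the class KettlePropertiesCheck is invented for illustration, the file is assumed to sit in .kettle under the user.home directory, and the key names assume the upper-case spelling Kettle uses for these variables.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kettle.
class KettlePropertiesCheck {

    // Loads a properties file and always closes the stream afterwards.
    static Properties load(File file) throws IOException {
        Properties props = new Properties();
        InputStream in = new FileInputStream(file);
        try {
            props.load(in);
        } finally {
            in.close();
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Assumed location: <user.home>/.kettle/kettle.properties
        File file = new File(new File(System.getProperty("user.home"), ".kettle"), "kettle.properties");
        if (!file.isFile()) {
            System.out.println("Not found: " + file);
            return;
        }
        Properties props = load(file);
        // Assumed key names (upper-case spelling)
        String[] keys = { "KETTLE_REPOSITORY", "KETTLE_USER", "KETTLE_PASSWORD" };
        for (String key : keys) {
            System.out.println(key + (props.containsKey(key) ? " is set" : " is MISSING"));
        }
    }
}
```

  The check deliberately prints only whether each key is present, so the password never appears in the console output.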

 

  7. Now run it and check whether the data has been copied to the target table.

  The modified program source follows:

++++++++++++++++++++++++++++++++


package name.chq.test;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

// This package was left out; it lives in libswt/win32/swt.jar under the Kettle root directory
// added by chq (www.chq.name) on 2006.07.20
//import org.eclipse.swt.dnd.Transfer;

 

/**
 * Class created to demonstrate the creation of transformations on-the-fly.
 *
 * @author Matt
 *
 */
public class TransBuilder
{
    // One XML connection definition per database: the target first, then the source.
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>target</name>" +
            "<server>192.168.17.35</server>" +
            "<type>ORACLE</type>" +
            "<access>Native</access>" +
            "<database>test1</database>" +
            "<port>1521</port>" +
            "<username>testuser</username>" +
            "<password>pwd</password>" +
            "<servername/>" +
            "<data_tablespace/>" +
            "<index_tablespace/>" +
            "<attributes>" +
                "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
                "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
                "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
            "</attributes>" +
        "</connection>" ,

        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>source</name>" +
            "<server>192.168.16.12</server>" +
            "<type>ORACLE</type>" +
            "<access>Native</access>" +
            "<database>test2</database>" +
            "<port>1521</port>" +
            "<username>testuser</username>" +
            "<password>pwd2</password>" +
            "<servername/>" +
            "<data_tablespace/>" +
            "<index_tablespace/>" +
            "<attributes>" +
                "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
                "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
                "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
            "</attributes>" +
        "</connection>"
    };

 

    /**
     * Creates a new transformation using input parameters such as the table name to read from.
     * @param transformationName the name of the transformation
     * @param sourceDatabaseName the name of the database to read from
     * @param sourceTableName the name of the table to read from
     * @param sourceFields the field names we want to read from the source table
     * @param targetDatabaseName the name of the target database
     * @param targetTableName the name of the target table we want to write to
     * @param targetFields the names of the fields in the target table (same number of fields as sourceFields)
     * @return a new transformation
     * @throws KettleException in the rare case something goes wrong
     */
    public static final TransMeta buildCopyTable(
            String transformationName, String sourceDatabaseName, String sourceTableName,
            String[] sourceFields, String targetDatabaseName, String targetTableName,
            String[] targetFields)
        throws KettleException
    {
        LogWriter log = LogWriter.getInstance();
        EnvUtil.environmentInit();
        try
        {
            //
            // Create a new transformation...
            //
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);

            // Add the database connections
            for (int i=0;i<databasesXML.length;i++)
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
                transMeta.addDatabase(databaseMeta);
            }

            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

            //
            // Add a note
            //
            String note = "Reads information from table [" + sourceTableName + "] on database ["
                            + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database ["
                            + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
            transMeta.addNote(ni);

            //
            // Create the source step...
            //
            String fromstepname = "read from [" + sourceTableName + "]";
            TableInputMeta tii = new TableInputMeta();
            tii.setDatabaseMeta(sourceDBInfo);
            String selectSQL = "SELECT "+Const.CR;
            for (int i=0;i<sourceFields.length;i++)
            {
                /* Modified by chq (www.chq.name): use * in place of the field list.
                   On inspection, the statements below handle '*' correctly. */
                if (i>0)
                     selectSQL+=", ";
                else selectSQL+="  ";

                selectSQL+=sourceFields[i]+Const.CR;
            }
            selectSQL+="FROM "+sourceTableName;
            tii.setSQL(selectSQL);

            StepLoader steploader = StepLoader.getInstance();

            String fromstepid = steploader.getStepPluginID(tii);
            StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName
                                     + "] on database [" + sourceDBInfo + "]");
            transMeta.addStep(fromstep);

            //
            // Add logic to rename fields
            // Use metadata logic in SelectValues, use SelectValueInfo...
            //
            /* No renaming or mapping is needed; added by chq (www.chq.name) on 2006.07.20
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }

            String selstepname = "Rename field names";
            String selstepid = steploader.getStepPluginID(svi);
            StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            transMeta.addStep(selstep);

            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);
            fromstep = selstep; // sets the new starting point, by chq (www.chq.name) on 2006.07.20
            */

            //
            // Create the target step...
            //
            //
            // Add the TableOutputMeta step...
            //
            String tostepname = "write to [" + targetTableName + "]";
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);
            toi.setTablename(targetTableName);
            toi.setCommitSize(200);
            toi.setTruncateTable(true);

            String tostepid = steploader.getStepPluginID(toi);
            StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);

            //
            // Add a hop between the two steps...
            //
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);

            // OK, if we're still here: overwrite the current transformation...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }

 

    /**
     * 1) create a new transformation
     * 2) save the transformation as XML file
     * 3) generate the SQL for the target table
     * 4) execute the transformation
     * 5) drop the target table to make this program repeatable (disabled in this version)
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        EnvUtil.environmentInit();
        // Init the logging...
        LogWriter log = LogWriter.getInstance("transbuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);

        // Load the Kettle steps & plugins
        StepLoader stloader = StepLoader.getInstance();
        if (!stloader.read())
        {
            log.logError("TransBuilder",  "Error loading Kettle steps & plugins... stopping now!");
            return;
        }

        // The parameters we want
        String fileName = "newtrans.xml";
        String transformationName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "testuser.source_table";
        String sourceFields[] = {
               "*"
               };

        String targetDatabaseName = "target";
        String targetTableName = "testuser.target_table";
        String targetFields[] = {
               "*"
               };

        // Generate the transformation.
        TransMeta transMeta = TransBuilder.buildCopyTable(
                transformationName,
                sourceDatabaseName,
                sourceTableName,
                sourceFields,
                targetDatabaseName,
                targetTableName,
                targetFields
                );

        // Save it as a file:
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos.write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("Saved transformation to file: "+fileName);

        // OK, what's the SQL we need to execute to generate the target table?
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table:
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();
        targetDatabase.execStatements(sql);

        // Now execute the transformation...
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();

        // For testing/repeatability, the original program dropped the target table again.
        // Modified by chq (www.chq.name) on 2006.07.20: no need to drop the table
        //targetDatabase.execStatement("drop table "+targetTableName);
        targetDatabase.disconnect();
    }

}
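
  The note in the listing that the field loop also copes with "*" can be checked without a database. The sketch below reproduces only the string-building part of the source step in a stand-alone class; SelectSqlSketch and buildSelect are invented names, and the line separator is passed in as a parameter on the assumption that Const.CR is the platform line separator.

```java
// Hypothetical stand-alone sketch; it mirrors the SELECT-building loop and does not use Kettle.
class SelectSqlSketch {

    // Builds "SELECT <fields> FROM <table>", one field per line, separated by cr.
    static String buildSelect(String[] fields, String table, String cr) {
        StringBuilder sql = new StringBuilder("SELECT ").append(cr);
        for (int i = 0; i < fields.length; i++) {
            // the first field gets two spaces of indent, later fields a leading comma
            sql.append(i > 0 ? ", " : "  ").append(fields[i]).append(cr);
        }
        sql.append("FROM ").append(table);
        return sql.toString();
    }

    public static void main(String[] args) {
        String cr = System.getProperty("line.separator");
        // A single "*" entry is treated like any other field name.
        System.out.println(buildSelect(new String[] { "*" }, "testuser.source_table", cr));
        System.out.println();
        System.out.println(buildSelect(new String[] { "id", "name" }, "testuser.source_table", cr));
    }
}
```

  Because "*" is simply emitted as the only entry in the field list, the generated statement selects every column, which is why the rename step could be commented out.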

 
 