Calling Kettle 6 Transformations and Jobs from Java

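For complex data transfers, especially multi-table transfers between heterogeneous databases, we often use an ETL tool. Kettle is a typical and widely used ETL tool. Because Kettle is powerful and complex, it makes project operation and maintenance harder for Java developers. Integrating Kettle into a Java project can therefore greatly reduce development difficulty, improve development efficiency, and lower operational complexity. Most Chinese-language material online about Kettle integration targets versions earlier than Kettle 4.0. The demos below are based on the official Kettle 6.0 documentation and related material found online. If you spot any errors, please point them out!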
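1. Jar dependencies
The data-integration\lib folder in the Kettle installation directory contains many jars, which you can add as needed. In my own testing, the following are required: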

avalon-framework-4.1.5.jar;commons-codec-1.9.jar;commons-collections-3.2.1.jar;commons-io-2.1.jar;commons-lang-2.5.jar;commons-logging-1.1.3.jar;commons-vfs2-2.1-20150824.jar;guava-17.0.jar;jug-lgpl-2.0.0.jar;kettle-core-6.0.1.0-386.jar;kettle-dbdialog-6.0.1.0-386.jar;kettle-engine-6.0.1.0-386.jar;kettle-ui-swt-6.0.1.0-386.jar;metastore-6.0.1.0-386.jar;ognl-2.6.9.jar;scannotation-1.0.2.jar

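Maven:
<repositories>
    <repository>
        <id>pentaho-releases</id>
        <url>http://repository.pentaho.org/artifactory/repo/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>6.1.0.4-225</version>
    </dependency>
    <dependency>
        <groupId>com.verhas</groupId>
        <artifactId>license3j</artifactId>
        <version>1.0.7</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-dbdialog</artifactId>
        <version>6.1.0.4-225</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>6.1.0.4-225</version>
    </dependency>
    <dependency>
        <groupId>pentaho</groupId>
        <artifactId>metastore</artifactId>
        <version>6.1.0.4-225</version>
    </dependency>
    <dependency>
        <groupId>org.safehaus.jug</groupId>
        <artifactId>jug</artifactId>
        <version>2.0.0</version>
        <classifier>lgpl</classifier>
    </dependency>
</dependencies>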
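Once the jars or Maven dependencies are added, it can help to confirm that the Kettle classes used in this article can actually be loaded. The following is a minimal, dependency-free sketch. The class name `KettleClasspathCheck` and its method are hypothetical helpers and not part of Kettle. The class names it checks are the ones imported by the code in this article.

```java
import java.util.ArrayList;
import java.util.List;

public class KettleClasspathCheck {

    // Returns the class names that cannot be loaded from the current classpath
    static List<String> missingClasses(List<String> classNames) {
        List<String> missing = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize = false: locate and load the class without running its static initializers
                Class.forName(name, false, KettleClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                // LinkageError covers classes that exist but whose superclass (from another jar) is missing
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Kettle classes used by the code in this article
        List<String> required = List.of(
                "org.pentaho.di.core.KettleEnvironment",
                "org.pentaho.di.trans.TransMeta",
                "org.pentaho.di.trans.Trans",
                "org.pentaho.di.job.JobMeta",
                "org.pentaho.di.job.Job");
        List<String> missing = missingClasses(required);
        System.out.println(missing.isEmpty()
                ? "All required Kettle classes were found"
                : "Missing classes: " + missing);
    }
}
```

Passing this check does not guarantee that every runtime dependency is present. Some missing jars only surface once `KettleEnvironment.init()` runs or a step is executed.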

2. Creating a transformation in Java
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.NotePadMeta;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.selectvalues.SelectValuesMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;

/**
 * Creates a new transformation using input parameters such as the table name to read from.
 *
 * @param transformationName name of the transformation
 * @param sourceDatabaseName name of the source database connection
 * @param sourceTableName    table to read from
 * @param sourceFields       columns to read
 * @param targetDatabaseName name of the target database connection
 * @param targetTableName    table to write to
 * @param targetFields       columns to write (must have the same length as sourceFields)
 * @param databases          database connections to register in the transformation
 * @return a new transformation metadata object
 * @throws KettleException in the rare case something goes wrong
 */
public static final TransMeta buildCopyTable(String transformationName,
        String sourceDatabaseName, String sourceTableName,
        String[] sourceFields, String targetDatabaseName,
        String targetTableName, String[] targetFields,
        DatabaseMeta[] databases) throws KettleException {
    // Initialize Kettle first so that the plugin registry used below is populated
    KettleEnvironment.init();
    EnvUtil.environmentInit();
    try {
        // Create a new transformation...
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transformationName);

        // Add the database connections
        for (int i = 0; i < databases.length; i++) {
            DatabaseMeta databaseMeta = databases[i];
            transMeta.addDatabase(databaseMeta);
        }
        DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
        DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

        // Add a note describing the transformation
        String note = "Reads information from table [" + sourceTableName
                + "] on database [" + sourceDBInfo + "]" + Const.CR;
        note += "After that, it writes the information to table ["
                + targetTableName + "] on database [" + targetDBInfo + "]";
        NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
        transMeta.addNote(ni);

        // Create the step that reads from the source table...
        String fromstepname = "read from [" + sourceTableName + "]";
        TableInputMeta tii = new TableInputMeta();
        tii.setDatabaseMeta(sourceDBInfo);
        String selectSQL = "SELECT " + Const.CR;
        for (int i = 0; i < sourceFields.length; i++) {
            if (i > 0) selectSQL += ", ";
            else selectSQL += " ";
            selectSQL += sourceFields[i] + Const.CR;
        }
        selectSQL += "FROM " + sourceTableName;
        tii.setSQL(selectSQL);

        PluginRegistry registry = PluginRegistry.getInstance();
        String fromstepid = registry.getPluginId(tii);
        StepMeta fromstep = new StepMeta(fromstepid, fromstepname, (StepMetaInterface) tii);
        fromstep.setLocation(150, 100);
        fromstep.setDraw(true);
        fromstep.setDescription("Reads information from table [" + sourceTableName
                + "] on database [" + sourceDBInfo + "]");
        transMeta.addStep(fromstep);

        // Add the step that renames fields (Select values step).
        // allocate(nrFields, nrRemove, nrMeta): reserve one select entry per source field,
        // otherwise getSelectName()/getSelectRename() are empty arrays
        SelectValuesMeta svi = new SelectValuesMeta();
        svi.allocate(sourceFields.length, 0, 0);
        for (int i = 0; i < sourceFields.length; i++) {
            svi.getSelectName()[i] = sourceFields[i];
            svi.getSelectRename()[i] = targetFields[i];
        }
        String selstepname = "Rename field names";
        String selstepid = registry.getPluginId(svi);
        StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);
        selstep.setLocation(350, 100);
        selstep.setDraw(true);
        selstep.setDescription("Rename field names");
        transMeta.addStep(selstep);

        // Connect the input step to the rename step
        TransHopMeta shi = new TransHopMeta(fromstep, selstep);
        transMeta.addTransHop(shi);
        fromstep = selstep;

        // Create the step that writes to the target table...
        String tostepname = "write to [" + targetTableName + "]";
        TableOutputMeta toi = new TableOutputMeta();
        toi.setDatabaseMeta(targetDBInfo);
        toi.setTablename(targetTableName);
        toi.setCommitSize(200);
        toi.setTruncateTable(true); // empties the target table before loading
        String tostepid = registry.getPluginId(toi);
        StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);
        tostep.setLocation(550, 100);
        tostep.setDraw(true);
        tostep.setDescription("Write information to table [" + targetTableName
                + "] on database [" + targetDBInfo + "]");
        transMeta.addStep(tostep);

        // Connect the rename step to the output step...
        TransHopMeta hi = new TransHopMeta(fromstep, tostep);
        transMeta.addTransHop(hi);

        // The transformation is complete, return it...
        return transMeta;
    } catch (Exception e) {
        throw new KettleException("An unexpected error occurred creating the new transformation", e);
    }
}
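The SQL-building and field-mapping logic of buildCopyTable does not depend on Kettle, so it can be checked on its own. This sketch mirrors that logic in plain Java and uses `\n` in place of `Const.CR`. The class and method names are hypothetical and are not Kettle API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CopyTableSqlSketch {

    // Builds the same SELECT text as buildCopyTable: one column per line, ", " before every column after the first
    static String buildSelectSql(String tableName, String[] fields) {
        if (fields.length == 0) {
            throw new IllegalArgumentException("at least one field is required");
        }
        StringBuilder sql = new StringBuilder("SELECT \n");
        for (int i = 0; i < fields.length; i++) {
            sql.append(i > 0 ? ", " : " ").append(fields[i]).append('\n');
        }
        return sql.append("FROM ").append(tableName).toString();
    }

    // Pairs each source column with its target column, as the "Rename field names" step does
    static Map<String, String> renameMapping(String[] sourceFields, String[] targetFields) {
        if (sourceFields.length != targetFields.length) {
            throw new IllegalArgumentException("sourceFields and targetFields must have the same length");
        }
        Map<String, String> mapping = new LinkedHashMap<>(); // keeps column order
        for (int i = 0; i < sourceFields.length; i++) {
            mapping.put(sourceFields[i], targetFields[i]);
        }
        return mapping;
    }

    public static void main(String[] args) {
        String[] source = {"id", "user_name"};
        String[] target = {"id", "name"};
        System.out.println(buildSelectSql("t_user", source));
        System.out.println(renameMapping(source, target));
    }
}
```

Unlike the original method, this sketch fails fast on an empty field list or mismatched array lengths. The original would instead produce invalid SQL or an index error at run time.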

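3. Running a Kettle transformation from Java:
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

/**
 * Runs a transformation file.
 *
 * @param params  argument values passed to the transformation (as command-line arguments)
 * @param ktrPath path of the transformation file (.ktr)
 */
public static void runTransfer(String[] params, String ktrPath) {
    Trans trans = null;
    try {
        // Initialize the Kettle environment
        KettleEnvironment.init();
        EnvUtil.environmentInit();
        // Load the transformation metadata from the .ktr file
        TransMeta transMeta = new TransMeta(ktrPath);
        // Create the transformation
        trans = new Trans(transMeta);
        // Execute the transformation with the given arguments
        trans.execute(params);
        // Wait until the transformation has finished
        trans.waitUntilFinished();
        // Report a failure if any step had errors
        if (trans.getErrors() > 0) {
            throw new Exception("There were errors while running the transformation!");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}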

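4. Running a Kettle job from Java:
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;

/**
 * Runs a Kettle job from Java.
 *
 * @param params  variable values passed to the job (params[0] -> id, params[1] -> dt)
 * @param jobPath path and file name of the job (.kjb)
 */
public static void runJob(String[] params, String jobPath) {
    try {
        KettleEnvironment.init();
        // jobPath is the path and file name of the job; null means no repository is used
        JobMeta jobMeta = new JobMeta(jobPath, null);
        Job job = new Job(null, jobMeta);
        // Pass values to the job; inside the job they are referenced as ${name}
        // job.setVariable(paraname, paravalue);
        job.setVariable("id", params[0]);
        job.setVariable("dt", params[1]);
        job.start();
        job.waitUntilFinished();
        if (job.getErrors() > 0) {
            throw new Exception("There were errors while running the job!");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}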
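Inside the job, the values set with `setVariable` are referenced as `${id}` and `${dt}`. Kettle resolves these references itself (`Job` implements Kettle's `VariableSpace`, which provides `environmentSubstitute`). The sketch below is only a standalone illustration of the `${name}` substitution idea, not Kettle's implementation. The class name is hypothetical. In this sketch, unknown names are left untouched.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariableSubstitutionSketch {

    // Matches ${name} references, e.g. ${id} or ${dt}
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with its value; names without a value are kept as-is
    static String substitute(String text, Map<String, String> variables) {
        Matcher m = VAR.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = variables.get(m.group(1));
            String replacement = value != null ? value : m.group(0);
            // quoteReplacement so that '$' or '\' in values are copied literally
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("id", "42", "dt", "2016-01-01");
        System.out.println(substitute("SELECT * FROM orders WHERE id = ${id} AND dt = '${dt}'", vars));
    }
}
```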

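Notes:
1. When connecting Kettle to SQL Server, the open-source jTDS driver jar is recommended; the official Microsoft driver jar is not supported.
2. I recommend invoking transformations through the project's own scheduling framework (e.g. Quartz or Spring's scheduling support) to run them on a schedule. This gives more flexible control over jobs.
3. Kettle has a powerful graphical designer (Spoon), so it is best to create transformations there.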
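Note 2 suggests Quartz or Spring scheduling. To keep the sketch dependency-free, it swaps in the JDK's `ScheduledExecutorService`, a simpler scheduler with no cron expressions or persistence. The task passed in stands in for a call to `runJob` or `runTransfer`. The class name and the job path in the comment are hypothetical placeholders.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledKettleRunner {

    // Runs the task repeatedly with a fixed delay between the end of one run and the start of the next,
    // so a slow job never overlaps with itself
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, Runnable task,
                                       long initialDelay, long delay, TimeUnit unit) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // An exception escaping a periodic task suppresses all later runs, so log it and keep going
                e.printStackTrace();
            }
        }, initialDelay, delay, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // Stand-in for e.g. runJob(new String[]{"1", "2016-01-01"}, "/path/to/job.kjb") (hypothetical path)
        schedule(scheduler, threeRuns::countDown, 0, 100, TimeUnit.MILLISECONDS);
        threeRuns.await();
        scheduler.shutdownNow();
        System.out.println("ran three times");
    }
}
```

`scheduleWithFixedDelay` is used rather than `scheduleAtFixedRate` because an ETL run's duration varies. A fixed delay keeps consecutive runs from piling up when one run is slow.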
Finally, here is a sample of the system interface built on this approach:
[Figure: sample screenshot of the implemented system interface]
