编写 Kettle 中的 Java 脚本，获取多个数据库中表的数据
1. 准备不同数据库中的表，并保证这些表的结构相同
2.表中数据和生成文本数据
3.自定义常量设置和java代码
import java.sql.*;
import org.pentaho.di.core.database.*;
/**
 * Processes one input row: reads the connection name ("conname") and table
 * name ("tablename") from it, runs {@code select id,name from <tablename>}
 * on that connection, and emits one output row per result row. Each output
 * row is the input row with the "id" and "name" values appended as strings.
 *
 * @param smi step metadata supplied by the engine (not used here)
 * @param sdi step data supplied by the engine (not used here)
 * @return {@code true} if the row was processed and more rows may follow;
 *         {@code false} when the input is exhausted or the connection could
 *         not be found or opened
 * @throws KettleException if a required input field is null or the query fails
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }

    // Read the connection name and the table name from the input row.
    String dbName = getInputRowMeta().getString(r, "conname", null);
    String tablename = getInputRowMeta().getString(r, "tablename", null);
    if (dbName == null || tablename == null) {
        throw new KettleException("Input fields \"conname\" and \"tablename\" must both be non-null"
            + " (conname=" + dbName + ", tablename=" + tablename + ").");
    }
    logBasic("table---" + tablename);

    // Look up the named connection in the transformation and open it.
    Database database = null;
    try {
        DatabaseMeta databaseMeta = getTransMeta().findDatabase(dbName);
        if (databaseMeta == null) {
            logError("A connection with name " + dbName + " could not be found!");
            setErrors(1);
            return false;
        }
        database = new Database(getTrans(), databaseMeta);
        database.connect();
        logBasic("success!");
    } catch (Exception e) {
        logError("Connecting to database " + dbName + " failed.", e);
        // Release whatever connect() managed to open before it failed.
        if (database != null) {
            database.disconnect();
        }
        setErrors(1);
        return false;
    }

    // Query the table.
    // NOTE(review): the table name comes from the input row and is concatenated
    // into the SQL text (identifiers cannot be bound as parameters). Make sure
    // the upstream rows come from a trusted source.
    String sql = "select id,name from " + tablename;
    ResultSet resultSet = null;
    try {
        resultSet = database.openQuery(sql);
        Object[] idxRow = database.getRow(resultSet);
        RowMetaInterface idxRowMeta = null;
        if (idxRow != null) {
            idxRowMeta = database.getReturnRowMeta();
        }

        // The appended fields start right after the last input field.
        int firstNewField = getInputRowMeta().size();
        int rowCount = 0;
        while (idxRow != null) {
            // Always copy from the original input row, not from the previous output row.
            Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
            int index = firstNewField;
            outputRow[index++] = idxRowMeta.getString(idxRow, "id", null);
            outputRow[index++] = idxRowMeta.getString(idxRow, "name", null);
            putRow(data.outputRowMeta, outputRow);

            idxRow = database.getRow(resultSet);
            rowCount++;
        }
        logBasic("idxRow--length" + rowCount);
    } catch (Exception e) {
        throw new KettleException(e);
    } finally {
        // Always release resources, even when the query or putRow fails:
        // close the query first, then drop the connection.
        try {
            if (resultSet != null) {
                database.closeQuery(resultSet);
            }
        } catch (Exception closeError) {
            // Log instead of throwing so a failure here cannot mask the original error.
            logError("Closing the query on table " + tablename + " failed.", closeError);
        } finally {
            database.disconnect();
        }
    }
    return true;
}