kettle ftp下载插件,下载执行结果源码分析
kettle ftp下载插件执行结果源码分析
问题背景
一个ftp插件下载任务,可能下载很多文件,下载过程中可能有些文件下载成功,有写文件下载失败,这些结果信息,kettle是如何记录的?
源码分析
分析JobEntryFTP 插件类:
JobEntryFTP -> execute() -> downloadFile()
private void downloadFile( FTPClient ftpclient, String filename, String realMoveToFolder, Job parentJob,
Result result ) throws Exception {
String localFilename = filename;
targetFilename = KettleVFS.getFilename( KettleVFS.getFileObject( returnTargetFilename( localFilename ) ) );
if ( ( !onlyGettingNewFiles ) || ( onlyGettingNewFiles && needsDownload( targetFilename ) ) ) {
if ( isDetailed() ) {
logDetailed( BaseMessages.getString(
PKG, "JobEntryFTP.GettingFile", filename, environmentSubstitute( targetDirectory ) ) );
}
ftpclient.get( targetFilename, filename );
// Update retrieved files
updateRetrievedFiles();
if ( isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTP.GotFile", filename ) );
}
// Add filename to result filenames
addFilenameToResultFilenames( result, parentJob, targetFilename );
// Delete the file if this is needed!
if ( remove ) {
ftpclient.delete( filename );
if ( isDetailed() ) {
if ( isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTP.DeletedFile", filename ) );
}
}
} else {
if ( movefiles ) {
// Try to move file to destination folder ...
ftpclient.rename( filename, realMoveToFolder + FILE_SEPARATOR + filename );
if ( isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTP.MovedFile", filename, realMoveToFolder ) );
}
}
}
}
}
分析
- updateRetrievedFiles() 方法,每成功下载一个文件,计数加1;最终记录到result的NrfilesRetrieved。
- addFilenameToResultFilenames( result, parentJob, targetFilename ); 具体的把当前这个下载成功的文件信息记录到 Result实例result中。
如下addFilenameToResultFilenames()函数片段。
targetFile = KettleVFS.getFileObject( filename, this );
// Add to the result files...
ResultFile resultFile =
new ResultFile( ResultFile.FILE_TYPE_GENERAL, targetFile, parentJob.getJobname(), toString() );
result.getResultFiles().put( resultFile.getFile().toString(), resultFile );
分析:
ResultFile 是kettle自己的定义的类,里面成员变量resultFile 是 apache commons vfs2 的FileObject类实例。
resultFile.getFile()就是获取成员变量 vfs FileObject类实例。
result的成员变量resultFiles是一个map,key就是 FileObject类实例.toString。
这段代码,就是把下载成功的文件获取对应vsf FileObject存储到result的map成员变量resultFiles中。key为 FileObject类实例.toString, value为自定义的ResultFile里面封装一些该文件信息,以及那个job/那个job entry 生成的该记录。因为result应该具有传递性,后面的可以知道是前面那一个job以及job entry处理的这个信息。
总结: ftp下载插件下载成功的文件,记录在reslut的成员变量resultFiles,一个map结构中。
思考: 自己开发的kettle插件,可以往result中填入我特定的 java成员变量吗?
思路1:继承Result,拓展增强添加自己的成员属性。
该思路的前提是,kettle框架获取插件Result使用,使用其 对应的抽象接口接的返回结果。
查看Result类,是否是实现自某个接口,发现没有一个抽象接口类,而且
Kitchen.java中,如下: job.start(),执行Job线程类的run方法,从而执行插件 (关键字: newResult = cloneJei.execute( prevResult, nr ); 该方法就是真正调插件的中execute方法。)
job.start(); // Execute the selected job.
job.waitUntilFinished();
setResult( job.getResult() ); // get the execution result
继续,查看 job.getResult(),发现使用Result接受的。
看setResult( job.getResult() ); // get the execution result方法,是设置抽象类的成员变量result,同样如下,也是Result类型。
总结:kettle框架层面对Result没有抽象出来接口类,也没有用接口类接插件返回结果。kettle框架获取插件执行结果,是直接Result类接受相关结果信息。 所以,我们自己编写继承的Result的插件执行结果返回类,不可行,除非改写kettle框架代码。
结论:暂时舍弃该方案。如果需要采用该方案,需要改写kettle代码,抽象出来一个abstractResult接口类,让执行插件的地方用用接口类接,改动了还是稍微比较大。
思路2: 看Result 有没有已有的成员属性,我们可以利用,赋值成我们的java对象,或 赋值为我们 java对象的 json序列化信息也行。
分析Result类:如下,每个成员变量,都有特定用途,没有可以利用的Sting 之类的属性。
/**
* Describes the result of the execution of a Transformation or a Job. The information available includes the following:
* <p>
* <ul>
* <li>Number of errors the job or transformation encountered</li>
* <li>Number of lines input</li>
* <li>Number of lines output</li>
* <li>Number of lines updated</li>
* <li>Number of lines read</li>
* <li>Number of lines written</li>
* <li>Number of lines deleted</li>
* <li>Number of lines rejected</li>
* <li>Number of files retrieved</li>
* <li>Boolean result of the execution</li>
* <li>Exit status value</li>
* <li>Whether the transformation was stopped</li>
* <li>Logging information (channel ID and text)</li>
* </p>
*
* After execution of a job or transformation, the Result can be evaluated.
*
* @author Matt
* @since 05-11-2003
*/
@XmlRootElement
public class Result implements Cloneable {
/** A constant specifying the tag value for the XML node of the result object */
public static final String XML_TAG = "result";
/** A constant specifying the tag value for the XML node for result files entry */
public static final String XML_FILES_TAG = "result-file";
/** A constant specifying the tag value for the XML node for the result file entry */
public static final String XML_FILE_TAG = "result-file";
/** A constant specifying the tag value for the XML node for the result rows entry */
public static final String XML_ROWS_TAG = "result-rows";
/** The number of errors during the transformation or job */
private long nrErrors;
/** The number of lines input. */
private long nrLinesInput;
/** The number of lines output. */
private long nrLinesOutput;
/** The number of lines updated. */
private long nrLinesUpdated;
/** The number of lines read. */
private long nrLinesRead;
/** The number of lines written. */
private long nrLinesWritten;
/** The number of lines deleted. */
private long nrLinesDeleted;
/** The number of files retrieved. */
private long nrFilesRetrieved;
/** The result of the job or transformation, true if successful, false otherwise. */
private boolean result;
/** The entry number. */
private long entryNr;
/** The exit status. */
private int exitStatus;
/** The rows. */
private List<RowMetaAndData> rows;
/** The result files. */
private Map<String, ResultFile> resultFiles;
/** Whether the job or transformation was stopped. */
public boolean stopped;
/** The number of lines rejected. */
private long nrLinesRejected;
/** The log channel id. */
private String logChannelId;
/** The log text. */
private String logText;
/**
* safe stop.
*/
private boolean safeStop;
/**
* Elapsed time of the ETL execution in milliseconds
*/
private long elapsedTimeMillis;
/**
* Unique identifier of an ETL execution, should one ever care to declare one such
*/
private String executionId;
结论:暂时舍弃该方案。如果需要这样做,同样需要改动kettle框架代码,直接Result类添加一个String xxx属性即可,里面存储你的对象json序列化,在用到的地方json解析出来。或者定义一个Object 属性,在用到的地方强转。
思路3: 不往kettle 框架的Result类中存储,执行结果json化写到特定文件中,这样多个job entry可以共享你之前执行结果信息。
暂时采用该方案。