用 Java Pack 组件实现用户自定义数据转换逻辑

实现用户自定义数据转换逻辑

在 WebSphere DataStage 中实现用户自定义数据转换逻辑有三种方法。

  • 通过 WebSphere DataStage BASIC 编程语言实现。 DataStage BASIC 语言是一种可以运行在 DataStage 平台上,功能强大的面向过程的开发语言。用户可以使用 DataStage BASIC 语言撰写用户自定义函数和自定义转换器来封装复杂的数据转换逻辑。关于具体方法,用户可以参考发表在 developerWorks 中国网站上的《 WebSphere DataStage BASIC 语言开发实践》一文,详见参考资源。
  • 对于 Parallel Job 来说,用户还可以通过为 WebSphere DataStage 创建定制操作符来实现。 DataStage 作业运行时,后台实际上是通过一个个操作符来对输入数据流的进行处理并产生相应的输出数据流。 DataStage 提供了相应的 C++ API,使得用户可以创建自定义的操作符,并加载到 DataStage 中使用。关于具体方法,用户可以参考 developerWorks 中国网站上的教程《为 WebSphere DataStage 创建定制操作符》,详见参考资源。
  • 通过 Java Pack 组件调用 Java 代码实现。 Java Pack 组件提供了一组 Java API,使得 DataStage 能够调用用户 Java 代码。借助 Java 语言的强大功能,用户可以方便实现任何复杂的数据转换逻辑。本文将主要介绍如何通过 Java Pack 组件来实现用户自定义数据转换逻辑。

    Java Pack 组件简介

    Java 语言是一种功能强大、跨平台的通用编程语言,在企业级应用中被广泛运用。 WebSphere DataStage 通过 Java Pack 组件提供了调用外部 Java 代码的机制,能够有效将已有的 Java 企业应用集成到数据整合流程之中,同时,也让 ETL 开发人员能够在 DataStage 中充分利用 Java 语言提供的强大功能,提高开发效率。

    在 DataStage 7.x 中,Java Pack 是作为一个附加组件提供的,需要单独进行安装,而在最新版本的 WebSphere DataStage 8 中,Java Pack 已经作为标准组件提供了。 Java Pack 提供了两种不同的 Stage,分别是 Java Client Stage 和 Java Transformer Stage,开发人员可以在组件面板的 Real Time 分类下找到这两个 Stage,如图 1 所示。


    图 1. Java Pack 组件
    用 Java Pack 组件实现用户自定义数据转换逻辑

    Java Client Stage 和 Java Transformer Stage 都可以调用外部的 Java 类。 Java Client Stage 主要用于对支持 Java 语言的数据系统进行读取、查询或写入。 Java Transformer Stage 则用于对数据进行处理,它通过调用 Java 代码将 Input link 上输入的数据进行转换,并将结果输出到 Output link 上。本文中实现用户定义数据转换逻辑所用到的正是 Java Transformer Stage 。

    Java Pack API 介绍

    WebSphere DataStage 提供了一系列简单易用的 Java API 来与外部 Java 类协同工作,并进行数据处理。这些 API 使得 Java 程序可以在运行时与 Datastage 引擎直接交互,获取当前作业中的 columns 和 links 的相关元数据,并在需要时在这些 links 上读取或者写入数据。开发人员需要在其 Java 类中定义若干特殊的方法,如 process() 方法,并在其中调用自己的数据处理逻辑。

    Java Pack API 所对应的包名是 com.ascentialsoftware.jds 。以 WebSphere Datastage 8 和 Windows 操作系统为例,其相应的 Jar 包文件为 C:\IBM\InformationServer\Server\DSEngine\java\lib\tr4j.jar 。该包由 3 个 public 类构成,分别是 Stage 类,Row 类和 Column 类。

    任何一个能被 DataStage 引擎调用的外部 Java 程序都必须实现一个 Stage 类的子类,其代码的框架结构如下。

    public class Mytransformer extends Stage 
    { 
    	public void initialize() 
    		{ 
    			// ...initialize logic 
    		} 
    	public void terminate() 
    		{
    			// ...terminate logic 
    		} 
    	public int process() 
    		{ 
    			Row inputRow = readRow(); 
    			// ...process input row... 
    			Row utputRow = createOutputRow(); 
    			// ...fill output row... 
    			writeRow(outputRow); 
    			return OUTPUT_STATUS_READY; 
    		} 
    }
    

    Stage 类抽象了 Java Client Stage 和 Java Transformer Stage 。在实现其子类时,开发人员必须根据自己的需求重新实现并覆盖 process() 方法,而 initialize() 方法和 terminate() 方法是可选的。

    当一个 Java Client Stage 或 Java Transformer Stage 准备开始在 DataStage 作业中运行时,Java Pack API 将先调用 initialize() 方法。如果开发人员需要在数据处理前进行一些初始化工作,如设置计数器、读取用户自定义属性或打开数据库连接等,可以通过在子类覆盖该方法来完成。

    process() 方法是处理输入输出数据的入口方法,每当数据到达并需要处理的时候,它就会被调用。在该方法内,开发人员可以分析到达的每一条记录,并调用自定义数据转换逻辑的代码进行数据转换,然后将转换后的数据输出。

    当所有数据都处理完毕后,Java Pack API 将调用 terminate() 方法,开发人员可以通过在子类中覆盖该方法,进行某些资源的释放和清除操作。

    下表中列举了 Stage 类所提供的一些重要方法。


    表 1. Stage 类中的重要方法

    方法名 描述
    initialize() 子类中覆盖该方法进行初始化操作
    terminate() 子类中覆盖该方法释放相关资源
    process() 子类中覆盖该方法处理输入输出数据
    createInputRow() 创建一个对应 Input link 的数据记录对象
    createOutputRow() 创建一个对应 Output link 的数据记录对象
    createRejectRow() 创建一个对应 Reject link 的数据记录对象
    readRow() 从 Input link 上读取下一条可获取的数据记录
    rejectRow() 将一条数据记录写到 Reject link 上
    writeRow() 将一条数据记录写到 Output link 上
    getUserProperties() 获取用户自定义属性

    Row 类是一条数据记录的抽象,它提供了相关方法得到每条记录上的字段数,并读取和设置每每个字段的值。

    下表列举了 Row 类所提供的一些重要方法。


    表 2. Row 类中的重要方法

    方法名 描述
    getColumn() 返回记录上相应字段
    getColumnCount() 返回一条数据记录上的字段数
    getValueAsSQLTyped() 读取某个字段的值
    setValueAsSQLTyped() 设置某个字段的值

    Column 类是一个数据记录字段的抽象,它提供了相关方法来查询该字段的元数据信息,如字段名,数据类型,字段长度,字段在记录中的位置等,同时提供了方法来读取和设置该字段的值。

    下表列举了 Column 类所提供的一些重要方法。


    表 3. Row 类中的重要方法

    方法名 描述
    getIndex() 获取字段在记录中的位置
    getName() 获取字段名
    getSQLType() 获取字段的数据类型
    isKey() 判断该字段是否是主键的一部分
    nullAllowed() 判断该字段是否可以设置 Null 值
    getValueAsSQLTyped() 读取该字段的值
    setValueAsSQLTyped() 设置该字段的值

    Java Pack 开发示例

    使用 Java Pack 进行用户自定义数据转换逻辑开发,我们可以将其粗略分为四个阶段。

    1. 明确数据转换需求
    2. 开发 Java 代码
    3. 开发 DataStage 作业
    4. 测试并运行 DataStage 作业并检查运行结果

    下面我们就按照这四个阶段来分步开发一个 DataStage 作业实例。

    明确数据转换需求

    我们有若干条员工信息的数据记录,其中一个字段是电子邮件地址,我们的需求是判断这个电子邮件地址是否是一个合法的电子邮件地址,并过滤掉电子邮件不合法的那些记录。

    测试数据如图 2 所示。


    图 2. 测试数据
    用 Java Pack 组件实现用户自定义数据转换逻辑

    员工信息的数据将存放在 CSV 文件中,过滤后,电子邮件地址合法的数据与电子邮件地址不合法的数据将分别写入到两个新的 CSV 文件中。

    开发 Java 代码

    根据需求,我们首先需要开发一个 Java 类用来判断一个电子邮件是否合法,这里我们实现一个 EmailChecker 类,其代码如下:


    清单 1. EmailChecker 类

    package com.ibm.cn.javapack;
    
    import java.util.regex.*;
    
    public class EmailChecker {
    	public static int IsValidEmaill(String addr)
    	{
    		String patternString = "[\\w-\\.][email protected]([\\w-]+\\.)+[\\w-]{2,4}";
    		Pattern pattern =null;
    		try
    		{
    			pattern = Pattern.compile(patternString);
    		}
    		catch (PatternSyntaxException e)
    		{
    			return -1;
    		}
    		Matcher matcher = pattern.matcher(addr);
    		boolean result = matcher.matches();
    		if (result) 
    			return 1 ; 
    		else return 0 ;
    	}
    }
    

    这里我们可以使用 Java 提供的正则表达式匹配功能,轻松实现对合法电子邮件的检测。我们可以通过调用 EmailChecker 类的 IsValidEmail() 方法来检测电子邮件地址的合法性。

    下面,我们将实现一个 Stage 类的子类 ValidEmailFilter,并在该类中根据输入数据记录的电子邮件地址进行过滤并输出数据。


    清单 2. ValidEmailFilter 类

    package com.ibm.cn.javapack;
    import java.text.ParseException;
    import com.ascentialsoftware.jds.*;
    public class ValidEmailFilter extends Stage {	
    	int filterColumnIndex;	
    	public void initialize() {
    		String userProperties = getUserProperties();		
    		filterColumnIndex = Integer.parseInt(userProperties);		
    	}
    	public int process() {		
    		Row inputRow = readRow();		
    		if (inputRow == null) {			
    return OUTPUT_STATUS_END_OF_DATA;
    }		
    	int columnCount = inputRow.getColumnCount();
    	Row utputRow = null;		
    	String emailAddress = 
    		inputRow.getValueAsString(filterColumnIndex).trim();		
    	int validateResult = 
    		EmailChecker.IsValidEmaill(emailAddress);		
    	if (validateResult == 0 "" hasRejectLink())			
    		utputRow = createRejectRow();		
    	if (validateResult == 0 ""  !hasRejectLink())			
    		return OUTPUT_STATUS_NOT_READY;		
    	if (validateResult == 1)		
    		utputRow = createOutputRow();		
    	for (int i = 0; i < columnCount; i++) {			
    		Object sqlObject = null;						
    		try {
    			sqlObject = inputRow.getValueAsSQLTyped(i);
    		} catch (ParseException e) {				
    			e.printStackTrace();
    		}			 
    		outputRow.setValueAsSQLTyped(i, sqlObject);
    	}
    	if (validateResult == 0 ""  hasRejectLink()){
    		rejectRow(outputRow);			
    	} else writeRow(outputRow);		
    	return OUTPUT_STATUS_READY;
    	}
    }
    

    在 ValidEmailFilter 类中,由于没有相关资源的释放和清除操作,我们仅实现了 initialize() 方法和 process() 方法。在 initialize() 方法中,我们调用了 getUserProperties() 方法来获取一个用户自定义属性,该属性的值将指出电子邮件地址对应的字段在数据记录中的位置。我们将在开发 DataStage 作业时指定该值。 readRow() 方法将在 Input link 上读入一行数据记录,getValueAsString() 方法取出该条记录上电子邮件地址对应字段上的值,并交由 EmailChecker 类来检查该地址的合法性。如果合法,将由 writeRow() 方法写入到 Output link 上,否则由 rejectRow() 方法写入到 Reject link 上。

    将 Java 代码编译成为相应的 .class 文件,需要注意的是,要尽量使用与 DataStage 引擎所使用的 JRE 版本相同的 JDK 来编译开发。

    开发 DataStage 作业

    打开 WebSphere DataStage Designer,新建一个 Server Job,如图 3 所示。


    图 3. DataStage 作业流程图
    用 Java Pack 组件实现用户自定义数据转换逻辑

    这里“ Filter_Email ”就是一个 Java Transformer Stage,它从“ Employee_List ”这个 Sequential File Stage 中读取数据,并将结果分别输出到“ Result_File ”和“ Reject_File ”这两个 Sequential File Stage 。

    下面我们来设置该 Job 的属性和各个 Stage 的属性:

    1. 设置 Job 属性
    2. 设置“ Employee_List ” Stage,“ Result_File ” Stage 和 Reject_File ” Stage
    3. 设置“ Filter_Email ” Stage

    设置 Job 属性

    打开 Job Properties 对话框,如图 4 所示,在 Parameters 页上设置如下 DataStage 参数:

    • class_path:指定 Java Transformer Stage 需要的 Java 类文件所在的目录
    • class_name:指定开发人员实现的 Stage 子类的类名(包括相应的 Package 名)
    • email_column_index:指定电子邮件地址在数据记录中的位置(该字段在第几列)
    • data_dir:源数据文件和输出数据文件存放的路径


    图 4.设置 DataStage 作业参数
    用 Java Pack 组件实现用户自定义数据转换逻辑

    设置“ Employee_List ” Stage,“ Result_File ” Stage 和“ Reject_File ” Stage

    在“ Employee_List ”,“ Result_File ”和“ Reject_File ”三个 Sequential File Stage 中,分别指定数据文件路径,数据文件格式和数据列的定义。我们以“ Employee_List ” Stage 为例,来说明如何设置这三个 Stage 。

    打开“ Employee_List ” Stage 的属性窗口,我们需要在“ Employee_List ” Stage 中指定数据源文件和设置数据列定义。

    请参看图 5,在 General 选项卡中,我们设置数据源文件为 employee.txt,数据以 CSV 格式存放。在这里,我们使用了用户定义的参数 #data_dir# 。


    图 5.指定数据文件
    用 Java Pack 组件实现用户自定义数据转换逻辑

    切换到 Columns 选项卡,如图 6 所示,我们定义了 EMP_NUM、EMP_NAME、EMAIL,TEL_NUM 四个数据列。


    图 6.设置数据列定义
    用 Java Pack 组件实现用户自定义数据转换逻辑

    设置“ Filter_Email ” Stage

    双击打开“ Filter Email ” Stage,在 Stage 页的 General 选项卡上,我们可以指定 Transformer Class Name 和 User ’ s Classpath,填入我们事先定义好的 DataStage 参数,如图 7 所示。


    图 7.设置数据列定义
    用 Java Pack 组件实现用户自定义数据转换逻辑

    切换到 Properties 选项卡,我们来设置用户自定义属性,如图 8 所示。在这里我们输入预定义好的参数 #email_column_index# 。在 User ’ s Properties 中输入的值,都可通过上面提到的 Stage 类的 getUserProperties() 方法以字符串的形式在 Java 程序中取到。


    图 8. 设置用户属性
    用 Java Pack 组件实现用户自定义数据转换逻辑

    切换到 Option 选项卡,这里可以指定 Java 代码在运行时所用到的 Java 虚拟机参数,本例中并不需要特别指定任何 Java 虚拟机参数。


    图 9. 指定 Java 虚拟机参数
    用 Java Pack 组件实现用户自定义数据转换逻辑

    切换到 Output 页,这里可以指定 Reject link 。需要注意的是,Java Transformer Stage 仅支持一条 Input link,一条 Output link 和一条 Reject link 。本例中我们指定 rejected 为 Reject link,被 Reject 的数据将流入到“ Reject_File ” Stage


    图 10. 指定 Reject link
    用 Java Pack 组件实现用户自定义数据转换逻辑

    测试并运行 DataStage 作业并检查运行结果

    在完成 DataStage 作业的开发工作后,我们就可以编译和运行了。如图 9 所示,作业成功运行,有 3 条数据流入“ Result_File ” Stage 中,另外两条数据流入“ Reject_File ” Stage 中。打开 “ Result_File ” Stage 和 “ Reject_File ” Stage 的属性对话框,在 Input 选项卡,点击 View Data …按钮,就可以浏览运行结果。请参看图 12 和图 13 中的数据结果。数据源中的数据记录被“ Email_Filter ” Stage 根据其电子邮件地址的合法性正确的处理了。


    图 11. 成功运行后的 JOB
    用 Java Pack 组件实现用户自定义数据转换逻辑

    图 12. “ Result_File ” Stage 中的数据结果
    用 Java Pack 组件实现用户自定义数据转换逻辑

    图 13. “ Reject_File ” Stage 中的数据结果
    用 Java Pack 组件实现用户自定义数据转换逻辑

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/15082138/viewspace-591938/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/15082138/viewspace-591938/