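Spring Batch is designed for enterprise workloads where the logic is fairly simple, the work is highly repetitive and the data volume is large. The range of Readers it provides makes this clear, at least as I understand it. It is best suited to tasks such as data cleansing, moving data after analysis, or scheduled data exchange with other systems.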
In the previous article I used JdbcPagingItemReader to read data from an HSQLDB database:
```xml
<bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" />

<!-- "sysAppStoreReader" is a placeholder id -->
<bean id="sysAppStoreReader"
      class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper" ref="sysAppStoreMapper" />
    <property name="queryProvider" ref="appQueryProvider" />
</bean>

<bean id="appQueryProvider"
      class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider">
    <property name="selectClause" value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ" />
    <property name="fromClause" value="sys_appstore a" />
    <property name="sortKey" value="SEQ" />
</bean>
```
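In fact, Spring Batch provides many Readers, and any class that implements the org.springframework.batch.item.ItemReader interface can serve as a custom Reader. Most of the time you don't need to write one, because Spring Batch already ships ready-made implementations: the 2.1.8 API includes Readers for databases (Hibernate/iBATIS/JDBC), the file system, JMS messages and more, as shown in the figure: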
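A custom Reader only has to honor the `read()` contract: return the next item on each call and `null` once the input is exhausted, which is how the framework knows the step's input has ended. The sketch below illustrates that contract under an assumption: to keep it runnable without Spring Batch on the classpath, it declares a local `SimpleItemReader` interface as a stand-in for `org.springframework.batch.item.ItemReader`, and `InMemoryItemReader` is a hypothetical name (Spring Batch itself ships a similar `ListItemReader`).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Local stand-in for org.springframework.batch.item.ItemReader, declared here only so
// the sketch compiles without Spring Batch on the classpath.
interface SimpleItemReader<T> {
    T read() throws Exception;
}

// Hypothetical reader that hands out the items of an in-memory list, one per call.
class InMemoryItemReader<T> implements SimpleItemReader<T> {
    private final Deque<T> remaining;

    InMemoryItemReader(List<T> items) {
        // Copy the items; ArrayDeque rejects null elements, which is fine because
        // null is reserved as the end-of-input marker anyway.
        this.remaining = new ArrayDeque<>(items);
    }

    @Override
    public T read() {
        // poll() returns null once the deque is empty, meaning "no more input"
        return remaining.poll();
    }
}

class InMemoryItemReaderDemo {
    public static void main(String[] args) {
        InMemoryItemReader<String> reader =
                new InMemoryItemReader<>(Arrays.asList("User1", "User2", "User3"));
        String item;
        // Keep calling read() until it returns null, as the framework does.
        while ((item = reader.read()) != null) {
            System.out.println("read: " + item);
        }
    }
}
```

Because `null` means "end of input", a reader must never return `null` for a real item.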
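If you prefer Spring JDBC (I really dislike Hibernate), you can use JdbcPagingItemReader and give it a queryProvider. A queryProvider implements paging for one particular database, and ready-made providers exist for the common databases, as shown in the figure: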
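The providers differ mainly in SQL dialect; the paging idea is shared. JdbcPagingItemReader reads one page at a time and, for every page after the first, asks for rows whose sort key is greater than the last sort key it has already seen, which is why a `sortKey` must be configured. The following is a simplified in-memory sketch of that idea, not the library's code: `SortKeyPager`, `fetchPage` and the integer `SEQ` values are hypothetical, and the filter/sort/limit stream stands in for the generated SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory illustration of sort-key ("keyset") paging.
class SortKeyPager {
    private final List<Integer> table; // stands in for the SEQ column of sys_appstore
    private final int pageSize;

    SortKeyPager(List<Integer> table, int pageSize) {
        this.table = table;
        this.pageSize = pageSize;
    }

    // First page (lastSeenKey == null): ORDER BY SEQ, first pageSize rows.
    // Later pages: WHERE SEQ > lastSeenKey ORDER BY SEQ, first pageSize rows.
    List<Integer> fetchPage(Integer lastSeenKey) {
        return table.stream()
                .filter(seq -> lastSeenKey == null || seq > lastSeenKey)
                .sorted()
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads every row page by page, remembering only the last sort key seen.
    List<List<Integer>> readAllPages() {
        List<List<Integer>> pages = new ArrayList<>();
        Integer lastSeenKey = null;
        List<Integer> page = fetchPage(lastSeenKey);
        while (!page.isEmpty()) {
            pages.add(page);
            lastSeenKey = page.get(page.size() - 1); // largest key on this page
            page = fetchPage(lastSeenKey);
        }
        return pages;
    }
}

class SortKeyPagerDemo {
    public static void main(String[] args) {
        SortKeyPager pager = new SortKeyPager(Arrays.asList(5, 1, 4, 2, 3), 2);
        System.out.println(pager.readAllPages()); // [[1, 2], [3, 4], [5]]
    }
}
```

The sketch also shows why the sort key should be unique: rows that share the last key of a page would be skipped by the `>` comparison.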

If you really can't find a provider for your database among these, but you know its SQL, you can use JdbcCursorItemReader, which lets you set the SQL yourself.
Rewriting my example above with JdbcCursorItemReader is very simple:
```xml
<!-- "sysAppStoreReader" is a placeholder id -->
<bean id="sysAppStoreReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ from sys_appstore a order by a.SEQ" />
    <property name="rowMapper" ref="sysAppStoreMapper" />
</bean>
```
It works just as well, and the configuration is even simpler.
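What if my data comes from a file rather than a database?
Among the Reader implementations listed above there is a FlatFileItemReader, which reads (text) files.
Suppose I want to analyze log data structured like this:
It is all structured text, so this is easy to implement. The Spring configuration: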
```xml
<bean id="delimitedLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer" ref="delimitedLineTokenizer" />
    <property name="fieldSetMapper">
        <bean class="net.dbatch.sample.UserMapper" />
    </property>
</bean>

<bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="lineMapper" ref="lineMapper" />
    <property name="resource" value="classpath:/users.txt" />
</bean>
```
Next, write the corresponding mapper class:
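```java
package net.dbatch.sample;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class UserMapper implements FieldSetMapper<User> {

    public User mapFieldSet(FieldSet fs) throws BindException {
        User u = new User();
        u.setName(fs.readString(0)); // field 0: user name
        u.setAge(fs.readInt(1));     // field 1: age
        return u;
    }
}
```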
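Put together, FlatFileItemReader reads `users.txt` line by line, DefaultLineMapper hands each line to DelimitedLineTokenizer (which splits on commas unless another delimiter is set), and UserMapper turns the resulting FieldSet into a User. The sketch below imitates that pipeline with plain JDK code so it runs without Spring Batch. The sample lines such as `User1,20` are hypothetical, since the file format is not reproduced here, and the simple `split` does not handle quoted fields the way DelimitedLineTokenizer does.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Minimal User bean with the two fields UserMapper sets.
class User {
    private String name;
    private int age;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

class FlatFilePipelineSketch {
    // Tokenize step: split one line on commas (-1 keeps trailing empty fields).
    static String[] tokenize(String line) {
        return line.split(",", -1);
    }

    // Mapping step: same logic as UserMapper (field 0 = name, field 1 = age).
    static User mapFields(String[] fields) {
        User u = new User();
        u.setName(fields[0].trim());
        u.setAge(Integer.parseInt(fields[1].trim()));
        return u;
    }

    // Read step: one User per line.
    static List<User> readAll(BufferedReader in) {
        List<User> users = new ArrayList<>();
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // this sketch simply skips blank lines
                }
                users.add(mapFields(tokenize(line)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return users;
    }

    public static void main(String[] args) {
        String sample = "User1,20\nUser2,31\n"; // hypothetical contents of users.txt
        for (User u : readAll(new BufferedReader(new StringReader(sample)))) {
            System.out.println(u.getName() + " / " + u.getAge());
        }
    }
}
```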
Processor:
```java
import org.springframework.batch.item.ItemProcessor;
import org.springframework.util.StringUtils;

public class MessagesItemProcessor implements ItemProcessor<User, Message> {

    public Message process(User user) throws Exception {
        if (!StringUtils.hasText(user.getName())) {
            throw new RuntimeException("The user name is required!");
        }
        Message m = new Message(); // Message is a simple wrapper around the user
        m.setContent("Hello " + user.getName()
                + ",please pay promptly at end of this month.");
        return m;
    }
}
```
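Throwing an exception, as MessagesItemProcessor does for a blank name, reports an error for that item. An ItemProcessor can instead return `null`, which tells Spring Batch to drop the item without passing it to the writer; such items are counted in the step's `filterCount` (0 in the run shown in the output). The plain-Java sketch below shows the filtering variant; `FilteringMessageProcessor` and the local `SimpleItemProcessor` interface are hypothetical stand-ins, not Spring Batch classes, and the user is reduced to a name string to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for org.springframework.batch.item.ItemProcessor (input I, output O).
interface SimpleItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical variant of MessagesItemProcessor that filters instead of failing.
class FilteringMessageProcessor implements SimpleItemProcessor<String, String> {
    @Override
    public String process(String userName) {
        if (userName == null || userName.trim().isEmpty()) {
            return null; // null = filter this item out; it never reaches the writer
        }
        return "Hello " + userName + ",please pay promptly at end of this month.";
    }
}

class FilterDemo {
    public static void main(String[] args) {
        FilteringMessageProcessor processor = new FilteringMessageProcessor();
        List<String> toWrite = new ArrayList<>();
        int filterCount = 0;
        for (String name : Arrays.asList("User1", "", "User3")) {
            String out = processor.process(name);
            if (out == null) {
                filterCount++; // what the framework would record as filterCount
            } else {
                toWrite.add(out);
            }
        }
        System.out.println(toWrite.size() + " to write, " + filterCount + " filtered"); // 2 to write, 1 filtered
    }
}
```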
Writer:
```java
import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MessagesItemWriter implements ItemWriter<Message> {

    public void write(List<? extends Message> messages) throws Exception {
        System.out.println("write results");
        for (Message m : messages) {
            System.out.println(m.getContent()); // output only
        }
    }
}
```
Test code:
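```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;

public static void main(String[] args) {
    try {
        ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("localfile_job.xml");
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
        launcher.setTaskExecutor(new SyncTaskExecutor()); // run the job in the calling thread

        // run.month=2011-10 matches the job parameters shown in the output below
        JobExecution je = launcher.run((Job) c.getBean("messageJob"),
                new JobParametersBuilder().addString("run.month", "2011-10").toJobParameters());
        System.out.println(je);
        System.out.println(je.getJobInstance());
        System.out.println(je.getStepExecutions());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```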
Output:
```
10-20 15:28:32 INFO [job.SimpleStepHandler] - <Executing step: [messageStep]>
Hello User1,please pay promptly at end of this month.
Hello User2,please pay promptly at end of this month.
Hello User3,please pay promptly at end of this month.
Hello User4,please pay promptly at end of this month.
Hello User5,please pay promptly at end of this month.
Hello User6,please pay promptly at end of this month.
Hello User7,please pay promptly at end of this month.
Hello User8,please pay promptly at end of this month.
Hello User9,please pay promptly at end of this month.
Hello User10,please pay promptly at end of this month.
10-20 15:28:32 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=messageJob]] completed with the following parameters: [{run.month=2011-10}] and the following status: [COMPLETED]>
JobExecution: id=0, version=2, startTime=Sat Oct 20 15:28:32 CST 2012, endTime=Sat Oct 20 15:28:32 CST 2012, lastUpdated=Sat Oct 20 15:28:32 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]
JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]
[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10, readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=3, rollbackCount=0, exitDescription=]
```
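From the log we can clearly see that each line is read and passed to the Processor individually, and after every 5 reads the collected items are written in a single call. The commit-interval attribute of the chunk element (inside the step's tasklet) controls this number.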
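The chunk-oriented loop behind this log can be sketched as: read and process items one at a time until `commit-interval` items are collected or the reader returns `null`, write them in one call, commit, and repeat. The plain-Java simulation below is a simplification (no transactions, skips, retries or filtering; `ChunkLoopSketch` and its fields are hypothetical names). It also accounts for `commitCount=3` in the step execution: with 10 items and an interval of 5, two full chunks are committed, and one more, empty, chunk is committed when the reader finally returns `null`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical simulation of a chunk-oriented step (no transactions, skips or retries).
class ChunkLoopSketch {
    int readCount;
    int writeCount;
    int commitCount;
    final List<List<String>> writtenChunks = new ArrayList<>();

    void run(Iterator<String> source, Function<String, String> processor, int commitInterval) {
        boolean exhausted = false;
        while (!exhausted) {
            List<String> chunk = new ArrayList<>();
            // Fill one chunk: stop at commitInterval items or at end of input.
            while (chunk.size() < commitInterval) {
                String item = source.hasNext() ? source.next() : null; // null = reader is done
                if (item == null) {
                    exhausted = true;
                    break;
                }
                readCount++;
                chunk.add(processor.apply(item)); // process each item as soon as it is read
            }
            if (!chunk.isEmpty()) {
                writtenChunks.add(chunk); // one write call per chunk
                writeCount += chunk.size();
            }
            commitCount++; // every pass ends with a commit, even an empty one
        }
    }
}

class ChunkLoopDemo {
    public static void main(String[] args) {
        List<String> users = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            users.add("User" + i);
        }
        ChunkLoopSketch step = new ChunkLoopSketch();
        step.run(users.iterator(), name -> "Hello " + name, 5);
        // prints readCount=10, writeCount=10, commitCount=3
        System.out.println("readCount=" + step.readCount + ", writeCount=" + step.writeCount
                + ", commitCount=" + step.commitCount);
    }
}
```

When the item count is not a multiple of the interval (for example 10 items with an interval of 3), the last chunk is simply shorter and no extra empty commit occurs.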
The complete Spring configuration:
```xml
<batch:job id="messageJob" restartable="true">
    <batch:step id="messageStep">
        <batch:tasklet>
            <!-- commit-interval="5": one write (and commit) every 5 items -->
            <batch:chunk reader="messageReader"
                         processor="messageProcessor"
                         writer="messageWriter"
                         commit-interval="5">
                <!-- RuntimeException and its subclasses are treated as retryable -->
                <batch:retryable-exception-classes>
                    <batch:include class="java.lang.RuntimeException" />
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
```
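`<batch:retryable-exception-classes>` marks exceptions that should be retried instead of failing the step at once; the number of attempts comes from the chunk's `retry-limit` attribute, which is not visible in this snippet. Spring Batch's real behavior is more involved (it rolls back and re-processes the chunk), so the following is only a conceptual sketch of "retry on RuntimeException up to a limit". `RetrySketch.callWithRetry` is a hypothetical helper, and treating the limit as the total number of attempts is an assumption made for illustration.

```java
import java.util.function.Supplier;

// Hypothetical helper: conceptual retry loop, not Spring Batch's implementation.
class RetrySketch {
    // Calls the operation; on RuntimeException (or a subclass) tries again,
    // up to maxAttempts attempts in total, then rethrows the last failure.
    static <T> T callWithRetry(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e; // retryable: remember it and try again
            }
        }
        throw last; // attempts exhausted
    }
}

class RetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds: a stand-in for a transient error.
        String result = RetrySketch.callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("temporary failure");
            }
            return "ok after " + calls[0] + " attempts";
        }, 3);
        System.out.println(result); // ok after 3 attempts
    }
}
```

Retry mainly helps with transient failures such as a dropped connection; a deterministic error like the blank-name check in MessagesItemProcessor fails on every attempt.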