任务扫描的架构设计
任务扫描的架构设计
一、 闲言
做业务系统的时候,遇到过太多次从db中扫描任务处理的需求,数据量,从每天几百上千条到上百万条不等,所以这次聊聊在不同场景中,应该如何设计次系统才能尽可能的降低开发工作量,提升系统稳定性、实时性,同时能够在中长期支持起业务的发展。
二、 场景
做业务系统开发的时候经常会遇到以下场景:
l 运营同学需要给某一批用户发送一些优惠券、提醒。
l 升级系统后需要对历史用户的数据进行订正。
l 任务处理失败了,希望后续能重试此任务。
l 任务不是立即执行,而是需要在未来的某个时刻执行。
l 任务是耗时任务,实时执行失败可能性比较大,而且会影响其他业务的进行,所以线持久化下来,后续慢慢执行。
l ……
具体到代码层面,就是DB中有一批需要处理的任务,系统需要取出这些任务,然后处理掉。
三、 要点
这个问题看似很简单,但是在做架构设计的时候有以下几种情况需要仔细考虑:
l 有些任务实时性要求很高,如何保证实时性,让任务尽可能在预期的时间点上执行。
l 如何保证公平性;每个任务都应该拥有尽可能公平的执行机会,不能因为部分任务占用系统资源,而导致其他任务阻塞,没有执行机会。
l 如果任务数据错误或者其他情况导致无法执行成功,应该放弃执行或执行数次后放弃执行。
l 如何保证服务器的性能稳定。Cpu、内存的耗费尽可能低,DB压力尽可能小;DB扫描程序、任务执行程序、系统的其他服务器之间相互隔离、互不影响。
l 在大量数据时,如何保证不出现性能瓶颈。
l 当任务无法被成功处理时,如何降低无效的调用;当服务恢复时,保证任务能够继续被处理。
l 如何让服务可以水平扩展,当服务遇到性能瓶颈时,如何快速、轻松的提升服务能力。
下面让我们来一起看看如何解决这些问题,以及每种方案的优缺点和适用场景。
四、 方案
1. DB任务扫描流程图
这是最简单的一个流程图,不过不幸的事大多数情况下,不能满足业务的需要。
根据业务场景的不同,需要注意以下优化:
1) 任务的执行是否快速、高效。
如果任务执行较慢,是否有必要异步化,使用线程池或MQ。使用线程池的时候,需要注意线程池的大小、等待队列的设置、任务拒绝的处理方式。等待队列设置的过大会导致内存的大量被占用,影响其他业务;等待队列过小可能会导致大量从db扫描的任务加入到队列的时候被拒绝。
2) 任务执行后状态的改变是否会引发分页数据跳跃。
按照上述流程,如果使用分页的时候pageNo+1。假设我们任务初始的时候处于init状态,执行成功以后改为complete状态。因为数据集合变化了,导致根据相同条件查询的数据集改变了,而分页条件导致数据的启示位置也不同了,所以每次扫描都会漏掉部分数据。
另外一种方式是pageNo不变,因为执行完了状态会改表,所以可以查到未处理的任务。如果有部分任务一直无法执行成功,每次分页查询到的数据可能都是这几条,将导致其他任务失去执行的机会。有一种简单的处理方式可以满足部分业务场景的需要:修改时间字段,查询的时候,根据时间排序。
3) 一直执行失败的任务可能影响其他任务的执行。
给任务指定最大之行次数T。超过此次数T的任务不应该被扫描到;另外每次之行前检查是否大于最大次数了,大于则放弃。如果执行失败了,将执行次数+1后持久化。
上面的流程图无法解决以下问题:
1) 集群环境中,多台服务器将同时都执行这个流程,此db扫描任务被多次执行的问题。
同一个流程,扫描出来的任务是相同的,导致任务重复执行。一来耗费服务器资源;二来程序中如果不做特殊处理,一个任务同时执行多次,数据很容易出问题。
2) 无法保证每个任务都到公平的处理机会。
一个任务执行失败后,将继续占用执行机会,后续程序将被阻塞。
从保证公平性的角度而言,未执行过的任务的优先级应该高于已经执行过但是失败的任务。
任务本身就是拥有不同优先级的,如何保证高优先级的任务有限执行?
3) 数据量大时,性能无法满足需要。
后文将主要讨论优化问题。
2. 优化:限制任务的最大次数
任务扫描、任务执行时检查任务执行次数是否超过预设的限制了。
3. 优化:任务的降级处理
1) 失败任务降级
没有执行过的任务和执行过的任务应该具有不同的优先级;失败过1次的任务和失败过10次的任务应该具有不同的优先级。
这个其实比较容易实现,这里就讲一种比较简单的策略:没有执行过的任务、执行失败次数不同的任务使用不同频率进行扫描。例如没有失败过的任务每3s扫描一次;失败1-5次的每30s扫描一次,失败5-15次的每60s扫描一次。本质是通过将时间拉长来降低执行频率。
2) 任务的优先级
绝大多数场景中,初始条件下,任务是不分优先级的(或者说具有相同优先级);所以扫描db任务并执行的这个过程应该尽可能保证公平。一个任务执行失败了,就应该让出优先级,常见的做法例如:将此任务放到最后去执行。
有些场景会将多种业务的数据放在同一张表中,而不同的业务可能拥有不同的优先级。处理方式:
l 可以采用不同的频率扫描不同业务场景的数据来实现,即启动多个扫描任务,每个任务扫描不同的业务数据,根据每个任务启动时间来实现降级处理。
l 可以将不同优先级的任务放在不同的队列中处理。保证优先执行高优先级的任务就可以了。
l 根据任务执行的时间,调整任务的优先级。如果任务长时间没执行,根据业务的需要,提升任务的优先级。这个优点类似于线上故障,严重的故障,优先级要优先解决;比严重的故障,如果长时间不处理,优先级逐步提高,变成高优先级故障,也需要尽快解决。
相比较而言第一种方案的实施更加简单、使用场景更多广泛。
4. 优化:耗时任务执行处理
如果任务处理是一个很慢的操作,需要耗费大量的时间,将会严重的阻塞其他任务的执行。众多任务中,就算只有少量耗时多的任务,也可能因为这少量任务,让所有任务执行的实时性大打折扣。一般而言,出现这种情况时,会涌现各种稀奇古怪的问题。想象一下,聊天中你的好友推送了一个图片给你,过了10分钟以后,你才收到,你会是一种什么感受呢。
这里介绍几种处理方式:
l 根据业务场景,将耗时的任务和能快速处理的任务分开处理(例如通过不同的扫描程序,或者使用不同的消费队列),使他们之间无影响。
l 耗时任务异步化处理任务。
l 扫描任务和执行任务分开。
l 使用线程池异步处理。
这里介绍一下异步化处理的一种简单实现思路:在where循环中,判断当前线程池是否有接受任务的能力,如果没有,休眠一段时间;如果有,那么从db中查询出一批数据来处理。示例代码如下:
while (true) {
int acceptNum =threadPoolTaskExecutor.getMaxPoolSize() -threadPoolTaskExecutor.getActiveCount();
boolean acceptAble = acceptNum > 0;
if (!acceptAble) {
// 休眠一段时间,然后再检查是否有能接受任务并处理
try {
Thread.sleep(1000);
} catch (InterruptedExceptione) {
e.printStackTrace();
}
continue;
}
// to do task
final intpageNo = 0;
final intpageSize = acceptNum;
PageQueryquery = new PageQuery();
query.setPage(pageNo);
query.setPageSize(pageSize);
threadPoolTaskExecutor.execute(newRunnable() {
public void run() {
// 执行业务逻辑
}
});
5. 优化:集群环境下单机执行优化
线上环境应用一般为集群部署,因为这些服务器上代码相同,服务器时间一样,假设我们使用的是定时器,那么每次服务器上扫描程序的执行时间也几乎一样,此时服务器查询到的结果集也是一样的。很多时候这不是我们想看到的结果,因为我们需要面对两个问题:
1) 存在不必要的性能损失。
同一件事情做多次,会造成DB的性能损耗;服务器的性能损耗。
2) 对业务带来影响。
如果任务执行的代码中添加了幂等和防并发逻辑,最多损失一点性能,业务上不会有大的问题,但如果没有相关的逻辑,业务数据就可能出各种稀奇古怪的问题了。另外有很多场景下,是无法添加幂等和防并发逻辑的,例如向外发送短信、邮件无法添加幂等逻辑;如果业务上无法确定唯一标示字段,将无法防并发。
优化后的流程图如下:
注意:上述流程仅仅保证同一时间,只有一台服务器可以执行次扫描程序,并不能保证任务在同一时间仅能被执行一次。任务执行的时候也需要加分布式锁。
1) 分布式锁的实现:
原理:每次进行add/update时都带上version,执行add/update逻辑时先比较version是否相同,如果相同则更新,同时version都会自增;如果不同则放弃执行。
l 使用分布式的存储:
tair:向缓存中放一个带有版本号(固定)信息的key,如果两个服务器同时写入,必定有一个失败。
Redis:使用SETNX命令,如果key已经存在了,第二个请求就直接返回false。
示例代码:
private void doScan() {
// 添加分布式锁标示
boolean flagSuccess = cacheManager.flagScaning();
if (!flagSuccess) {
return null ;
}
try {
final int pageNo= 0;
final int pageSize= 20;
while (true) {
PageQuery query = new PageQuery();
query.setPage(pageNo);
query.setPageSize(pageSize);
List<Object> list= dao.pageQuery(query);
if ( CollectionUtils.isEmpty(list)){
// 没有记录后返回,此次扫描结束
break;
}
for (Object obj : list) {
// 执行任务
// 执行失败,修改失败次数,执行成功,修改状态
} } } finally { // 删除分布式锁标示 cacheManager.removeScaningFlag(); } }
l 具有选主功能的中间件。
Zookeeper:通过zookeeper的选主流程来实现。
6. 优化:集群环境下多机同时执行
线上一般多机部署,如果只让一台服务器运行扫描程序,是一件极其浪费性能的事情。上面之所以花费了大量功夫讨论,是因为很多场景单机扫描的性能已经足够了。但大部分场景不意味着能解决所有问题,有些场景对性能的要求高到单机无法满足,那这个时候应该如何优化?
1) 解决方案
比较简单的方式是,扫描数据的时候做好数据切片,即程序让每台服务器址查询部分的数据出来处理。有两种方式解决:
l 查询的时候让每台服务器只查询出其中的一部分数据,所有服务器查询的记录总和等于所有的记录就好了。
l 启动一个应用,统一管理扫db任务的工作,其他服务器都从这里取任务。(思路是将扫描任务和执行任务拆分成两个系统,每个系统只关注自己核心的内容。这种方式放在最后一部分在讨论)
2) 数据切片原理
扫描任务时,根据任务taskId %S = I将数据库中的记录均匀分开就可以了。(S表示服务器的数目,I表示每台服务器查询的一批记录索引标示)。下面介绍几种方案来确定S、I:
l 静态指定:服务器部署的时候,根据实际部署的机器数目确定S;为每台服务器指定查询的记录I,保证R处于[0,S)这个区间,并且不重复就可以了。这种方案的优点是极其简单;缺点也非常明显,如果服务器down掉了,有一批数据无法被处理掉。虽然服务期down掉是一个小概率事件,但是线上还是需要谨慎使用此种方式。
l 动态计算-心跳方式:服务器运行过程中,我们可以每隔1s向一个地方发送一个心跳,然后统计所以心跳的信息,就可以确定服务器总数目S;如果在发送心跳的时候,带上服务器ip,我们将所有ip排序,取ip对应的索引为I。动态计算的方式优点很明显,线上的机器扩容、挂掉均不需要做额外处理,系统自动检测并能保证所有记录均可以被处理掉。
l 动态计算-长链接方式:dubbo、tair、rocketmq集群均是使用这种方式来管理集群中服务器的信息的。具体做法可以参考他们的实现原理,这里不再累述。
3) 动态计算方式时注意问题:
l 服务器正常在运行中,如果因为外部原因,导致某次心跳没有发送出去,此时要处理好误判。一般如果1s发送一次,判断的时候发现30s(只是一个示例时间,具体的根据业务对时间的敏感度确定)都没有发送心跳过来,可以认为此服务器离线了。
l 如果某一台服务器离线了,或加了一台服务器,方案中的S、I都会变化,即之前数据切片会失效,结果就是同一个任务可能开始被A服务器查询到,后来被B服务器查询到。这种情况对于业务数据的处理来说,即为出现脏数据,需要做数据的清洗。
l 服务器频繁断掉、重连的性能影响。
心跳方式计算S、I的结构图:
服务器可以每次发送心跳的时候计算一次S、I,也可以通过接口定时取S、I。
五、 海量任务处理方案设计探讨
上面讨论了几种方案,对于海量数据或者数据量大到需要分库分表的场景就可能就不适合了。接下来就详细讨论一下海量数据场景下的任务扫描和处理方案。
1. 依赖中间件实现
1) 准实时性任务
可以将带处理任务发送到mq服务器,通过消费mq消息来处理这种任务。RocketMQ是通过文件存储消息的,所以其消息堆积能力非常强大,实时性也比较好,其他MQ服务不熟,使用的时候需要注意,避免消息堆积时将MQ服务器压垮。
2) 非准实时性任务
即存在根据时间来决定任务什么时候执行。此时也可使用RocketMQ提供的定时消息服务器,但因为使用commitlog存储消息,所以消息的查看很不方便,很难实时知道任务执行的失败次数、还有多少任务未执行、已执行的次数。如果修改这些任务,改动成本比较高。
当然也可使用MQ+DB的方式来实现,消费MQ消息,执行任务后,更新db中任务的状态、失败次数等信息(此时实现很复杂)。
2. 扫描DB实现
1) 结构图
Server维护任务队列,Consumer拉任务
Server维护任务队列,推数据到Consumer
中间件中维护任务队列
l方案
和前面的几种方案相比,最大的差别是将扫描db抽取成一个服务,任务执行抽取成另外一个服务。这样做开发的成本明显增大,但是好处也非常明显,因为功能内聚,扫描任务的集群可以做更多优化,例如性能、实时性、可复用性、任务重复执行、任务优先级。
根据设计方案的不同,Consumer可以通过RPC的方式从Server集群中获取带处理的任务;也可以通过让Server通过长链接的方式推送数据到Consumer。
l角色
DB集群中存放在处理的任务。
服务集群Server的主要任务就是扫描DB的所有任务。
Consumer从Server集群中获取任务并执行。
3. 任务队列的存储问题
1) 不存储,实时获取
consumer获取任务的时候,从db中扫描,如果有数据就返回。此方案对于单表的场景可能还可行;对于分多库多表的就不合适了。想象一下,假如有1024张表,如果前1000张表中没有需要处理的任务,后24张表中有需要处理的任务,出现的结果是需要扫描1000次以上才能取到需要处理的数据。一来性能低,二来后24张表的记录无法得到及时处理。另外这种方案下任务的降级、延迟策略实现比较麻烦。
2) 维护在内存队列中
开一个线程池,将扫描db得到的任务存入内存队列中;消费者取的时候从内存队列中取出数据。这种方案下,处理方式可以非常灵活:
l 扫描任务的处理方式
让线程池中的线程数和表的数量一致,这样每个线程扫描一张表的任务。
让线程池中的线程数和库的数量一致,这样每个线程处理一个库的任务。
根据任务的优先级,启动不同的线程池,分别处理。
集群环境中,服务器的数目不固定,可以采用动态计算服务器数目的方式来计算总数目。此时每台服务器负责一张表、一个库的部分表、部分库的任务。如果集群环境中服务器的数目固定,或者服务器的监控完善,能及时修复或者替换down掉的服务器,此时可以让服务器的数量与库、表的数量一致,这样不用考虑动态计算服务器数量的情况下服务器负责的数据偏移引起的问题,此时业务上可以将逻辑大量简化。
3) 维护在中间件中
所有数据存储在同一个队列中
不同优先级的任务存储在同一个队列中
4. 内存队列的处理方式
1) 数据入队列
l 一个服务器中为每一张表扫描的数据,创建一个队列。
l 每个服务器中创建一个队列。
l 每个服务器中不同优先级的任务分别创建队列,此时一张表可能对应多个队列,或者此服务器负责的所有表分别创建队列。
2) 从队列中取数据
其实这就是一个负载均衡算法。
l 轮训方式从每个队列中取数据,例如A、B、C三个队列,第一次请求从A中取,第二次请求就从B中取,第三次请求从C中取,第四次请求又从A中取,依次类推;如果一次请求中,从A中没取到数据,可以从B中取,以此类推,所有队列都没有数据(当然也可以设定最多从多少个队列中获取),则返回。
l 随机方式取其中一个队列的数据,如果有数据,则返回,如果没有可以再随机取下一个队列。可以设置随机次数的最大值,也可以在第一次随机取不到数据的时候,采用轮询算法。
5. 消费任务的方式:
1) 同一个服务器中:
启动一个线程池,直接从队列中取任务消费就可以了。
2) 跨服务器:
推任务给Consumer;这其实是一个监听者模式。可以参考消息服务器的推送方式。这部分比较复杂,以后有机会的话详细讨论推送的方案。
consumer拉取任务:consumer通过rpc调用,从中间服务器那边取数据就好了。
六、 其他重要优化
1. DB层面优化
可以经常备份扫描任务表中已经处理过任务的数据,保持扫描数据时数据量尽可能少,这样尽可能提升扫描db任务这部分线程的性能。此时需要注意碎片数据对查询性能的影响。
一般来说任务执行完了以后,不适合立即迁移数据,否则经常会遇到通过任务id到db中找不到记录的情况。
2. 队列优化
队列中存储的数据越简单越好,例如只存储任务id。
队列长度设定时做好评估,避免占用内存过大导致影响其他服务。
可是使用先进先出的队列来提高任务执行的公平性。
3. 任务入队列时的优化
之前一直有一个问题没有讨论,如果一个任务已经在任务队列中了,第二次扫描时又将此任务取出来了,此时再将任务加入到队列中无疑是多余的。解决方案如下:
任务加入队列前,通过分布式锁(一定要带上过期时间)判断是否应该将此任务加入队列中。此时最好是使用内存缓存。
在内存开辟一块空间来维护简单的缓存,注意一定要使用软引用来做,避免内存无法回收。
4. 任务出队列时的优化
这个和出队列的问题时一样的,处理方式也是一样的,不再累述。
5. 任务处理层面
任务处理时一定要加上防并发(任务的单机并发、任务的集群并发)、幂等逻辑。
任务处理成功可以在缓存中留一个标示,下次如果同一个任务在此到来,直接忽略掉。
6. 服务器数量大于扫描线程数的优化方案
如果服务器的数量大于扫描任务线程数,可以通过动态计算的方式,让每台服务器负责一个线程,并通过心跳、长链接维护服务器状态,当一个服务器down以后,让一个空余的服务器迅速顶上down的服务器。此时要特别注意集群中down掉服务器大于之前空余服务器数量的情况。
7. 数据切片发生变化时的优化方案
切片发生变化后,会涉及到一次数据迁移,此时尽可能降低数据的变动范围,具体可以参考一致性的处理方案和tair的数据迁移方案。