Flink之 SavePoint & Checkpoint
SavePoint & Checkpoint
savepoint和checkpoint都是flink为容错提供的强大功能特性,能够自动或手动保存job的运行状态
两者区别
checkpoint:
应用定时触发,用户保存状态,会过期,内部应用失败重启的时候启用,但是手动cancel时,会删除之前的checkpoint
savepoint:
用户手动置顶,相当于状态的备份,可以在bin/flink cancel xx的时候调用,一般用于修改并行度,程序升级等等。
如果想要在程序停掉后重启,数据一致的情况下,强烈推荐使用cancel时做savepoint,当然如果程序中没有涉及到state,可以不用
savepoint的使用
一、在配置中增加备份目录路径
在flink-conf.yaml中配置:state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
,这个是必须指定的
二、触发savepoint
触发有2种方式:
1)直接触发===>bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId] 针对yarn模式需要指定 -yid参数
2)cancel/stop时触发===> bin/flink cancel/stop -s [targetDirectory] jobId [-yid yarnAppId]。针对yarn模式需要指定 -yid参数
Checkpoint解读
在Flink分布式计算中,容错往往是值得关注的点,Flink本身通过Checkpoint操作进行快照处理,保证job执行的稳定性以及Failover的可靠性。checkpoint不会打断stream、data-flow。
在Checkpoint中有个很关键的要素:barriers
-
barrier将stream中的record划分不同的checkpoint段,barrier不会超过record,严格排在数据的后面
-
每个barrier都会携带相应的ID,来区分相应的快照snapshot(checkpoint)
-
不同checkpoint的barrier可以在stream中并行传输,意味着相同时间可以并行执行快照操作
关键字:
input buffer
NOTE:当算子还没收到所有input streams中的 barrier-n时 ,属于checkpoint n+1的数据是不会处理的,会放进inputbuffer中进行缓存,等所有checkpoint n的barrier n都接受到以后,checkpint n才算complete,算子会向output streams中emit barrier n,此时处理并放行属于 checkpoint n+1的records