ElasticSearch 源码分析六 服务端 批量索引(2)
调用TransportShardBulkAction的execute方法执行索引操作
1.调用executeIndexRequestOnPrimary,在主分片上面进行索引
2.继续调用primary.applyIndexOperationOnPrimary执行写主分片
3.调用applyIndexOperation执行写索引
3.1 校验写的版本号version
3.2 校验当前主分片可写: 校验分片必须是主分片,校验分片状态必须是恢复中 或恢复完毕,或者已启动。
3.3 校验如果是恢复分片:当前分片状态必须是恢复中
3.4 复制场景:校验分片状态必须是恢复中,已恢复,已启动,已迁移
4.调用prepareIndex,进行索引前准备工作
4.1 根据传入的source,解析出doc(解析出token),详细过程见DocumentParser
4.2 判断一下索引的版本,如果是ES 6.0及以后,就使用encode对doc的id进行编码。之前的索引版本不用
5. 继续调用index(getEngine(), operation); 进行操作
5.1 getEngine()获取引擎
5.2 调用engine.index(index)方法
5.3 使用锁
(1) 验证engine是打开的
(2) 验证sequenceNumber
(3) 记录索引开始时间 lastWriteNanos
(4) 根据入参index,来获取索引策略plan,optimizedAppendOnly
(5) 调用 indexResult = indexIntoLucene(index, plan);
(6) 先生成Sequence Number和version,前者每次增加1,version根据当前doc最大版本增加1
核心是调用lucene的indexWriter.addDocument进行索引生成
6.写入索引成功后,再写入事务日志translog
7.对事务日志translog进行刷盘操作,定时或者立即刷盘
8.开始写副分片,也是同样的写入流程。会把写入结果返回给主节点
写入出现异常,会关闭engine,上报给Master,调用maybeFailEngine