ElasticSearch 源码分析六 服务端 批量索引(2)

调用TransportShardBulkAction的execute方法执行索引操作

ElasticSearch 源码分析六 服务端 批量索引(2)

1.调用executeIndexRequestOnPrimary,在主分片上面进行索引

2.继续调用primary.applyIndexOperationOnPrimary执行写主分片

3.调用applyIndexOperation执行写索引

    3.1 校验写的版本号version

    3.2 校验当前主分片可写: 校验分片必须是主分片,校验分片状态必须是恢复中 或恢复完毕,或者已启动。

    3.3 校验如果是恢复分片:当前分片状态必须是恢复中

    3.4 复制场景:校验分片状态必须是恢复中,已恢复,已启动,已迁移

4.调用prepareIndex,进行索引前准备工作

ElasticSearch 源码分析六 服务端 批量索引(2)

   4.1 根据传入的source,解析出doc(解析出token),详细过程见DocumentParser

   4.2 判断一下索引的版本,如果是ES 6.0及以后,就使用encode对doc的id进行编码。之前的索引版本不用

 5. 继续调用index(getEngine(), operation); 进行操作

    5.1 getEngine()获取引擎

    5.2 调用engine.index(index)方法

   ElasticSearch 源码分析六 服务端 批量索引(2)

5.3 使用锁

    (1) 验证engine是打开的

    (2)  验证sequenceNumber

    (3)   记录索引开始时间 lastWriteNanos

    (4)   根据入参index,来获取索引策略plan,optimizedAppendOnly

    (5)   调用 indexResult = indexIntoLucene(index, plan); 

    (6)  先生成Sequence Number和version,前者每次增加1,version根据当前doc最大版本增加1

          核心是调用lucene的indexWriter.addDocument进行索引生成

6.写入索引成功后,再写入事务日志translog

7.对事务日志translog进行刷盘操作,定时或者立即刷盘

8.开始写副分片,也是同样的写入流程。会把写入结果返回给主节点

 

写入出现异常,会关闭engine,上报给Master,调用maybeFailEngine