ElasticSearch 源码分析七 ES读取索引 Get操作

GET或者MGET,必须指定三元组 _index, _type,_id , 是根据_id从正排索引中获取内容。而search不指定_id, 根据关键词从倒排索引中获取内容。

ElasticSearch 源码分析七 ES读取索引 Get操作

ElasticSearch 源码分析七 ES读取索引 Get操作

程序入口TransportSingleShardAction的doExecute方法

ElasticSearch 源码分析七 ES读取索引 Get操作

可以看到是调用异步AsyncSingleAction

先看下异步action的构造函数

ElasticSearch 源码分析七 ES读取索引 Get操作

1.获取当前es的集群状态clusterState,然后获取nodes

2.检查当前全局是否读阻塞block,阻塞抛异常

3.根据请求解析出 要构建的索引的构造体concreteSingleIndex

4.再检查下,请求是否读阻塞,阻塞抛异常

5.根据请求获取对应的分片shard,和之前批量索引一样,还是根据Murmur3HashFunction进行计算

6.计算出目标shardId后,然后结合请求参数中指定的优先级和集群状态确定目标节点。由于分片可能存在多个副本,会算出来一个列表

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

再来看下start方法

ElasticSearch 源码分析七 ES读取索引 Get操作

1.如果没有命中分片,就在本地localNode执行,调用transportService.sendRequest发送请求

2.如果能找到分片,就调用perform

   2.1 首先检查分片的shardRouting是否为空,为空就封装异常,return

   2.2 然后继续根据shardRouting的当前nodeId获取出DiscoveryNode,如果node为空,还是执行onFailure

   2.3 继续从shardRouting获取出internalSharId

   2.4 调用transportService.sendRequest发送请求,并且注册了回调函数

ElasticSearch 源码分析七 ES读取索引 Get操作

  ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

继续看messageReceived

ElasticSearch 源码分析七 ES读取索引 Get操作

1.执行execute,本质是调用TransportGetAction的shardOperation,获取索引内容

ElasticSearch 源码分析七 ES读取索引 Get操作

2.然后获取对应的索引分片indexShard

3.从请求request判断是否要refresh,需要就调用InternalEngin进行刷新。

4.获取ShardGetService,调用get方法

ElasticSearch 源码分析七 ES读取索引 Get操作

5.调用innerGet获取    

ElasticSearch 源码分析七 ES读取索引 Get操作

  5.1 循环类型type,创建uidTerm,能够唯一地指定文档document

 5.2 调用indexShard.get获取,本质还是通过engine.get获取doc

 ElasticSearch 源码分析七 ES读取索引 Get操作

6.增加读锁

7.判断realtime,如果是实时的,从versionMap中获取versionValue

8. 然后refresh

9.调用getFromSearcher获取

  

当我们向ES发送请求的时候,我们发现es貌似可以在我们发请求的同时进行搜索。而这个实时建索引并可以被搜索的过程实际上是一次es 索引提交(commit)的过程,如果这个提交的过程直接将数据写入磁盘(fsync)必然会影响性能,所以es中设计了一种机制,即:先将index-buffer中文档(document)解析完成的segment写到filesystem cache之中,这样避免了比较损耗性能io操作,又可以使document可以被搜索。以上从index-buffer中取数据到filesystem cache中的过程叫做refresh。

refresh操作可以通过API设置:

POST /index/_settings

{“refresh_interval”: “10s”}

当我们进行大规模的创建索引操作的时候,最好将将refresh关闭。

POST /index/_settings

{“refresh_interval”: “-1″}

es默认的refresh间隔时间是1s,这也是为什么ES可以进行近乎实时的搜索。

 

ElasticSearch 源码分析七 ES读取索引 Get操作

10.获取searcher,并且获取docIdAndVersion,校验版本不能冲突(要一致)

11.查询后,返回结果GetResult

 

flush操作与translog

    我们可能已经意识到如果数据在filesystem cache之中是很有可能在意外的故障中丢失。这个时候就需要一种机制,可以将对es的操作记录下来,来确保当出现故障的时候,保留在filesystem的数据不会丢失,并在重启的时候可以从这个记录中将数据恢复过来。elasticsearch提供了translog来记录这些操作。

    当向elasticsearch发送创建document索引请求的时候,document数据会先进入到index buffer之后,与此同时会将操作记录在translog之中,当发生refresh时(数据从index buffer中进入filesystem cache的过程)translog中的操作记录并不会被清除,而是当数据从filesystem cache中被写入磁盘之后才会将translog中清空。而从filesystem cache写入磁盘的过程就是flush

ElasticSearch 源码分析七 ES读取索引 Get操作

总结一下translog的功能:

1.保证在filesystem cache中的数据不会因为elasticsearch重启或是发生意外故障的时候丢失。

2.当系统重启时会从translog中恢复之前记录的操作。

3.当对elasticsearch进行CRUD操作的时候,会先到translog之中进行查找,因为tranlog之中保存的是最新的数据。

4.translog的清除时间时进行flush操作之后(将数据从filesystem cache刷入disk之中)。

 

再总结一下flush操作的时间点:

1.es的各个shard会每个30分钟进行一次flush操作。

2.当translog的数据达到某个上限的时候会进行一次flush操作。

 

有关于translog和flush的一些配置项:

index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush。默认是 unlimited。

index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作。默认是512mb。

index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作。默认是30m。

index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作。es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s。