学习Elasticsearch的惊恐日子(下)

接上一篇:学习Elasticsearch的惊恐日子(上)

放弃学习会吃不起茶叶蛋的。

学习Elasticsearch的惊恐日子(下)

4 数据篇

通过调用 Es java api 后与 Es 服务交互,Es 将数据散布到多个物理 Lucene 索引上,这些 Lucene 索引称为分片,es 默认是5个分片,如果是集群状态分片将会分配到多个节点上。

4.1 添加数据

AddData.java

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class AddData {

    public static void main(String[] args) {
        // Run the single-document indexing example.
        add();
    }

    /**
     * Indexes one sample user document with id "1" into index "user", type "userInfo".
     *
     * <p>Connects a transport client to 127.0.0.1:9300, sends the document through a
     * bulk request, prints whether the bulk response reported failures, and always
     * closes the client before returning. Any exception is printed and swallowed so
     * that the example never propagates an error to the caller.
     */
    public static void add(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport (not HTTP) port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Build the document; endObject() closes the JSON object explicitly
            // instead of relying on the generator to auto-close it.
            XContentBuilder docBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "张中国")
                    .field("nickname", "张中")
                    .field("nativeplace", "上海静安寺")
                    .field("address", "上海静安寺1街坊10栋")
                    .field("birthdate", "1980-02-14")
                    .endObject();
            // Queue the document under id "1"
            bulkRequest.add(client.prepareIndex("user", "userInfo", "1")
                    .setSource(docBuilder));
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            System.out.println(bulkResponse.hasFailures());
            // Report whether indexing succeeded, including the per-item reasons
            if (bulkResponse.hasFailures()) {
                System.out.println("error!!!");
                System.out.println(bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the request failed, so the client's
            // resources are not leaked.
            if (client != null) {
                client.close();
            }
        }
        System.out.println("do end!!");
    }
}

访问head查看创建的数据

学习Elasticsearch的惊恐日子(下)

4.2 批量添加数据

 /**
     * 批量添加索引数据
     */
   public static void (){
        //初始化数据
        List dataList = new ArrayList();
        Map dataMap1=new HashMap();
        dataMap1.put("name","张中国1");
        dataMap1.put("nickname", "张中1");
        dataMap1.put("nativeplace", "上海静安寺1");
        dataMap1.put("address", "上海静安寺1街坊10栋1");
        dataMap1.put("birthdate", "1980-02-15");

        Map dataMap2=new HashMap();
        dataMap2.put("name","张中国2");
        dataMap2.put("nickname", "张中2");
        dataMap2.put("nativeplace", "上海静安寺2");
        dataMap2.put("address", "上海静安寺1街坊10栋2");
        dataMap2.put("birthdate", "1980-02-16");
        dataList.add(dataMap1);
        dataList.add(dataMap2);
        //索引服务的地址
        String elasticServer= "127.0.0.1"; 
        //索引服务的端口
        Integer elasticServerPort = 9300; 
        Client client=null;
        try{
            //初始化连接
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                     .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticServer), elasticServerPort));
            BulkRequestBuilder bulkRequest = client.prepareBulk(); 
            int y=dataList.size();
            //添加数据
            for(int i=0;i<y;i++){
                Map<String, Object> m = (Map)dataList.get(i);
                bulkRequest.add(client.prepareIndex("user", "userInfo", i+2+"") 
                     .setSource(m));
                if (i % 10000 == 0) {
                    bulkRequest.execute().actionGet(); 
                }
            }
            bulkRequest.execute().actionGet(); 
            //关闭连接
            client.close();
        }catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("do end!!");
    }

代码中模拟实际项目构造多条list数据,list中放map类型的数据,并通过循环批量添加并且每1万条数据添加一次。

查看数据添加情况

学习Elasticsearch的惊恐日子(下)

4.3 搜索数据

不了解 elasticsearch 的人都以为 Es 只能进行模糊搜索,其实不仅能进行模糊搜索还能进行全匹配搜索。在集群模式elasticsearch的搜索把多个节点的数据汇集到一个节点最终显示给调用端,索引数据必须指定_index(database)及index下面的type(table)。

4.3.1 查询全部数据

QueryData1.java

import java.net.InetAddress;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;

public class QueryData1 {
    /**
     * Entry point: runs the match-all search example.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        searchAll();
    }

    /**
     * Runs a match-all query against index "user", type "userInfo" and prints the
     * total hit count, the number of hits returned, and each hit's id and source.
     *
     * <p>No size is set on the request, so the number of hits actually returned may
     * be smaller than the total. The client is always closed before returning; any
     * exception is printed and swallowed.
     */
    public static void searchAll(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            // Query that matches every document
            QueryBuilder bqb = QueryBuilders.matchAllQuery();
            System.out.println(bqb.toString()+"====================");
            // An index can hold several types, so the type is named explicitly.
            // The builder is passed as-is rather than being serialised to a string
            // and parsed again.
            SearchResponse response = client.prepareSearch("user").setTypes("userInfo")
                    .setQuery(bqb)
                    .execute()
                    .actionGet();
            SearchHits hits = response.getHits();
            int returned = hits.getHits().length;
            System.out.println(hits.getTotalHits() +" "+ returned);
            for (int i = 0; i < returned; i++) {
                System.out.println("===searchAll()====="+hits.getAt(i).getId()+"-------"
                        + "------"+hits.getAt(i).getSource());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the search failed
            if (client != null) {
                client.close();
            }
        }
    }
}

4.3.2 查询部分数据

   /**
     * Runs a match-all query against index "user", type "userInfo" limited to the
     * first two hits, and prints the total hit count, the number of hits returned,
     * and each returned hit's id and source.
     *
     * <p>The client is always closed before returning; any exception is printed and
     * swallowed.
     */
    public static void searchSize(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            QueryBuilder bqb = QueryBuilders.matchAllQuery();
            SearchResponse response = client.prepareSearch("user").setTypes("userInfo")
                    // Pass the builder directly instead of its string form
                    .setQuery(bqb)
                    // Start at offset 0 and return at most 2 documents
                    .setFrom(0).setSize(2)
                    .execute()
                    .actionGet();
            SearchHits hits = response.getHits();
            int returned = hits.getHits().length;
            System.out.println(hits.getTotalHits() +" "+ returned);
            // Print the returned hits
            for (int i = 0; i < returned; i++) {
                System.out.println("====searchSize()==="+hits.getAt(i).getId()+"------"
                        + "-------"+hits.getAt(i).getSource());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the search failed
            if (client != null) {
                client.close();
            }
        }
    }

4.3.3 模糊搜索数据

   /**
     * Runs an analysed (full-text) match query on the "name" field of index "user",
     * type "userInfo", and prints the total hit count, the number of hits returned,
     * and each returned hit's id and source.
     *
     * <p>The match clause uses the AND operator and a boost of 1.2, and is wrapped in
     * a bool query as a "must" clause. Up to 60 hits are requested from offset 0.
     * The client is always closed before returning; any exception is printed and
     * swallowed.
     */
    public static void searchMatchQuery(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            // Build the query
            float boost = 1.2f;
            MatchQueryBuilder titleSearchBuilder = QueryBuilders.matchQuery("name", "张中国");
            titleSearchBuilder.boost(boost);
            titleSearchBuilder.operator(Operator.AND);
            BoolQueryBuilder bqb = QueryBuilders.boolQuery();
            bqb.must(titleSearchBuilder);
            SearchResponse response = client.prepareSearch("user").setTypes("userInfo")
                    // Pass the builder directly instead of its string form
                    .setQuery(bqb)
                    // setExplain(true) asks for a score explanation on each hit;
                    // it does not change the ordering of the results.
                    .setFrom(0).setSize(60).setExplain(true)
                    .execute()
                    .actionGet();
            SearchHits hits = response.getHits();
            int returned = hits.getHits().length;
            System.out.println(hits.getTotalHits() +" "+ returned);
            // Print the returned hits
            for (int i = 0; i < returned; i++) {
                System.out.println("===searchMatchQuery()==="+hits.getAt(i).getId()+"--" +
                        "-----------"+hits.getAt(i).getSource());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the search failed
            if (client != null) {
                client.close();
            }
        }

    }
}

4.4 修改数据

4.4.1 elasticsearch 修改的底层逻辑

  • elasticsearch 修改在内部,修改标记旧文档为删除(增加了一个version版本号)并添加了一个完整的新文档。旧版本文档不会立即消失,但你也不能去访问它。
  • elasticsearch 会在你继续索引更多数据时清理被删除的文档,或者进行强制清除命令把这种过期的旧数据删除掉。因为这种过期旧数据太多在一定程度上将会影响查询效率。
  • 有时候我们修改的内容非常多,达到索引的 80% 以上时,会考虑索引重建即 reindex,索引重建有点相当于删除表后重新添加数据的感觉。

打开浏览器访问http://127.0.0.1:9200/_plugin/head,查看我们前面创建的userInfo,查看version,为1

学习Elasticsearch的惊恐日子(下)

import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class UpdateData {

    /**
     * Entry point: runs the single-document update example.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        updateEs();
    }

    /**
     * Partially updates the document with id "1" in index "user", type "userInfo".
     *
     * <p>Sets "name" and "nickname" to fixed values and "birthdate" to today's date
     * formatted as yyyy-MM-dd, then prints the index, type, id, version and created
     * flag from the update response. The client is always closed before returning;
     * any exception is printed and swallowed.
     */
    public static void updateEs(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            // Fields to change; fields not listed here are not part of the request
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name","王美丽")
                    .field("nickname","美丽")
                    .field("birthdate",df.format(new Date()))
                    .endObject();
            // Apply the partial document to the document whose id is "1"
            UpdateResponse response = client.prepareUpdate("user","userInfo","1")
                    .setDoc(jsonBuilder)
                    .get();
            String index = response.getIndex();
            String type = response.getType();
            String id = response.getId();
            long version = response.getVersion();
            boolean created = response.isCreated();
            System.out.println(index+" "+type+" "+id+" "+version+" "+created);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the update failed
            if (client != null) {
                client.close();
            }
        }
    }
}

运行后查看version,为2

学习Elasticsearch的惊恐日子(下)

4.4.2 elasticsearch修改多条数据

/**
     * Partially updates several documents in index "user", type "userInfo" using
     * bulk requests.
     *
     * <p>Each sample row carries the target document id under the key "id"; the
     * remaining entries become the fields of the partial document. The bulk request
     * is sent every {@code flushSize} updates and a fresh builder is started after
     * each send: executing a bulk builder does not clear the actions already queued
     * in it, so reusing the same builder would apply the earlier updates again. The
     * client is always closed before returning; any exception is printed and
     * swallowed.
     */
    public static void batchUpdateEs(){
        String elasticServer = "127.0.0.1";
        Integer elasticServerPort = 9300;
        // Number of updates sent per bulk request
        final int flushSize = 10000;
        Client client = null;
        try {
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            // Sample data standing in for rows loaded by a real application
            List<Map<String, String>> dataList = new ArrayList<>();
            Map<String, String> dataMap1 = new HashMap<>();
            dataMap1.put("name","张中国");
            dataMap1.put("nickname", "张中");
            dataMap1.put("nativeplace", "上海静安寺");
            dataMap1.put("address", "上海静安寺1街坊10栋");
            dataMap1.put("birthdate", "1980-02-15");
            dataMap1.put("id", "2");
            Map<String, String> dataMap2 = new HashMap<>();
            dataMap2.put("name","张三丰");
            dataMap2.put("nickname", "张三");
            dataMap2.put("nativeplace", "上海静安寺");
            dataMap2.put("address", "上海静安寺1街坊10栋");
            dataMap2.put("birthdate", "2018-01-23");
            dataMap2.put("id", "3");
            dataList.add(dataMap1);
            dataList.add(dataMap2);
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            for (Map<String, String> row : dataList) {
                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name", row.get("name"))
                        .field("nickname", row.get("nickname"))
                        .field("nativeplace", row.get("nativeplace"))
                        .field("address", row.get("address"))
                        .field("birthdate", row.get("birthdate"))
                        .endObject();
                bulkRequest.add(client.prepareUpdate("user", "userInfo", row.get("id"))
                        .setDoc(jsonBuilder));
                // Flush only once a full batch has been queued, then start a new
                // builder so the flushed updates are not applied a second time.
                if (bulkRequest.numberOfActions() >= flushSize) {
                    if (bulkRequest.execute().actionGet().hasFailures()) {
                        System.out.println("bulk update error");
                    }
                    bulkRequest = client.prepareBulk();
                }
            }
            // Send the remainder; an empty bulk request is rejected by request
            // validation, so skip it when nothing is left.
            if (bulkRequest.numberOfActions() > 0) {
                if (bulkRequest.execute().actionGet().hasFailures()) {
                    System.out.println("bulk update error");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when a request failed
            if (client != null) {
                client.close();
            }
        }
    }

运行,打开 head 查看 version 及数据变化(实际项目中我们一般不会关注 version,这里只是让大家知道它底层每更新一次,version 都在变化)

学习Elasticsearch的惊恐日子(下)

4.5 删除数据

4.5.1 elasticsearch 删除的底层逻辑

  • 删除文档的语法模式与之前基本一致,只不过要使用 DELETE 方法:DELETE /user/userInfo/1。如果文档被找到,Elasticsearch 将返回 200 OK 状态码和相应的响应体,注意其中的 _version 数字已经增加了。在内部,删除一个文档也不会立即从磁盘上移除,它只是被标记成已删除。Elasticsearch 将会在你之后继续索引更多数据的时候才会在后台进行删除内容的清理。
  • 有时候我们删除的内容非常多,达到索引的 80% 以上时,会考虑索引重建即 reindex,这时直接删除 index 是非常快的,类似于删除表的操作。

4.5.2 删除单条数据

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class DeleteData {

    public static void main(String args[]){
        // Run the single-document delete example.
        deleteEs();
    }

    /**
     * Deletes the document with id "1" from index "user", type "userInfo".
     *
     * <p>The client is always closed before returning; any exception is printed and
     * swallowed.
     */
    public static void deleteEs(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            // Delete the document whose id is "1"
            client.prepareDelete("user","userInfo","1").execute().actionGet();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the delete failed
            if (client != null) {
                client.close();
            }
        }
    }
}

 

4.5.3 删除多条数据

法一:deleteEsBulk

 /**
     * Deletes the documents with ids "2" and "3" from index "user", type "userInfo"
     * in a single bulk request.
     *
     * <p>The bulk request is executed exactly once and any per-item failures are
     * printed. The client is always closed before returning; any exception is
     * printed and swallowed.
     */
    public static void deleteEsBulk(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Queue one delete per document id
            bulkRequest.add(client.prepareDelete("user", "userInfo", "2"));
            bulkRequest.add(client.prepareDelete("user", "userInfo", "3"));
            // Execute once; calling get() again would send the same deletes a
            // second time.
            BulkResponse bulkResponse = bulkRequest.get();
            // Report failures, if any
            if (bulkResponse.hasFailures()) {
                System.out.println("bulk error:"+bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when the request failed
            if (client != null) {
                client.close();
            }
        }
    }

法二:deleteEsBulkList(推荐:待删除数据的 id 由 list 指定,使用起来更灵活)

   /**
     * Deletes the documents whose ids are listed in a local list from index "user",
     * type "userInfo" using bulk requests.
     *
     * <p>The bulk request is sent every {@code flushSize} deletes and a fresh builder
     * is started after each send: executing a bulk builder does not clear the
     * actions already queued in it, so reusing the same builder would send the
     * earlier deletes again. The client is always closed before returning; any
     * exception is printed and swallowed.
     */
    public static void deleteEsBulkList(){
        // Address of the Elasticsearch node
        String elasticServer = "127.0.0.1";
        // Transport port of the node
        Integer elasticServerPort = 9300;
        // Number of deletes sent per bulk request
        final int flushSize = 200;
        Client client = null;
        try {
            // Open the connection
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(elasticServer), elasticServerPort));
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Ids of the documents to delete
            List<String> dataList = new ArrayList<>();
            dataList.add("2");
            dataList.add("3");
            for (String id : dataList) {
                bulkRequest.add(client.prepareDelete("user", "userInfo", id));
                // Flush only once a full batch has been queued, then start a new
                // builder so the flushed deletes are not sent a second time.
                if (bulkRequest.numberOfActions() >= flushSize) {
                    BulkResponse flushed = bulkRequest.get();
                    if (flushed.hasFailures()) {
                        System.out.println("bulk error:"+flushed.buildFailureMessage());
                    }
                    bulkRequest = client.prepareBulk();
                }
            }
            // Send the remainder; an empty bulk request is rejected by request
            // validation, so skip it when nothing is left.
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse flushed = bulkRequest.get();
                if (flushed.hasFailures()) {
                    System.out.println("bulk error:"+flushed.buildFailureMessage());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when a request failed
            if (client != null) {
                client.close();
            }
        }
    }

5 Spring Boot集成ES