学习Elasticsearch的惊恐日子(下)
放弃学习会吃不起茶叶蛋的。
4 数据篇
通过 Es Java API 与 Es 服务交互时,Es 会将数据分散存储到多个物理 Lucene 索引上,这些 Lucene 索引称为分片(shard)。Es 默认每个索引有 5 个分片;如果是集群状态,分片将会分配到多个节点上。
4.1 添加数据
AddData.java
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
/**
 * Demo: indexes a single document into the "user" index (type "userInfo")
 * through the Elasticsearch transport client, using a bulk request.
 */
public class AddData {

    public static void main(String[] args) {
        // Run the single-document indexing demo.
        add();
    }

    /**
     * Indexes one document with id "1" and prints whether the bulk request
     * reported failures. Any exception is printed; the client is always
     * closed, even when the request fails.
     */
    public static void add() {
        // Address of the index service.
        String elasticServer = "127.0.0.1";
        // Port of the index service.
        int elasticServerPort = 9300;
        TransportClient client = null;
        try {
            // Build the client first so it can be closed in finally even if
            // resolving the host name throws afterwards.
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(elasticServer), elasticServerPort));

            // Field values of the document; the JSON object is closed explicitly.
            XContentBuilder docBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "张中国")
                    .field("nickname", "张中")
                    .field("nativeplace", "上海静安寺")
                    .field("address", "上海静安寺1街坊10栋")
                    .field("birthdate", "1980-02-14")
                    .endObject();

            // Queue the document under id "1" and send the bulk request.
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            bulkRequest.add(client.prepareIndex("user", "userInfo", "1")
                    .setSource(docBuilder));
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();

            // Report whether any item of the bulk request failed.
            System.out.println(bulkResponse.hasFailures());
            if (bulkResponse.hasFailures()) {
                System.out.println("error!!!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both the success and the failure path.
            if (client != null) {
                client.close();
            }
        }
        System.out.println("do end!!");
    }
}
访问head查看创建的数据
4.2 批量添加数据
/**
* 批量添加索引数据
*/
public static void (){
//初始化数据
List dataList = new ArrayList();
Map dataMap1=new HashMap();
dataMap1.put("name","张中国1");
dataMap1.put("nickname", "张中1");
dataMap1.put("nativeplace", "上海静安寺1");
dataMap1.put("address", "上海静安寺1街坊10栋1");
dataMap1.put("birthdate", "1980-02-15");
Map dataMap2=new HashMap();
dataMap2.put("name","张中国2");
dataMap2.put("nickname", "张中2");
dataMap2.put("nativeplace", "上海静安寺2");
dataMap2.put("address", "上海静安寺1街坊10栋2");
dataMap2.put("birthdate", "1980-02-16");
dataList.add(dataMap1);
dataList.add(dataMap2);
//索引服务的地址
String elasticServer= "127.0.0.1";
//索引服务的端口
Integer elasticServerPort = 9300;
Client client=null;
try{
//初始化连接
Settings settings = Settings.settingsBuilder().build();
client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticServer), elasticServerPort));
BulkRequestBuilder bulkRequest = client.prepareBulk();
int y=dataList.size();
//添加数据
for(int i=0;i<y;i++){
Map<String, Object> m = (Map)dataList.get(i);
bulkRequest.add(client.prepareIndex("user", "userInfo", i+2+"")
.setSource(m));
if (i % 10000 == 0) {
bulkRequest.execute().actionGet();
}
}
bulkRequest.execute().actionGet();
//关闭连接
client.close();
}catch (Exception e) {
e.printStackTrace();
}
System.out.println("do end!!");
}
代码中模拟实际项目构造多条list数据,list中放map类型的数据,并通过循环批量添加并且每1万条数据添加一次。
查看数据添加情况
4.3 搜索数据
不了解 elasticsearch 的人都以为 Es 只能进行模糊搜索,其实不仅能进行模糊搜索还能进行全匹配搜索。在集群模式elasticsearch的搜索把多个节点的数据汇集到一个节点最终显示给调用端,索引数据必须指定_index(database)及index下面的type(table)。
4.3.1 查询全部数据
QueryData1.java
import java.net.InetAddress;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
/**
 * Demo: runs a match-all search against the "user" index (type "userInfo")
 * through the Elasticsearch transport client and prints the hits.
 */
public class QueryData1 {

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        // Run the match-all search demo.
        searchAll();
    }

    /**
     * Searches all documents of type "userInfo" in the "user" index and prints
     * the total hit count, the number of returned hits, and each hit's id and
     * source. Any exception is printed; the client is always closed.
     */
    public static void searchAll() {
        // Address of the index service.
        String elasticServer = "127.0.0.1";
        // Port of the index service.
        int elasticServerPort = 9300;
        TransportClient client = null;
        try {
            // Build the client first so it can be closed in finally even if
            // resolving the host name throws afterwards.
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(elasticServer), elasticServerPort));

            // Query that matches every document.
            QueryBuilder bqb = QueryBuilders.matchAllQuery();
            System.out.println(bqb.toString() + "====================");

            // An index can contain several types, so the type is named explicitly.
            // The builder is passed directly instead of its JSON string form.
            SearchResponse response = client.prepareSearch("user")
                    .setTypes("userInfo")
                    .setQuery(bqb)
                    .execute()
                    .actionGet();

            SearchHits hits = response.getHits();
            int returned = hits.getHits().length;
            System.out.println(hits.getTotalHits() + " " + returned);
            for (int i = 0; i < returned; i++) {
                System.out.println("===searchAll()=====" + hits.getAt(i).getId() + "-------"
                        + "------" + hits.getAt(i).getSource());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both the success and the failure path.
            if (client != null) {
                client.close();
            }
        }
    }
}
4.3.2 查询部分数据
/**
 * Searches the "user" index (type "userInfo") with a match-all query but
 * returns only a limited number of hits: starting at offset 0, at most 2.
 * Prints the total hit count, the number of returned hits, and each hit's id
 * and source. Any exception is printed; the client is always closed.
 */
public static void searchSize() {
    // Address of the index service.
    String elasticServer = "127.0.0.1";
    // Port of the index service.
    int elasticServerPort = 9300;
    TransportClient client = null;
    try {
        // Build the client first so it can be closed in finally even if
        // resolving the host name throws afterwards.
        Settings settings = Settings.settingsBuilder().build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(
                InetAddress.getByName(elasticServer), elasticServerPort));

        // Query that matches every document; passed as a builder, not a string.
        QueryBuilder bqb = QueryBuilders.matchAllQuery();
        SearchResponse response = client.prepareSearch("user")
                .setTypes("userInfo")
                .setQuery(bqb)
                // Return 2 hits starting from the first one.
                .setFrom(0)
                .setSize(2)
                .execute()
                .actionGet();

        // Print the search result.
        SearchHits hits = response.getHits();
        int returned = hits.getHits().length;
        System.out.println(hits.getTotalHits() + " " + returned);
        for (int i = 0; i < returned; i++) {
            System.out.println("====searchSize()===" + hits.getAt(i).getId() + "------"
                    + "-------" + hits.getAt(i).getSource());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the connection on both the success and the failure path.
        if (client != null) {
            client.close();
        }
    }
}
4.3.3 模糊搜索数据
/**
 * Runs a full-text (match) search on the "name" field of the "user" index
 * (type "userInfo") and prints the total hit count, the number of returned
 * hits, and each hit's id and source. Any exception is printed; the client is
 * always closed.
 */
public static void searchMatchQuery() {
    // Address of the index service.
    String elasticServer = "127.0.0.1";
    // Port of the index service.
    int elasticServerPort = 9300;
    TransportClient client = null;
    try {
        // Build the client first so it can be closed in finally even if
        // resolving the host name throws afterwards.
        Settings settings = Settings.settingsBuilder().build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(
                InetAddress.getByName(elasticServer), elasticServerPort));

        // Query condition: all analyzed terms of the text must match (AND),
        // with a boost of 1.2 applied to this clause.
        float boost = 1.2f;
        MatchQueryBuilder titleSearchBuilder = QueryBuilders.matchQuery("name", "张中国");
        titleSearchBuilder.boost(boost);
        titleSearchBuilder.operator(Operator.AND);
        BoolQueryBuilder bqb = QueryBuilders.boolQuery();
        bqb.must(titleSearchBuilder);

        // The builder is passed directly instead of its JSON string form.
        SearchResponse response = client.prepareSearch("user")
                .setTypes("userInfo")
                .setQuery(bqb)
                .setFrom(0)
                .setSize(60)
                // setExplain(true) asks for a score explanation per hit;
                // it does not change the ordering of the results.
                .setExplain(true)
                .execute()
                .actionGet();

        // Print the search result.
        SearchHits hits = response.getHits();
        int returned = hits.getHits().length;
        System.out.println(hits.getTotalHits() + " " + returned);
        for (int i = 0; i < returned; i++) {
            System.out.println("===searchMatchQuery()===" + hits.getAt(i).getId() + "--"
                    + "-----------" + hits.getAt(i).getSource());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the connection on both the success and the failure path.
        if (client != null) {
            client.close();
        }
    }
}
}
4.4 修改数据
4.4.1 elasticsearch 修改的底层逻辑
- elasticsearch 修改在内部,修改标记旧文档为删除(增加了一个version版本号)并添加了一个完整的新文档。旧版本文档不会立即消失,但你也不能去访问它。
- elasticsearch 会在你继续索引更多数据时清理被删除的文档,或者进行强制清除命令把这种过期的旧数据删除掉。因为这种过期旧数据太多在一定程度上将会影响查询效率。
- 有时候我们修改的内容非常多,达到索引的 80% 以上时,会考虑索引重建即 reindex,索引重建有点相当于删除表后重新添加数据的感觉。
打开浏览器访问http://127.0.0.1:9200/_plugin/head,查看我们前面创建的userInfo,查看version,为1
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
/**
 * Demo: partially updates the document with id "1" in the "user" index
 * (type "userInfo") through the Elasticsearch transport client.
 */
public class UpdateData {

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        updateEs();
    }

    /**
     * Updates the fields "name", "nickname" and "birthdate" of document "1"
     * and prints index, type, id, version and the created flag taken from the
     * update response. Any exception is printed; the client is always closed.
     */
    public static void updateEs() {
        // Address of the index service.
        String elasticServer = "127.0.0.1";
        // Port of the index service.
        int elasticServerPort = 9300;
        TransportClient client = null;
        try {
            // Build the client first so it can be closed in finally even if
            // resolving the host name throws afterwards.
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(elasticServer), elasticServerPort));

            // Fields to update; birthdate is set to today's date (yyyy-MM-dd).
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "王美丽")
                    .field("nickname", "美丽")
                    .field("birthdate", df.format(new Date()))
                    .endObject();

            // Apply the partial document to the document with id "1".
            UpdateResponse response = client.prepareUpdate("user", "userInfo", "1")
                    .setDoc(jsonBuilder)
                    .get();

            String index = response.getIndex();
            String type = response.getType();
            String id = response.getId();
            long version = response.getVersion();
            boolean created = response.isCreated();
            System.out.println(index + " " + type + " " + id + " " + version + " " + created);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both the success and the failure path.
            if (client != null) {
                client.close();
            }
        }
    }
}
运行后查看version,为2
4.4.2 elasticsearch修改多条数据
/**
 * Updates several documents of the "user" index (type "userInfo") in bulk.
 *
 * <p>Each entry of the sample list carries the target document id under the
 * key "id" plus the new field values. Updates are sent in batches of at most
 * 10000; a fresh bulk builder is created after every flush so that updates
 * which were already sent are not sent again, and the last (possibly partial)
 * batch is flushed exactly once.
 */
public static void batchUpdateEs() {
    String elasticServer = "127.0.0.1";
    int elasticServerPort = 9300;
    // Maximum number of updates sent in one bulk request.
    final int batchSize = 10000;

    TransportClient client = null;
    try {
        // Build the client first so it can be closed in finally even if
        // resolving the host name throws afterwards.
        Settings settings = Settings.settingsBuilder().build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(
                InetAddress.getByName(elasticServer), elasticServerPort));

        // Sample data: one map per document to update.
        List<Map<String, String>> dataList = new ArrayList<Map<String, String>>();
        Map<String, String> dataMap1 = new HashMap<String, String>();
        dataMap1.put("name", "张中国");
        dataMap1.put("nickname", "张中");
        dataMap1.put("nativeplace", "上海静安寺");
        dataMap1.put("address", "上海静安寺1街坊10栋");
        dataMap1.put("birthdate", "1980-02-15");
        dataMap1.put("id", "2");
        Map<String, String> dataMap2 = new HashMap<String, String>();
        dataMap2.put("name", "张三丰");
        dataMap2.put("nickname", "张三");
        dataMap2.put("nativeplace", "上海静安寺");
        dataMap2.put("address", "上海静安寺1街坊10栋");
        dataMap2.put("birthdate", "2018-01-23");
        dataMap2.put("id", "3");
        dataList.add(dataMap1);
        dataList.add(dataMap2);

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        int total = dataList.size();
        for (int i = 0; i < total; i++) {
            Map<String, String> doc = dataList.get(i);
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", doc.get("name"))
                    .field("nickname", doc.get("nickname"))
                    .field("nativeplace", doc.get("nativeplace"))
                    .field("address", doc.get("address"))
                    .field("birthdate", doc.get("birthdate"))
                    .endObject();
            bulkRequest.add(client.prepareUpdate("user", "userInfo", doc.get("id"))
                    .setDoc(jsonBuilder));

            // Flush when the batch is full or when this is the last document.
            boolean batchFull = (i + 1) % batchSize == 0;
            boolean lastDocument = i == total - 1;
            if (batchFull || lastDocument) {
                if (bulkRequest.execute().actionGet().hasFailures()) {
                    System.out.println("error!!!");
                }
                // Start a new builder; the old one still holds the sent actions.
                bulkRequest = client.prepareBulk();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the connection on both the success and the failure path.
        if (client != null) {
            client.close();
        }
    }
}
运行,打开 head 查看 version 及数据变化(实际项目我们一般不会关注 version,这里只是让大家知道它底层每更新一次 version 都在变化)
4.5 删除数据
4.5.1 elasticsearch 删除的底层逻辑
- elasticsearch 删除文档的语法模式与之前基本一致,只不过要使用 DELETE 方法,例如:DELETE /user/userInfo/1。如果文档被找到,Elasticsearch 将返回 200 OK 状态码,并且响应体中的 _version 数字已经增加了。删除一个文档也不会立即从磁盘上移除,它只是被标记成已删除。Elasticsearch 将会在你之后添加更多索引数据的时候才会在后台进行删除内容的清理。
- 有时候我们删除的内容非常多,达到索引的 80% 以上时,会考虑索引重建即 reindex,这时直接删除 index 是非常快的,类似于删除表的操作。
4.5.2 删除单条数据
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
/**
 * Demo: deletes the document with id "1" from the "user" index
 * (type "userInfo") through the Elasticsearch transport client.
 */
public class DeleteData {

    public static void main(String args[]) {
        deleteEs();
    }

    /**
     * Deletes document "1". Any exception is printed; the client is always
     * closed, even when the request fails.
     */
    public static void deleteEs() {
        // Address of the index service.
        String elasticServer = "127.0.0.1";
        // Port of the index service.
        int elasticServerPort = 9300;
        TransportClient client = null;
        try {
            // Build the client first so it can be closed in finally even if
            // resolving the host name throws afterwards.
            Settings settings = Settings.settingsBuilder().build();
            client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(elasticServer), elasticServerPort));

            // Delete the document whose id is "1".
            client.prepareDelete("user", "userInfo", "1").execute().actionGet();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both the success and the failure path.
            if (client != null) {
                client.close();
            }
        }
    }
}
4.5.3 删除多条数据
法一:deleteEsBulk
/**
 * Deletes the documents with ids "2" and "3" from the "user" index
 * (type "userInfo") with a single bulk request and prints the failure message
 * if any item failed. The bulk request is executed exactly once. Any exception
 * is printed; the client is always closed.
 */
public static void deleteEsBulk() {
    // Address of the index service.
    String elasticServer = "127.0.0.1";
    // Port of the index service.
    int elasticServerPort = 9300;
    TransportClient client = null;
    try {
        // Build the client first so it can be closed in finally even if
        // resolving the host name throws afterwards.
        Settings settings = Settings.settingsBuilder().build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(
                InetAddress.getByName(elasticServer), elasticServerPort));

        // Queue one delete action per document id.
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareDelete("user", "userInfo", "2"));
        bulkRequest.add(client.prepareDelete("user", "userInfo", "3"));

        // Execute once and report item failures, if any.
        BulkResponse bulkResponse = bulkRequest.get();
        if (bulkResponse.hasFailures()) {
            System.out.println("bulk error:" + bulkResponse.buildFailureMessage());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the connection on both the success and the failure path.
        if (client != null) {
            client.close();
        }
    }
}
法二:deleteEsBulkList (推荐:要删除的数据由 list 指定,比较灵活)
/**
 * Deletes the documents whose ids are listed in a local list from the "user"
 * index (type "userInfo") in bulk.
 *
 * <p>Deletes are sent in batches of at most 200; a fresh bulk builder is
 * created after every flush so that deletes which were already sent are not
 * sent again, and the last (possibly partial) batch is flushed exactly once.
 * Any exception is printed; the client is always closed.
 */
public static void deleteEsBulkList() {
    // Address of the index service.
    String elasticServer = "127.0.0.1";
    // Port of the index service.
    int elasticServerPort = 9300;
    // Maximum number of deletes sent in one bulk request.
    final int batchSize = 200;

    TransportClient client = null;
    try {
        // Build the client first so it can be closed in finally even if
        // resolving the host name throws afterwards.
        Settings settings = Settings.settingsBuilder().build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(
                InetAddress.getByName(elasticServer), elasticServerPort));

        // Ids of the documents to delete.
        List<String> dataList = new ArrayList<String>();
        dataList.add("2");
        dataList.add("3");

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        int total = dataList.size();
        for (int i = 0; i < total; i++) {
            bulkRequest.add(client.prepareDelete("user", "userInfo", dataList.get(i)));

            // Flush when the batch is full or when this is the last id.
            boolean batchFull = (i + 1) % batchSize == 0;
            boolean lastId = i == total - 1;
            if (batchFull || lastId) {
                BulkResponse bulkResponse = bulkRequest.get();
                if (bulkResponse.hasFailures()) {
                    System.out.println("bulk error:" + bulkResponse.buildFailureMessage());
                }
                // Start a new builder; the old one still holds the sent actions.
                bulkRequest = client.prepareBulk();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the connection on both the success and the failure path.
        if (client != null) {
            client.close();
        }
    }
}
5 Spring Boot集成ES