ElasticSearch 入门

1.基础

1.1 介绍

Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上.

Lucene 可以说是当下最先进、高性能、全功能的搜索引擎库--无论是开源还是私有.Lucene 非常 复杂. Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目的是使全文检索变得简单, 通过隐藏 Lucene 的复杂性,取而代之的提供一套简单一致的 RESTful API.

然而,Elasticsearch 不仅仅是 Lucene,并且也不仅仅只是一个全文搜索引擎. 它可以被下面这样准确的形容:

  • 一个分布式的实时文档存储,每个字段 可以被索引与搜索
  • 一个分布式实时分析搜索引擎
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据

1.2 定义

1.2.1 Elasticsearch基本概念

  • 集群和节点
    • 一个集群由多个ES节点组成,上面的分佈式实例安装就是一个集群,集群名字是mytest,而裡面有3个ES节点,分别是master、slave1、slave2
  • 索引、类型、文档的关系
    • 索引
      • 含有相同属性的文档集合
      • 索引在ES中是透过一个名字来识别的,且必须是英文字母小写而且不能包含中划线-,在ES中就是透过这个名字来对文档进行增删改查的操作
    • 类型
      • 索引可以定义一个或多个类型,文档必须属于一个类型
    • 文档
      • 文档是可以被索引的基本数据单位,是ES裡面最小的存储单位
    • 用一个例子了解这三个的关系
      • 索引相当于MySQL裡的database数据库,类型相当于table表,文档相当于一行纪录
      • 假设有一个信息查询系统是使用ES来做存储,那么裡面的数据就可以分为各种各样的索引,像是汽⻋索引、图书索引...,而图书索引又可以细分各种类型,比如说科普类型、小说类型...,那么具体到每一本书籍,就是文档,就是整个系统的最小储存单位
  • 分片和备份
    • 和索引相关的高级概念,就是分片和备份
      • 分片:在ES中,每个索引都有多个分片,每个分片是一个Lucene索引
        • 硬限制是单个shard的doc数量不能超过Integer.MAX_VALUE
        • 更普遍的一个潜规则是,单个shard的数据量尽量控制在50GB左右
      • 备份:拷贝一片分片就完成了分片的备份
    • 分片的好处
      • 假设一个索引的数据量很大,就会造成硬盘压力很大,同时搜索速度也会出现瓶颈,如果将索引分成多个分片,就能分散压力
      • 分片还允许用户能进行水平的扩展和拆分,以及分佈式的操作,可以提高搜所以级其他操作的效率
    • 备份的好处
      • 当一个主分片失败,或出现问题时,备份的分片就可以代替工作,从而提高ES的可用性
      • 备份的分片还可以执行搜索操作,分摊搜索压力
    • ES默认在创建时会创建5个分片,对每个分片都会创建1个备份
      • 分片的数量只能在创建索引的时候指定,而不能在后期进行修改
      • 备份的数量则是可以动态修改的

1.2.2 文档

  • 文档的元数据metadata
    • 一个文档不仅仅包含它的数据,也包含元数据metadata,存放的是有关文档的信息
    • 三个必须的元数据元素如下
      • _index:索引,表示文档在哪存放
      • _type:类型,表示文档的对象类别
      • _id:文档唯一标识,id是一个字符串,当它和 _index 以及 _type 组合就可以唯一确定Elasticsearch 中的一个文档
      • 当创建一个新的文档,可以自定义自己的_id ,或是让Elasticsearch自动生成
  • 其他的元数据
    • _score:只在query时出现,用来指标匹配程度有多好
    • _version:ElasticSearch的乐观版本控制
      • 在ElasticSearch裡,当要更新一个文档的内容时,其实并不是更新了那个文档,而是创建了一个全新的文档,然后替换掉旧文档
      • 而因为新旧文档有同样的_index_type_id,因此为了表示哪一个文档是最新的,就需要使用_version来控制当文档被修改时,_vsesion会递增,
      • ElasticSearch使用这个_version号来确保哪一个文档是目前最新的,如果使用者要更改旧版本的内容,会被拒绝
    • _source:存放文档的实际内容

3.交互

3.1 RESTful API with JSON over HTTP

curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
< >标记 说明
VERB 适当的 HTTP 方法 或 谓词 :GETPOSTPUTHEAD 或者 DELETE
PROTOCOL http 或者 https(如果你在 Elasticsearch 前面有一个 https 代理)
HOST Elasticsearch 集群中任意节点的主机名,或者用 localhost 代表本地机器上的节点
PORT 运行 Elasticsearch HTTP 服务的端口号,默认是 9200
PATH API 的终端路径(例如 _count 将返回集群中文档数量).Path 可能包含多个组件,例如:_cluster/stats 和 _nodes/stats/jvm
QUERY_STRING 任意可选的查询字符串参数 (例如 ?pretty 将格式化地输出 JSON 返回值,使其更容易阅读)
BODY 一个 JSON 格式的请求体 (如果请求需要的话)

3.2 索引和文档操作

3.2.1 索引创建

PUT 127.0.0.1:9200/people
{
    "settings":{
        "number_of_shards":3,    // 分片数
        "number_of_replicas":1    // 备份数
    },
    "mappings":{
        // 定义一个man类型,properties是他的结构化设计
        "man":{
            "properties":{
                "name":{
                    "type":"text"
                },
                "country":{
                    "type":"keyword"
                },
                "age":{
                    "type":"integer"
                },
                "date":{
                    "type":"date",
                    "format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd ||epoch_millis" //兼容多种时间格式储存方式
                }
            }
        }
    }
}

低版本<2.0

"date":{
    "type":"date",
    "format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || date_optional_time" //兼容多种时间格式储存方式
}

ES>2.0

"date":{
    "type":"date",
    "format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis" //兼容多种时间格式储存方式
}

type:ES<5.0时, 只有string字段,="" es="">5.0时,引入text,keyword字段

3.2.2 请求一个文档

GET 127.0.0.1:9200/people/man/3

// 找到文档,http状态码200
{
    "_index":"people",
    "_type":"man",
    "_id":"3",
    "_version":4,
    "found":true,
    "_source":{
        "name":"瓦力",
        "country":"China",
        "age":100,
        "date":"1987-03-07"
    }
}

// 未找到文档, http状态码404
{
    "_index":"people",
    "_type":"man",
    "_id":"1",
    "found":false
}

精确命中数据集

  • 查询API支持多个索引以及通配符
    • /indexA,indexB,indexC/_search
    • /index-prefix-*/_search
  • 缺点
    • 参与计算的数据量变大,每个shard独立运算
    • IO压力提高,响应时间变慢
    • 每个查询加time range是一个好习惯
  • 如果非要这样用...
    • 重新设计索引结构

3.2.3 创建文档

根据指定id插入

POST 127.0.0.1:9200/people/man/3
{
    "name":"瓦力",
    "country":"China",
    "age":30,
    "date":"1987-03-07"
}

ID如何选择

  • 三点要注意
    • 务必全局唯一
    • 长度尽量一致,可以考虑左补齐
    • 尽量有序
  • 使用者指定
    • 能作为数据主键的,可以考虑作为doc id
  • 自动生成
    • 20个字母的GUID
    • 5.x有额外优化,吞吐量会高一点

自动产生文档id插入

POST 127.0.0.1:9200/people/man
{
    "name":"超重瓦力",
    "country":[
        "China",
        "Beijing",
        "Shanhei"
    ],
    "age":40,
    "date":"1977-03-07"
}

3.2.4 修改文档数据

直接修改文档

POST 127.0.0.1:9200/people/man/3/_update
{
    "doc":{
        "name":"修改后的名字"
    }
}

通过脚本修改:ES支持许多脚本语言,像是python、js、painless(内置脚本语言)

POST 127.0.0.1:9200/people/man/3/_update
{
    "script":{
        "lang":"painless",
        "inline":"ctx._source.age = params.age",
        "params":{
            "age":100
        }
    }
}

3.2.5 删除文档

ElasticSearch中,删除文档不会立即将文档从磁盘中删除,只是将文档标记爲已删除状态,随着不断的创建更多的数据,Elasticsearch将会在后台清理标记爲已删除的文档

删除文档

DELETE 127.0.0.1:9200/people/man/3

删除索引

DELETE 127.0.0.1:9200/people

3.2.6 批量操作

POST /_bulk: 增删改放到一个批处理中完成

// 创建BulkRequest对象
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < n; i++) {
    // 构造IndexRequest,相当于写入新的doc
    IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, id);
    // doc的数据
    indexRequestBuilder.source(...)
     // 把IndexRequest合并到本次bulkReuqest中
    bulkRequest.add(indexRequestBuilder);
}
// 阻塞的方法调用,等待结果返回
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    Iterator<BulkItemResponse> itemResponseIterator = bulkResponse.iterator();
    while (itemResponseIterator.hasNext()) {
        BulkItemResponse bulkItemResponse = itemResponseIterator.next();
        // 检查每个request
        if (bulkItemResponse != null && bulkItemResponse.isFailed()) {
            // 如果不是版本冲突问题
            if (bulkItemResponse.getFailure().getStatus() != RestStatus.CONFLICT) {
                failIdList.add(bulkItemResponse.getId());
            } else {
                logger.error("更新冲突");
            }
        }
    }
}
// 构造BulkProcessor,可以定义单词flush的频率/大小等等
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // 执行真正的BulkRequest前做些额外的操作
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // BlukRequest执行完毕后,在这里验证执行结果等
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // 执行出错
            }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .setConcurrentRequests(24)
        .build();
bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source));
bulkProcessor.add(new DeleteRequest("index2", "type2", "id2"));

try {
  // 等待剩下的BulkRequest执行完毕
  bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
  e.printStackTrace();
}

Java bulk API

  • BulkRequest
    • 同步提交
    • response.hasFailures()
  • BulkProcessor
    • 异步提交
    • BulkProcessor.Listener验证结果
    • 速度太快容易导致429

3.3 Client

3.3.1 HTTP API & Transport API

// 创建TransportClient初始化需要的配置
Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true)
                .put("client.transport.ping_timeout", "5s")
                .build();

TransportClient transportClient = new TransportClient(settings)
    .addTransportAddress(new InetSocketTransportAddress("es1.example.com", 9300))
    .addTransportAddress(new InetSocketTransportAddress("es2.example.com", 9300))
    .addTransportAddress(new InetSocketTransportAddress("es3.example.com", 9300));
 // Construct a new Jest client according to configuration via factory
 JestClientFactory factory = new JestClientFactory();
 List<String> hosts = Arrays.asList("http://es1.example.com:9200",
                                    "http://es2.example.com:9200",
                                    "http://es3.example.com:9200");
 factory.setHttpClientConfig(new HttpClientConfig
                        .Builder(hosts)
                        .multiThreaded(true)
            //Per default this implementation will create no more than 2 concurrent connections per given route
            .defaultMaxTotalConnectionPerRoute(<YOUR_DESIRED_LEVEL_OF_CONCURRENCY_PER_ROUTE>)
            // and no more 20 connections in total
            .maxTotalConnection(<YOUR_DESIRED_LEVEL_OF_CONCURRENCY_TOTAL>)
                        .build());
 JestClient client = factory.getObject();

3.3.2 私有集群推荐Transport

  • 网络带宽压缩
  • 软路由
  • 不用写DSL
  • Java
  • 优点: 支持传输压缩,带宽占用低,速度快,额外消耗CPU资源
  • 缺点: 5.x之前的没有独立的Client SDK,需要使用elasticsearch.jar,官方只维护Java/Python的库.跨大版本不兼容

3.3.3 公共集群推荐Jest

  • 兼容性
  • Nginx二次编程

4.ES升级概览

4.1 1.0 -> 2.0

1.0->2.0

重要更新

  • 删除:
    • _shutdown api
    • delete_by_query
    • facets repalced by aggregations
  • 网络变更
    • bind to localhost network.host: 0.0.0.0
    • multicast removed discovery.zen.ping.unicast.hosts: [ 192.168.1.2, 192.168.1.3 ]
    • Multiple path.data striping same shard same path
  • mappings 变更
    • 同一index下不同 type 的相同字段的mapping必须相同
    • 字段不能有.
    • type名称不能以.开头,长度小于256
    • 不能单独删除index下的某一个 type
    • Previously, index_analyzer and search_analyzer could be set separately, while the analyzer setting would set both. The index_analyzer setting has been removed in favour of just using the analyzer setting.
    • If just the analyzer is set, it will be used at index time and at search time. To use a different analyzer at search time, specify both the analyzer and a search_analyzer.
  • qeury dsl 变更
    • Queries and filters have been merged — all filter clauses are now query clauses. Instead, query clauses can now be used in query context or in filter context:
    • terms废弃
    • filtered query and query filter废弃
    • filter自动缓存
  • search 变更
    • search_type=count废弃
    • Snapshot and Restore changes
    • url和path白名单

4.2 2.0->5.0

  • elasticsearch-migration plugin
    • 从2.3.0开始有elasticsearch-migration plugin,可以分析升级到5.0的问题
  • 索引数据
    • 5.0可以直接读取2.0以后创建的索引数据
  • 删除search_type=count
    • 可以设置 size=0 代替
  • 删除search_type=scan
    • 可以使用scroll和根据_doc排序代替
  • 搜索时shard数量限制
    • 默认拒绝掉会从1000个以上shard搜索数据的请求,可以通过action.search.shard_count.limit设置
  • fields被stored_fields 代替
  • text/keyword 代替 string 类型
    • text表示全文分析的字段, keyword表示不分词的字段
  • Mapping 字段数量限制
  • 不支持空字段名
  • 备份时删除/关闭索引
    • 备份不再失败,而是部分失败
  • 废弃warmer及相关api
    • 得益于doc value和基于磁盘的norms存储,warmer用途已经不大.
  • 索引名称不能以 -/+ 开头

4.3 5.0 -> 6.0

5.0->6.0

  • 索引数据
    • 6.0可以直接读取5.0以后创建的索引数据
  • 单索引单类型
    • 一个index只有一个type
  • data.path不再有集群名
  • Translog保留时间
    • translog 默认保留12小时,最大512m.执行flush之后不再删除
  • Java API
    • 更加严格的boolean类型校验
    • 只接受true,false.以前接受true,false,0,1,on,off 等
  • Scroll 搜索初始化时不再接受from参数
  • 配置文件限制
    • 名字只能是 elasticsearch.yml
    • 配置文件不支持重复 key

results matching ""

    No results matching ""