搜索引擎 ElasticSearch

ElasticSearch 是什么?

 ElasticSearch™ 是一款基于 Lucene 的搜索引擎,不但稳定、可靠、快速,同时具备良好的水平扩展能力

特性

  • 功能丰富,且开箱即用
  • 横向可扩展性
  • 分片机制更好地解决热点问题
  • 多副本有效保证了高可用
  • 社区庞大,生态完善

主要概念

Cluster 集群

 在一个分布式系统里面,可以通过多个 ElasticSearch 节点组成一个集群。集群中会动态选举出一个主节点,保证了 ElasticSearch 集群不存在单点故障
 在同一子网内,只需要将进程设置为相同的集群名, ElasticSearch 就会把这些集群名相同的进程自动组成一个集群。集群中各节点间的通讯和数据负载均衡,全部都由 ElasticSearch 自动管理

Node 节点

 每一个 ElasticSearch 进程称为一个 Node 节点。在测试环境中,可以在一台服务器上运行多个 ElasticSearch 进程;但生产环境中,则建议每台服务器只运行一个 ElasticSearch 进程

Index 索引

 ElasticSearch 中的索引是文档数据存储的地方,相当于是传统关系数据库中的 DataBase 概念。更多逻辑上的对应关系,如下表所示:

Relational DBHBaseElasticSearch说明
DatabaseNameSpaceTemplate一组索引的模板配置
TableTableIndex索引
RowRowKeyDocument文档,和 Lucene 概念一致
Column + ValueCellField如果将文档理解为 JSON,那么 Field 就是字段和值
--Term检索的基本单位,相当于是文本中的一个词
--TokenTerm 内容、类型,以及 Term 在文本中的起始及偏移
目前最新的 ElasticSearch 7.x 版本里面已经废弃了 Type 的概念

Shard 分片

 ElasticSearch 会把一个索引拆分为多个更细粒度的索引,并称之为 Shard
 完成分片之后,就可以把各个 Shard 分配到不同的节点中去

Replica 副本

 ElasticSearch 的每一个 Shard 分片都可以有多个副本,而每一个副本也都是分片的完整拷贝。如此设计的好处是,既可以提升查询速度,也能提高系统的容错性
 一旦 ElasticSearch 的某个节点数据损坏或者服务不可用的时候,那么就可以用其他节点来代替故障的节点,以达到高可用的目标

Recovery 故障恢复

 ElasticSearch 的 Recovery 代表的是数据恢复或者称之为数据重新分布
 当 ElasticSearch 集群中,有节点加入或退出时,它会根据机器负载对索引分片进行重新分配

另外,当挂掉的节点再次重新启动的时候也会进行数据恢复

River 数据源

 River 代表的是一个数据源,这也是同步数据到 ElasticSearch 的一个方法
 主要流程是读取 River 中的数据,再将其索引到 ElasticSearch 中,并以插件方式对外提供服务。官方支持的 River 类型有 CouchDBRabbitMQTwitterWikipedia

Gateway 时空门

 Gateway 是 ElasticSearch 索引的持久化存储方式,ElasticSearch 默认会先把索引存放到内存中去,当内存满了的时候再持久化到硬盘里。之后,当 ElasticSearch 集群重启时,就会从 Gateway 中读取索引数据
 Gateway 支持多种存储媒介,包括本地文件系统(默认)、分布式文件系统、HDFS 和 S3 云存储服务 等

discovery.zen 节点发现

 discovery.zen 是 ElasticSearch 的自动节点发现机制,支持基于配置(静态配置中设置的主机列表)与基于文件(定时加载外部文件中的主机列表)两种模式
 集群会以广播的方式去寻找存在的 Node 节点,然后再通过多播协议来进行节点之间的通信,同时也支持点对点(P2P,Point 2 Point)的交互操作

Transport 通讯机制

 Transport 是 ElasticSearch 内部的节点或者集群与客户端之间的交互方式,默认会使用 TCP 协议完成通讯

架构图

ElasticSearch Architecture

(利用 Axure™ 绘制而成)

环境部署

下载

 首先从官方的下载页面上,找到适合自己操作系统的安装包(这里我们以 MacOS 为例)

1
$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.4.1-darwin-x86_64.tar.gz

解压

1
2
3
$ tar zxvf elasticsearch-7.4.1-darwin-x86_64.tar.gz
$ ln -s elasticsearch-7.4.1 es
$ cd es

环境变量

1
2
3
4
5
6
$ vim ~/.bashrc
export ES_HOME=/apps/es
export PATH=$PATH:$ES_HOME/bin
$ source ~/.bashrc
$ elasticsearch -version
Version: 7.4.1, Build: default/tar/fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e/2019-10-22T17:16:35.176724Z, JVM: 12.0.2

启动

1
2
3
4
$ elasticsearch

# 增加 -d 可以使得 ElasticSearch 进程在后台运行
$ elasticsearch -d

校验

1
$ curl -XGET 'http://localhost:9200/_cluster/health' | python -m json.tool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name": "elasticsearch",
"status": "green",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 0,
"active_shards": 0,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100.0
}

实用技巧

以下介绍的部分内容是 ElasticSearch 2.x 版本的用法,和当前最新的 ElasticSearch 7.x 存在一定的区别,陆续更新中,敬请期待

查询数据

匹配所有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 查询所有 index / type 的数据
GET _search
{
"query": {
"match_all": {}
}
}

# 指定一个 index,搜索该 index 下所有 type 的数据
GET /yuzhouwan/_search
# 指定多个 index
GET /yuzhouwan01,yuzhouwan02/_search
# 利用通配符进行指定多个
GET /yuzhouwan*/_search
# 查询指定 index 下指定 type 的数据
GET /yuzhouwan_index/yuzhouwan_type/_search
# 查询指定 index 下多个指定 type 的数据
GET /yuzhouwan_index/yuzhouwan_type1,yuzhouwan_type2/_search
# 查询多个指定 index 下多个指定 type 的数据
GET /yuzhouwan_index1,yuzhouwan_index2/yuzhouwan_type1,yuzhouwan_type2/_search
# 查询所有 index 下多个指定 type 的数据
GET /_all/yuzhouwan_type1,yuzhouwan_type2/_search
{
"query": {
"match_all": {}
}
}

控制展示数量

 由于默认只会展示 10 个匹配结果,所以需要设置 size 参数来调整

1
GET /yuzhouwan/_search?size=100
1
2
3
4
5
6
7
{
"query": {
"match": {
"pattern": "FLUSH"
}
}
}
1
GET /yuzhouwan/_search
1
2
3
4
5
6
7
8
9
{
"from": 0,
"size": 100,
"query": {
"match": {
"pattern": "FLUSH"
}
}
}

指定返回字段

1
GET /yuzhouwan/_search
1
2
3
4
5
6
7
8
9
10
{
"_source": [
"message"
],
"from": 0,
"size": 100,
"query": {
"match_all": {}
}
}

范围查询

1
GET /yuzhouwan/_search
1
2
3
4
5
6
7
8
9
10
{
"query": {
"range": {
"CREATETIME": {
"gte": "1496246400000",
"lt": "1498838400000"
}
}
}
}

聚合查询

Group Count
1
GET /yuzhouwan/_search
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"size": 0,
"aggs": {
"group_by_link": {
"terms": {
"field": "link",
"size": 100
},
"aggs": {
"sales_bucket_filter": {
"bucket_selector": {
"buckets_path": {
"the_doc_count": "_count"
},
"script": "params.the_doc_count == 2"
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
[
{
"key": -1503538282794558000,
"doc_count": 2
},
{
"key": 982015180641152600,
"doc_count": 2
}
]
因为 aggregation 语句默认自带 match_all 查询字段,所以不可直接用 _delete_by_query 进行删除,否则将会清空所有数据

过滤查询

字段长度
1
GET /yuzhouwan/_search
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"query": {
"bool": {
"must": [
{
"range": {
"time": {
"gte": "1514960000000"
}
}
}
],
"filter": {
"script": {
"script": {
"inline": "doc['message'].values.length > 9",
"lang": "painless"
}
}
}
}
}
}
需要在 mapping 里面设置 message 字段为 keyword 才行

删除数据

1
POST yuzhouwan/_delete_by_query
1
2
3
4
5
6
7
{
"query": {
"match": {
"message": "yuzhouwan.com"
}
}
}

索引

获取已存在的 Mapping 索引格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# hostname 无效的时候,可以尝试用 IP 地址
$ curl -XGET 'host:port/{index}/_mapping/{type}'
$ curl -XGET 'http://localhost:9200/yuzhouwan_day_20160527/_mapping/'

# 如果仍然无法访问,查看 `config/elasticsearch.yml` 中配置的 http.port 端口是否在监听
$ lsof -i TCP | grep 9200
java 21116 es 9u IPv6 104371174 0t0 TCP *:9200 (LISTEN)

# 如果找不到监听的接口,并且检查 ES 进程是否正常运作
$ lsof -iTCP -sTCP:LISTEN | grep -iE 'es|elasticsearch'
java 21116 es 9u IPv6 104371174 0t0 TCP *:9201 (LISTEN)
java 24777 es 506u IPv6 266659517 0t0 TCP yuzhouwan01.com:vrace (LISTEN)
java 24777 es 700u IPv6 266659638 0t0 TCP yuzhouwan01.com:iua (LISTEN)
node 25161 es 15u IPv4 266679483 0t0 TCP yuzhouwan01.com:9202 (LISTEN)

# 最后验证下 ES 服务是否可用
$ curl -IGET http://localhost:9200
HTTP/1.1 200 OK
Date: Mon, 05 Jun 2017 08:55:44 GMT
Content-Type: text/html
Last-Modified: Fri, 31 Mar 2017 03:07:59 GMT
Accept-Ranges: bytes
Content-Length: 1480
Server: Jetty(9.2.z-SNAPSHOT)

# 获取所有 indices 副本和 types 类别
$ curl -XGET 'http://localhost:9200/_all/_mapping'
$ curl -XGET 'http://localhost:9200/_mapping'
# 指定 type 类别
$ curl -XGET 'http://localhost:9200/_all/_mapping/blog,site'

如何更新索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 删掉之前的 template 模板
$ curl -XGET localhost:9200/_template/template_monitor_log_day

# 删掉之前的 template 模板
$ curl -XDELETE localhost:9200/_template/template_monitor_log_day

# 如果想要使现有的 index 索引生效,也需要把之前的 index 数据删除
$ curl -XDELETE 'http://localhost:9200/monitor_log_day_*/'

# 添加 template 模板
# 如果在粘贴下面大段 PUT 内容的时候,出现 "自动触发 tab 自动补全效果" 的时候,可以 `mkdir temp && cd temp`,在 temp 空文件夹中执行
$ curl -XPUT localhost:9200/_template/template_monitor_log_day -d '
{
"template": "monitor_log_day_*",
"order": 0,
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 1
}
},
"mappings": {
"access_log": {
"_source": {
"enabled": true
},
"_all": {
"enabled": false
},
"properties": {
"ip": {
"type": "string"
},
"requestLength": {
"type": "integer"
},
"spendMs": {
"type": "long"
},
"attributes": {
"type": "object"
},
"time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
}
}
}
}
}'

按天滚动新建索引

1
2
3
4
# 开启 auto_create_index 配置,设置符合 `*day*` 正则的索引;并关闭动态 Mapper
$ vim config/elasticsearch.yml
action.auto_create_index: +*day*,-*
index.mapper.dynamic: false

属性转换

  • Java 中的 Map 转换到 ElasticSearch 中,只需要一个 "type": "object"
  • "type": "object" 不可以设置为 "index": "not_analyzed"
    但是,查询的时候,如果是 Map 格式,那么查询 attributes.webEntry,则需要传入 "webEntry": "admin"(将 webEntry 的 value 小写)
1
2
3
4
5
"attributes": {
"webEntry": "ADMIN",
"userId": "admin",
"userName": "admin"
}

TimeZone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ curl -XPOST http://192.168.0.198:9200/cgpboss_month_201609/attack_alarm/_search -d '{
"query": {
"bool": {
"must": [
{
"match": {
"customerId": {
"query": "69",
"type": "boolean"
}
}
},
{
"range": {
"startTime": {
"from": "2016-09-20 00:00:00,000",
"to": "2016-09-20 17:47:57,887",
"format": "yyyy-MM-dd HH:mm:ss,SSS",
"include_lower": true,
"include_upper": false
}
}
}
]
}
}
}'

 如果在构建 date 字段相关的查询是,可以直接传入 yyyy-MM-dd HH:mm:ss,SSS 格式的字符串,避免 +08:00 时区带来的困扰。也可以在程序中,将日期中的 local 时区去掉,从而使得日期和 ElasticSearch 本身的默认时区 +00:00 一致,具体写法如下

1
2
3
QueryBuilders.rangeQuery(params.get("timeField"))
.format("yyyy-MM-dd HH:mm:ss,SSS")
.gte("2016-09-20 00:00:00,000")

Hot-warm

 ElasticSearch 5.x 引入的新特性,可以通过 node.attr.box_type 参数,将 ElasticSearch 集群中部分高资源的节点配置为 hot,以存放需要频繁访问的数据,再将部分低资源的节点配置为 warm,以存放大量低频访问的数据。随后,在摄入业务数据的时候,可以通过设置 Index 索引的 index.routing.allocation.require.box_type 配置项,来指定数据具体存放在 hot 还是 warm 节点上。默认,可以将 Index 设置为 hot,在满足一定条件后,手动或通过 Curator 工具自动将冷数据设置为 warm

可以通过 rollover 查看 Index 中最早写入的一条数据距今已经过去多久、最大的文档数和磁盘空间大小

Shrink

介绍

 ElasticSearch 5.x 引入的新特性,可以将一个索引中的 Shard 分片进行合并。而且其利用了操作系统的硬链接,使得该操作的完成可以控制在毫秒级。当然,如果操作系统不支持硬链接的话,则仍然需要拷贝 segment 文件

使用
1
POST /yuzhouwan_source_index/_shrink/yuzhouwan_target_index
1
2
3
4
5
6
7
8
9
10
{
"settings": {
"index.number_of_replicas": 1,
"index.number_of_shards": 1,
"index.codec": "best_compression"
},
"aliases": {
"my_search_indices": {}
}
}

Rollup

介绍

 ElasticSearch 6.6 引入的新特性,可以通过启动 Rollup 任务的形式,对现有的 Index 索引进行 Rollup,以生成预聚合的新索引

使用
1
PUT _rollup/job/yuzhouwan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
// 匹配 Index
"index_pattern": "yuzhouwan-*",
// Rollup 后生成的 Index
"rollup_index": "yuzhouwan_rollup",
// 执行周期
"cron": "*/30 * * * * ?",
// 批处理的大小
"page_size": 1000,
"groups": {
// 基于时间戳字段
// Rollup 的时间粒度
// 延迟 7 天后进行 Rollup,避免乱序数据无法被处理
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h",
"delay": "7d"
},
// 聚合的维度
"terms": {
"fields": [
"node"
]
}
},
// 聚合的度量,以及具体的聚合操作
"metrics": [
{
"field": "temperature",
"metrics": [
"min",
"max",
"sum"
]
},
{
"field": "voltage",
"metrics": [
"avg"
]
}
]
}
局限性

 截止到 ElasticSearch 7.4 版本,也只支持对 Date、Histogram 和 Terms 三种字段类型进行 Rollup 操作,且聚合算子也只有 max、min、sum、avg 和 count 五种

插件

1
2
3
4
$ bin/plugin -install karmi/ElasticSearch-paramedic

# 安装成功后,重启 ElasticSearch 集群
# 访问 http://localhost:9200/_plugin/paramedic/index.html(显示索引的各种信息的统计结果页面)

配置集群

1
$ curl -XGET 'http://localhost:9200/_nodes?os=true&process=true&pretty=true'

熔断机制

 ElasticSearch 7.x 引入的新特性,断路器会在接收到查询请求后,会评估出可能会耗费的内存大小,如果数据量过大超过了单机所能处理的最大内存限制时,就会拒绝请求,并返回异常。熔断机制通过一个总控断路器和多个子类断路器(字段数据加载、聚合请求计算需要的内存、请求本身的大小、加载后未释放的 Lucene Segment,以及内联脚本编译次数),对内存消耗进行控制,避免了 OOM 问题。相关的配置均属于集群层面,所以我们可以通过修改 config/elasticsearch.yml 配置文件或者调用 PUT /_cluster/settings 接口中进行设置

整合其他框架

ElasticSearch + Kafka 整合

 详见我的另一篇博客《Apache Kafka 分布式消息队列框架

ElasticSearch + Flume 整合

 参考 Flume 官网文档

ElasticSearch + IK 中文分词

安装

 参考 IK 官网文档

中文分词器比对表

分词器分词粒度出错情况支持处理字符新词识别词性标注认证方法接口
BosonNLP多选择识别繁体字TokenRESTful
IKAnalyzer多选择兼容韩文日文Jar
NLPIR多选择中文间隔符未知多语言接口
SCWS多选择未知PHP、Cli
结巴分词多选择识别繁体字Python
盘古分词多选择识别繁体字
庖丁解牛多选择Jar
搜狗分词字符长度过大识别繁体字未知上传文档
腾讯文智空白字符未知中文词性SignatureRESTful
新浪云未知创建仓库RESTful
语言云适中识别繁体字TokenRESTful

踩过的坑

Unknown Analyzer type [org.ElasticSearch.index.analysis.IkAnalyzerProvider] for [ik]

 属于配置的问题,需注意 ElasticSearch 和 IK 的版本,不同的版本中支持的配置项也会不一样

技术内幕

ElasticSearch 和 Lucene 的本质是什么,为什么会选型 ElasticSearch?

本质

 Lucene 是基于 Java 的全文检索库,而 ElasticSearch 是一款功能丰富的分布式搜索引擎

区别

Lucene

 需要为分布式、高可用性和实时写入(索引建立 -> 可检索)等特性,进行额外的编码工作

ElasticSearch

 具备分布式、实时分发、实时搜索、易用性(Multi-tenancy 多租户 / Gateway 备份)、高可用性(各节点组成对等的网络结构,方便故障转移)等特性

进一步思考

Solr、Lucene 作为全文检索系统,和 ElasticSearch 这种搜索引擎又有什么本质的区别?

 搜索引擎,包含网页数据的快速采集、海量数据的索引和存储、搜索结果的相关性排序、搜索效率的毫秒级要求、分布式处理和负载均衡、自然语言的理解技术 等等

 全文检索,即文本全文检索,包括信息的存储、组织、表现、查询、存取等各个方面,其核心为文本信息的索引和检索

为什么 ElasticSearch 集群使用 Hash 路由,而不是 Paxos,这两个算法有什么区别,各自最适的应用场景在哪里?

 Hash 路由,主要用于解决数据均衡分布的问题。而 Paxos 主要用于保证分布式数据副本的一致性

 ElasticSearch 中的 Hash Router 会对每次请求的 _id 属性(即 Document ID,默认的 ElasticSearch 会自动为每一个 Document 分配一个长度为 20 的 UUID 字符串,也可以指定其他的字段)进行 Hash 操作,再对 Shard 数量进行取模

 实际上,二者根本不是一个层面上的概念,更应该拿 ElasticSearch 的 Gossip + Bully 选举机制来和 Paxos 进行比较。下文会有对 Gossip + Bully 选主算法的详细说明,这里就多介绍了

(更多相关内容详见,我的另外两篇博客《大数据生态圈的一致性算法》和《Zookeeper 原理与优化》)

ElasticSearch 倒排索引的原理

图解

 已经有太多的书和博客来介绍,这里就不班门弄斧了,就简单画个图来帮助理解 “倒排索引” 的流程吧

Elasticsearch Reverted Index

(利用 Visio™ 绘制而成)

Lucene 4 实现的 DocValues 特性,为什么可以加速聚合运算?

DocValues 是什么

 Lucene 在构建倒排索引时,额外建立的一个有序的、基于 Document -> Field Value 的映射列表

场景

  • 对字段进行排序或聚合
  • 某些过滤,比如地理位置过滤
  • 某些与字段相关的脚本计算

用法

 可以通过 doc_values 字段控制 DocValues 特性是否开启。默认地,会对除了 analyzed strings 之外的所有字段启用(即数字、地理坐标、日期、IP 和 not_analyzed 字符类型)

1
PUT yuzhouwan_index
1
2
3
4
5
6
7
8
9
10
11
12
13
{
"mappings": {
"yuzhouwan_type": {
"properties": {
"blog": {
"type": "string",
"index": "not_analyzed",
"doc_values": true
}
}
}
}
}

原理

 倒排索引建立的是 KeyWord -> Document 的映射关系,以便在文本检索时,通过类似 Hash 算法,来快速定位到 KeyWord,再从索引中获取到包含该 KeyWord 的 Document ID 集合。虽然,该方法保证了高效的检索,但在对数据做聚合运算时,却需要将所有出现了 KeyWord 的 Document 加载到内存中,很容易导致进程的内存溢出问题

 针对该问题,DocValues 便应运而生了。其会对开启了该特性的字段,额外构建一个 Document -> Field Value 的正排索引。此时,聚合操作的流程就变成了,先找到 KeyWord 的 Document,再从 DocValues 中加载 Document 对应的 Filed Value 到内存中进行运算即可。同时,如果 Field Value 占用的内存远小于节点的可用内存,则会自动常驻在内存中,进而保证了性能;反之,则会序列化到磁盘中,避免出现 OOM 问题。另外,因为其列式存储的特性,使得压缩效率也很高

进一步思考

设置 stored=”true” 属性同样会构建正排索引,它和 DocValues 又有何不同?

 主要区别在于,前者是行式存储,而后者是列式存储;且前者直接存储字段原始值,而后者会进行分词

ElasticSearch 5.0 内核迁移为 Lucene 6.x,是如何利用 Block K-d Tree 来解决 深度分页 问题?

ElasticSearch 2.x 现状

 使用 form + size 或者 scroll 的方式效率很低

算法简介

 K-d Tree,利用 方差 + 中位数 分割数据入 二叉树,再利用 二叉查找 + 递归溯源 的方式来查找数据。而 Block K-d Tree 则是一组 K-d Tree,解决了多维空间数据搜索,较高的空间利用率,高性能的查询和更新。使得 ElasticSearch 处理数值型数据和高维数据的性能提高了约 30%,同时存储空间大小节约了约 60%

缺点

 在搜索过程中,N 个节点、K 维 的 K-d Tree 最坏的时间复杂度为:$T_{worst} = O(K*N^{1-\frac{1}{K}})$,所以其无法很好地处理高维数据;

 而 Block K-d Tree 虽然在插入性能方面比 K-d Tree 提升了几个数量级,但是如果系统使用的是非 SSD 硬盘,则可能会出现严重的毛刺现象。

参考

源码阅读

源码分析是基于 ElasticSearch 7.4.1 版本进行的

环境搭建

1
2
3
4
5
6
7
# Lucene
$ ant ivy-bootstrap
$ ant idea

# ElasticSearch
$ ./gradlew clean
$ ./gradlew idea
如果遇到未完全下载等问题,可以通过执行 rm -rf ~/.ivy2/cache 清理缓存的命令来解决

Lucene

基本概念

索引(Index)

 在 Lucene 中一个索引是放在一个文件夹中的,同一文件夹中的所有的文件构成一个 Lucene 索引

段(Segment)

 一个索引可以包含多个段,段与段之间是独立的,添加新文档可以生成新的段,不同的段可以进行合并

文档(Document)

 文档是创建索引的基本单位,不同的文档会保存在不同的段中,即一个段可以包含多个文档

域(Field)

 一个文档包含不同类型的信息,分别存在不同的域中,也就是文档的字段

词(Term)

 词是索引的最小单位,本质上是词法分析和语言处理后的一个字符串

主体

索引
  • org.apache.lucene.index.IndexWriter

    索引写操作的核心类

  • org.apache.lucene.store.Directory

    IndexWriter 通过获取 Directory 的一个具体实现,在 Directory 指向的位置中操作索引

    • FSDirectory

      表示一个存储在文件系统中的索引的位置

    • ByteBuffersDirectory

      表示一个存储在内存当中的索引的位置

  • org.apache.lucene.analysis.Analyzer

    分析器会对内容进行过滤,分词,转换等,把过滤之后的数据交给 IndexWriter 进行索引

  • org.apache.lucene.document.Document

    由 Field 组成的文档记录的

  • org.apache.lucene.document.Field

    Document 中的一个字段

检索
  • org.apache.lucene.search.IndexSearcher

    以只读的方式打开一个索引,并进行检索

  • org.apache.lucene.index.Term

    Term 是文本检索的基本单元,一个 Term 由 KeyValue 组成,Key 为需要检索的字段名称,Value 为字段的值

  • org.apache.lucene.search.Query

    封装查询请求

    • TermQuery

      最基本的 Term 查询

    • BooleanQuery

      可以组合多个查询条件

    • NumericRangeQuery

      数值区间查询

    • PrefixQuery

      前缀查询

    • FuzzyQuery

      模糊查询

    • WildcardQuery

      通配符查询

    • RegexQuery

      正则表达式查询

    • MultiFieldQueryParser

      多值查询

    • TopDocs

      TopN 查询

实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 下载 Lucene
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/lucene/java/8.2.0/lucene-8.2.0.tgz
$ tar zxvf lucene-8.2.0.tgz
$ ln -s lucene-8.2.0 lucene
$ cd lucene

# 需要被索引的数据
$ vim blog.txt
google.com
yuzhouwan.com

# 创建索引文件
$ java -classpath 'demo/lucene-demo-8.2.0.jar:core/lucene-core-8.2.0.jar' org.apache.lucene.demo.IndexFiles -docs blog.txt
Indexing to directory 'index'...
adding blog.txt
404 total milliseconds

# 检索文件
$ java -classpath 'demo/lucene-demo-8.2.0.jar:core/lucene-core-8.2.0.jar:queryparser/lucene-queryparser-8.2.0.jar' org.apache.lucene.demo.SearchFiles
Enter query:
blog
Searching for: blog
0 total matching documents

Enter query:
yuzhouwan
Searching for: yuzhouwan
0 total matching documents

Enter query:
yuzhouwan.com
Searching for: yuzhouwan.com
1 total matching documents
1. blog.txt

Transport 模块

功能

 保障了集群内各节点之间可以进行网络通讯

主体

  • org.elasticsearch.cli.Command#main

  • org.elasticsearch.cli.EnvironmentAwareCommand#execute

  • org.elasticsearch.bootstrap.Elasticsearch#execute

  • org.elasticsearch.bootstrap.Bootstrap

  • org.elasticsearch.node.Node

  • org.elasticsearch.transport.TransportService

    • connectToNode

      连接集群中的其他 Node 节点

    • sendRequest

      发送请求

    • registerRequestHandler

      注册 TransportRequestHandler,用于处理接收到的请求

  • org.elasticsearch.transport.Transport

  • org.elasticsearch.transport.netty4.Netty4Transport(transport-netty4 模块)

  • org.elasticsearch.transport.netty4.Netty4MessageChannelHandler

配置

  • transport.port

    Transport 服务绑定的端口范围,默认值为 9300-9400

  • transport.type

    控制 Transport 通讯时的网络类型,默认值为 netty4

  • transport.compress

    控制是否针对 Request 请求和 Response 返回结果进行压缩,默认值为 false

流程

 Command 完成 ElasticSearch 进程的启动,随后依次初始化 Bootstrap、Node、Transport 和 TransportService 并启动。节点间通讯的消息处理,最终由 Netty4MessageChannelHandler 进行处理。该 Handler 的注册操作,则会在 Netty4Transport 中完成

Zen Discovery 模块

功能

 可以让 ElasticSearch 节点自动互相发现,并组成一个集群,且能通过选举机制找到一个 Master 节点

配置

  • node.master

    如果设置为 true,则可以参与到 Master 选举过程中,默认值为 true

  • node.data

    如果设置为 true,则可以存储数据,并处理与数据相关的 CRUD、查询和聚合等操作,默认值为 true

    排列组合 node.masternode.data 这两个参数的结果,如下表所示:

master or datatruefalse
true既是种子节点,又是数据节点单纯的种子节点
false单纯的数据节点最简单的协调节点
其中,协调节点负责查询时的数据收集、合并以及聚合等操作。默认的,ElasticSearch 集群中所有节点都是协调节点。其实严格来说,除了 node.master 和 node.data 设置为 false,还需要将 node.ingest 也设置为 false 之后,才能算纯粹的 Coordinating 节点
  • node.ingest

    如果设置为 true,则可以创建一个 ingest pipeline 任务,对未被索引到 Index 之前的 Document 数据进行预处理,例如设置某一些字段值或者增加一个新字段(如记录摄入时间的新字段)。考虑到可能会增加机器的负载,生产环境中,最好将 Master 节点和 Node 节点上的这个功能关闭,默认值为 true

  • xpack.ml.enabled

    如果设置为 true,则可以处理 ElasticSearch 机器学习的接口请求,默认值为 true

  • node.ml

    如果设置为 true,则可以运行机器学习相关的任务,默认值为 true

  • discovery.seed_hosts

    提供一组可以成为 Master 的种子节点的 IP 地址,如果未指定端口号,将会自动使用 transport.port 的配置值(默认 9300)

  • discovery.seed_providers

    另一种提供种子节点 IP 地址的方式,指定一个存放 IP 地址的文件,可以动态加载,避免集群扩缩容时需要重启 ElasticSearch 进程的问题

  • discovery.find_peers_interval

    Discovery 机制的周期频率,默认值为 1s

流程

 ElasticSearch 将集群内的节点分为了四种类型,分别是 Master-eligible nodeData nodeIngest nodeMachine learning node。Master 选举只会在 Master-eligible 种子节点之间进行,触发条件为:

  1. 当前 Master-eligible 节点不是 Master 节点
  2. 当前 Master-eligible 节点与其它的节点通信后发现不存在 Master

 发起 Gossip + Bully 的选举流程:

  1. 寻找 clusterStateVersion 比自己高的 Master-eligible 的节点,向其发送选票
  2. 如果 clusterStatrVersion 一样,则计算自己能找到的 Master-eligible 节点(包括自己)中节点 ID 最小的一个节点,向该节点发送选票
  3. 如果一个节点收到过半的选票,并且它也向自己投票了,那么该节点成为 Master 节点

 成功从 Master-eligible 节点中选举出 Master 节点之后,那么这个 Master 节点将会负责集群(集群信息、集群健康、集群状态、集群配置、所有分片、节点关闭、集群路由)和索引(索引的创建与删除、索引 Template 和 Mapping 的创建删除与更新、索引 Warmer 创建删除与获取、索引的打开与关闭)相关的所有请求

其他

Bully

 Bully 算法假定所有节点都有一个惟一的 ID,并可以依据 ID 对集群内节点进行排序,而拥有最高 ID 的那个节点则会成为 Master 节点

Gossip

 Gossip 本质上就是一个并行的图广度优先遍历搜索算法,可以借助一个动图来直观感受一下:

Gossip

(图片来源:jianshu.com™,已联系作者,侵删谢谢)
单播 vs. 组播 vs. 广播
网络模型优点缺点
单播(unicast)点对点通讯可以保证低响应延迟服务端流量消耗较大
组播(multicast)共享数据流可以减少带宽负载无纠错机制,发生丢包或错包之后,很难弥补
广播(broadcast)网络设备简单可以减少布网和维护的成本无法提供定制化服务,且客户端的网络带宽将成为瓶颈
(图片来源:wikipedia.org™,已确认无版权)

数据写入

流程

 因为 ElasticSearch 内所有节点都默认属于 Coordinating 节点,这意味着每个节点都有能力处理任何一种请求

ElasticSearch Write

(图片来源:ElasticSearch™ 官方文档)

 这里我们以单 Index、两 Shard、三 Replication、三 Node 的 ElasticSearch 集群为例

ElasticSearch Write

(图片来源:ElasticSearch™ 官方文档)
  1. 客户端向 NODE 1 发送新建、索引或者删除请求
  2. 节点使用 Document 的 _id 确定 Document 属于 Shard 0。请求会被转发到 NODE 3,因为分片 0 的主分片目前被分配在 NODE 3
  3. NODE 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 NODE 1NODE 2 的副本分片上。一旦所有的副本分片都成功返回,Node 3 将向协调节点报告成功,随后协调节点再向客户端反馈请求成功

 而内部持久化到磁盘的流程和 HBase 很像,所以还可以借鉴 HBase 的概念进行理解:

HBaseElasticSearch
MemStoreIndexBuffer
HFileSegment
WALTranslog
CompactionMerge

 写入的数据都会先存放在 Buffer 中,以保证写入的效率。之后,每隔一段时间(默认 5s,最小 100ms),ElasticSearch 会批量将 Buffer 中的数据,Sync 到磁盘上的 Segment 中。此时,Segment 处于打开状态,可以被查询检索到。而为了确保数据不会丢失,ElasticSearch 会将每一个事务操作都记录到 Translog 中。每次 Sync 操作只会清空 Buffer 中的数据,并不会清空 Translog 中的事务日志。只有写入一段时间后,进行全量 Sync 操作的时候,才会将 Translog 清空(默认是 Translog 达到 512M 或者 12 小时之后)

ElasticSearch Flush

(图片来源:ElasticSearch™ 官方文档)

数据查询

流程

 和数据写入不同的是,数据查询是可以在主分片或者其任意副本分片上,完成文档检索的。同时,为了保证负载均衡,每次请求的时候,还会轮询地去查询副本分片

ElasticSearch Read

(图片来源:ElasticSearch™ 官方文档)

 这里我们仍然以单 Index、两 Shard、三 Replication、三 Node 的 ElasticSearch 集群为例

  1. 客户端向 NODE 1 发送查询请求
  2. 节点使用 Document 的 _id 来确定 Document 属于 Shard 0。而所有节点上都存在着 Shard 0 的副本分片。这种情况下,它将请求转发到 NODE 2
  3. NODE 2 将文档返回给 NODE 1 ,然后将文档返回给客户端

段合并

流程

 为了避免 Segment 小文件过多,ElasticSearch 会定时进行 Segment 合并操作。因为段合并需要消耗大量的磁盘 IO 和 CPU,所以为了不影响正常的集群运行,ElasticSearch 会进行严格的限流,默认 20MB/s

Before ElasticSearch Segment Merge

After ElasticSearch Segment Merge

(图片来源:ElasticSearch™ 官方文档)

踩过的坑

The number of object passed must be even but was [1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 建议不使用 JSON.toJSONString,而是自己在 POJO 中拼装 Map<String, Object>
public Map<String, Object> toJSON() {
HashMap<String, Object> json = new HashMap<>();
json.put("systemName", systemName);
json.put("ip", ip);
json.put("path", path);
json.put("time", time);
json.put("message", message);
return json;
}

(ElasticsearchSinkFunction<HBaseServerLog>) (element, ctx, indexer) -> {
_log.debug("Message: {} in ES Sink", element);
if (element == null) return;
indexer.add(indexRequest()
.index(HBASE_SERVER_LOG_INDEX_NAME)
.type(HBASE_SERVER_LOG_TYPE_NAME)
.source(element.toJSON()));
}

资料

Doc

Lucene

ElasticSearch

主分类子分类字段类型
核心类型字符串类型text / keyword
整数类型long /integer / short / byte
浮点类型double / float / half_float / scaled_float
逻辑类型boolean
日期类型date
范围类型integer_range / float_range / long_range / double_range / date_range
二进制类型binary
复合类型对象类型object
嵌套类型nested
数组类型不存在数组类型,所有的字段都有存储多个相同类型的字段值
地理类型地理坐标类型geo_point
地理地图geo_shape
特殊类型IP 类型ip
范围类型completion
令牌计数类型token_count
存储索引的 Hash 值类型murmur3
抽取类型percolator

Github

Blog

Book

Tool

欢迎加入我们的技术群,一起交流学习

人工智能 (高级)& (进阶)| BigData | 算法

Benedict Jin wechat
Subscribe to my blog by scanning my public wechat account.