常见术语

  • Document:存储在ES中的一些数据,存储的最小单元。表中的一行数据。是JSON object,由field组成,有数据类型,每一个文档有唯一id,拥有meta data标注文档(_source,_id等)的相关信息。
  • Index:具有相同结构的文档的集合。对应表。拥有自己的mapping定义,定义字段名和类型。一个集群可以有多个索引,例如nginx日志可以按照每天生成一个索引来存储。
  • Node:一个es的运行实例,是构成集群的基本单元
  • Cluster:由一个或者多个节点组成,对外提供服务

倒排索引和分词

  • 正排索引:文档id到文档内容、单词的关联关系
  • 倒排索引:单词到文档id的关联关系

查询流程,通过关键字从倒排索引中找到文档id,然后通过文档id使用正排索引返回所有符合条件的文档。

倒排索引是搜索引擎的核心,主要包含2部分:

  • 单词词典(Term Dictionary):记录文档所有单词和单词到倒排列表的关联信息,一般比较大。实现方式一般是B+树
  • 倒排列表(Posting List):包含文档id,单词词频(Term Frequency,用于后续相关性算分),位置(position,记录单词在文档中分词的位置,可以是多个,用于做词语搜索Phrase Query),偏移(Offset,记录单词在文档开始和结束的位置,用于做高亮显示)

使用B+树实现单词字典

B+树好处:插入和查询性能高,更高效利用磁盘和内存的映射机制。

单词词典和倒排索引

例如:search 搜索引擎的时候通过term dictionary快速在b+树中定位到搜索引擎,拿到在postion list中的偏移量,从而得到倒排索引项的docid 1和3

es中分词称为文本分析analysis.自带分词器有standard/simple,whitespace,stop,keyword,pattern,language.

1
2
3
4
5
POST _analyze
{
"analyzer": "standard",
"text": "hello world!"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"tokens": [
{
"token": "hello",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "world",
"start_offset": 6,
"end_offset": 11,
"type": "<ALPHANUM>",
"position": 1
}
]
}

自定义分词器,其实是自定义分词器的3个组成部分:Character Filters,Tokenizer和Token Filter实现。

1
2
3
4
5
6
POST _analyze
{
"analyzer": "standard",
"filter": ["lowercase"],
"text": "Hello World!"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"tokens": [
{
"token": "hello",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "world",
"start_offset": 6,
"end_offset": 11,
"type": "<ALPHANUM>",
"position": 1
}
]
}

中文分词的难点

  • 英文可以使用空格作为自然分界符,但是中文没有一个形式上的分界符。
  • 上下文不同,分词结果迥异,例如交叉歧义问题,例如下面的两种分词结果其实都是合理的:
  • 乒乓球拍/卖/完了
  • 乒乓球/拍卖/完了

常用的中文分词系统有IK和jieba。更高阶的则是基于NLP的分词系统,例如Hanlp,THULAC.

Mapping设置

类似DB中表结构的定义,主要作用:

  • 定义index下的字段名(field name)
  • 定义字段类型,例如Number,Text,Boolean
  • 定义倒排索引相关的配置,例如是否索引,记录position等

自定义mapping和dynamic的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
PUT test_index
{
"mappings": {
"doc": {
"dynamic":false,
"properties": {
"title":{
"type": "text"
},
"name":{
"type": "text"
},
"age":{
"type": "integer"
}
}
}
}
}

GET test_index/_mapping

{
"test_index": {
"mappings": {
"doc": {
"dynamic": "false",
"properties": {
"age": {
"type": "integer"
},
"name": {
"type": "text"
},
"title": {
"type": "text"
}
}
}
}
}
}

PUT test_index/doc/1
{
"title":"hello world",
"desc":"nothing exist."
}

# 查询title可以返回结果
GET test_index/doc/_search
{
"query": {
"match": {
"title": "hello"
}
}
}

{
"took": 84,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 0.2876821,
"hits": [
{
"_index": "test_index",
"_type": "doc",
"_id": "1",
"_score": 0.2876821,
"_source": {
"title": "hello world",
"desc": "nothing exist."
}
}
]
}
}

# 查询desc得不到结果
GET test_index/doc/_search
{
"query": {
"match": {
"desc": "no"
}
}
}

copy_to,将该字段的值复制到目标字段,实现类似_all的作用,一般不会出现在_source中,只用来搜索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
PUT my_index
{
"mappings": {
"doc":{
"properties": {
"first_name":{
"type": "text",
"copy_to": "full_name"
},
"last_name":{
"type": "text",
"copy_to": "full_name"
},
"full_name":{
"type": "text"
}
}
}
}
}

PUT my_index/doc/1
{
"first_name":"张",
"last_name":"三丰"
}

GET my_index/_search
{
"query": {
"match": {
"full_name": {
"query": "张三丰",
"operator": "and"
}
}
}
}
{
"took": 117,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 0.8630463,
"hits": [
{
"_index": "my_index",
"_type": "doc",
"_id": "1",
"_score": 0.8630463,
"_source": {
"first_name": "张",
"last_name": "三丰"
}
}
]
}
}

index字段控制当前字段是否索引,默认为true,如果设置为false则不可搜索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
PUT my_index
{
"mappings": {
"doc":{
"properties": {
"cookie":{
"type": "text",
"index": false
}
}
}
}
}

PUT my_index/doc/1
{
"cookie":"name=root"
}

GET my_index/_search
{
"query": {
"match": {
"cookie": "name"
}
}
}
# 查询将会报错 Cannot search on field [cookie] since it is not indexed.

索引设置为false的情形主要用于一些敏感信息,例如身份证号、手机号,index_options用于控制倒排索引记录的内容。

dynamic templates:es默认会为字符串设置text类型,并增加一个keyword子字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
PUT test_index
{
"mappings": {
"doc": {
"dynamic_templates": [
{
"strings_as_keywords":{
"match_mapping_type":"string",
"mapping":{
"type":"keyword"
}
}
}
]
}
}
}

PUT test_index/doc/1
{
"name":"hello"
}

GET test_index/_mapping
{
"test_index": {
"mappings": {
"doc": {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}
}

字段名匹配:以message开头的字段都设置为text类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
注意:2个dynamic_templates匹配顺序从上到下,只要匹配一个就结束了
PUT test_index
{
"mappings": {
"doc": {
"dynamic_templates": [
{
"message_as_text":{
"match_mapping_type":"string",
"match":"message*",
"mapping":{
"type":"text"
}
}
},
{
"strings_as_keywords":{
"match_mapping_type":"string",
"mapping":{
"type":"keyword"
}
}
}
]
}
}
}

PUT test_index/doc/1
{
"name":"hello world",
"message":"today is suny"
}

GET test_index/_mapping
{
"test_index": {
"mappings": {
"doc": {
"dynamic_templates": [
{
"message_as_text": {
"match": "message*",
"match_mapping_type": "string",
"mapping": {
"type": "text"
}
}
},
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"properties": {
"message": {
"type": "text"
},
"name": {
"type": "keyword"
}
}
}
}
}
}

自定义mapping的操作步骤如下:

  1. 写入一条文档到es的临时索引中,获取es自动生成的mapping
  2. 修改步骤1得到的mapping,自定义相关设置
  3. 使用步骤2的mapping创建实际所需索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DELETE test_index
PUT test_index/doc/1
{
"referrer":"-",
"response_code":200,
"remote_ip":"189,90,89,00",
"method":"POST",
"username":"-",
"http_version":"1.1",
"body_sent":{
"bytes":0
},
"url":"/stat"
}

GET test_index/_mapping
拷贝上面的结果然后根据自己的需要进行适当修改即可

可以使用dynamic_templates将所有字段设置为keyword不分词,只将自己需要的字段按照合适的类型进行单独定义。灵活使用这个技术可以减少设置mapping的工作量。

index template,用于新建索引的时候自动应用预定的设置,简化创建索引的步骤。

Search API

endpoint为_search,调用如下:

1
2
3
4
GET _search
GET my_index/_search
GET my_index,test_index/_search
GET my_*/_search

查询有两种形式URI(仅包含部分语法)和request body(基于完整的query dsl)

1
2
3
4
5
6
7
8
9
10
GET /my_index/_search?q=username:tom

GET /my_index/_search
{
"query":{
"term":{
"username":"tom"
}
}
}
1
2
3
4
GET my_index/_seach?q=tom&df=username&sort=age:asc&from=4&size=10&timeout=1s
{
"profile":true
}

es提供了查询profile可以进行查询调优。

注意:term查询,phrase查询和泛查询

1
2
3
4
5
6
7
8
9
10
GET my_index/_search?q=username:tom black # tom为查询username,而black会查询所有字段
GET my_index/_search?q=username:"tom black" # 查询username完全匹配tom black
GET my_index/_search?q=username:(tom black) # group query,查询username=tom或者username=black

# + 和 -分别表示must和must not
GET my_index/_search?q=username:(tom NOT black) # username 包含tom不包含black
GET my_index/_search?q=username:(tom +black) # username 一定有black,包含tom的文档,注意urisearch的时候加号要替换成%2B

# 范围查询
GET my_index/_search?q=username:tom and age:>20 # username包含tom并且age>20

通配符查询执行效率低,并且吃内存,不建议使用,千万不要放在最前面可能导致OOM。

模糊匹配(fuzzy):roam~1,foam、roams都会匹配
近似度查询(proximity):"fox quick"~5,以term为单位进行差异比较,quick fox,quick brown fox都会匹配
一般用于用户输入的纠错。

Query DSL

Match Query

会进行分词处理。

match query的流程

通过operator参数可以控制match中单词的匹配关系,可选值and,or(默认)
minimum_should_match参数可以控制需要匹配的单词数

1
2
3
4
5
6
7
8
9
10
11
12
GET test_index/_search
{
"explain":true,
"query":{
"match":{
"username":{
"query":"jack tom",
"operator":"and"
}
}
}
}

5.x之后默认的相关性算分基于BM25,是针对TF-IDF模型的一个优化。BM(Best Match),迭代25次。一个大的优化是降低了TF在过大的时候的权重,我们可以使用explain参数查看详细的算分过程。

match_phase则将query看做一个完整的词语,强调顺序,可以使用slop实现和proximity相似的效果。
query_string类似于search uri中q参数的查询。

Term & Terms Query

不对查询语句进行分词处理。

注意

1
2
3
4
5
6
7
8
GET test_index/_search
{
"query":{
"term":{
"username":"jack tom"
}
}
}

以上查询将不会返回结果,因为构建倒排索引的时候会分词,倒排索引中不存在”jack tom”这个词语。

Range Query

针对数值和日期,gt,gte,lt,lte。

1
2
3
4
5
6
7
8
9
10
11
GET test_index/_search
{
"query":{
"range":{
"age":{
"gte":10,
"lt":20
}
}
}
}

针对日期提供了一种更友好的方式Date Math,可以实现类似“最近一个小时”的查询

now - 1d

now为基准日期,可以是具体的时间(使用双竖线隔离),例如2018-01-01
-1d为计算公式 +1h表示加一个小时 /d表示将时间舍入到天

Bool Query

Filter查询只过滤符合条件的文档,不计算相关性得分,es针对filter有智能缓存,因此执行效率很高。做简单匹配且不考虑分数的时候,推荐使用filter代替query。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET test_index/_search
{
"query":{
"bool":{
"filter":{
{
"term":{
"username":"admin"
}
}
}
}
}
}

分布式特性

cerebro是一个便捷的针对es集群的web管理工具。

1
2
3
./bin/elasticsearch -Ecluster.name=my_cluster -Enode.name=node1 -Epath.data=my_cluster_node1 -Ehttp.port=5200 -d
./bin/elasticsearch -Ecluster.name=my_cluster -Enode.name=node2 -Epath.data=my_cluster_node2 -Ehttp.port=5300 -d
./bin/elasticsearch -Ecluster.name=my_cluster -Enode.name=node3 -Epath.data=my_cluster_node3 -Ehttp.port=5400 -d
  • 可以修改cluster state的节点称为master节点,一个集群只能有一个,由集群中所有节点选举产生,可以被选举的节点称为master-eligible节点,相关配置node.master:true
  • 处理请求的节点称为coordinating节点,该节点为所有节点的默认您角色,不能取消。
  • 路由请求到正确的节点处理,例如创建索引的请求到master节点。
  • 存储数据的节点称为data节点,默认节点都是data类型,相关配置node.data:true

副本和分片

引入分片(Shard)机制可以将数据均匀分布到所有节点上,从而充分利用集群的存储资源,是ES能够存储PB级别数据的基石。分片数在索引创建的时候指定,后续不可修改,默认为5。分片分为主分片和副本分片,实现数据高可用。副本分片由主分片负责同步,可以有多个,从而提高读取的吞吐量。

ES分片和副本

注意:如果磁盘空间不足的时候es不会分片,参考集群设置cluster.routing.allocation.disk.threshold_enabled

分片

上面图中实线是主分片,虚线是副本分片。分片数的设定非常重要,需要提前规划好。过小会导致后续无法通过增加节点实现水平扩容;过大会导致一个节点上分布过多分片,造成资源浪费,同时影响查询性能。

Cluster Health & 故障转移

green:所有主副分片都正常分配
yellow:所有主分片正常分配,但是又副本分片未正常分配
red:有主分片未分配

GET _cluster/health

注意:即使集群处于RED状态,并不意味着集群不能提供服务。

1
2
# 找到node1所在pid并kill
ps aux | grep -i elasticsearc | grep node1

node1挂掉之后集群变成了yellow
经过一段时间集群重新生成副本变成了green

文档分布式存储

文档分片存储

document 1是如何存储在分片P1上,这取决于文档到分片的映射算法,目的:文档均匀分布在所有分片上,充分利用资源。

随机选择或者round-robin算法是不可取的,因为文档存储进去之后还需要读取,需要维护文档到分片的映射关系,成本巨大。es采取的做法是根据文档实时算出其所在的分片。

1
shard = hash(routing) % number_of_primary_shards

hash算法可以保证数据分布,routing默认为文档id。该算法和主分片数相关,因此这也是分片数一旦确定之后就不可更改的原因。

文档创建流程
文档读取流程

脑裂问题

脑裂问题
加入选举法定人数quorum避免脑裂问题

小技巧 :生产环境中设置master-eligible为3,quorum为2即可。

ES运行机制

Query Then Fetch

相关性算分

相关性算分在shard和shard之间是独立的(一个shard是一个Lucene Index,是一个完整的相关性算分的单位。),也就意味着同一Term的IDF等值在不同shard上是不同的。文档的相关性算分和它所处的shard相关
当文档数量不够多的时候会导致相关性算分严重不准的情况发生。

解决思路有2个:

  • 分片数设置为1。可以从根本上解决问题,在文档数量不多的时候可以考虑(百万、千万)
  • DFS Query then Fetch。拿到所有文档后重新进行一次相关性算分,耗费更多CPU和内存,性能低,不建议使用。

分页和遍历

from/size是最常用的分页解决方案。

深度分页

深度分页是所有的分布式搜索引擎和分布式系统都会遇到的问题,例如使用google搜索,调整url中的start

1
对不起,Google 为所有查询的结果数都不会超过 1000 个。 (您所请求的结果在第 28000000 个之后。)

scroll提供了遍历文档集的API,以快照的方式来避免深度分页的问题。

  • 不能用来做实时搜索,因为数据不是实时的
  • 尽量不要使用复杂的sort条件,使用_doc最高效
  • 使用稍微复杂
  • 过多的scroll调用会占用大量内存,可以使用clear api删除过多快照

search_after避免了深度分页的性能问题,提供了实时的下一页文档的获取功能。

  • 不能使用from参数,即:不能指定页数
  • 只能下一页,不能上一页
  • 使用简单

应用场景

  • from/size:需要实时获取顶部分不分文档,且需要自由翻页
  • scroll:需要全部文档,例如导出所有数据
  • search_after:需要全部文档,不需要自由翻页

聚合分析

ES聚合分析示例

聚合分析分为以下4类:

  • Bucket:类似SQL中的GROUP BY
  • Metric:计算最大值、最小值、平均值等
  • Pipeline:基于上一级的聚合分析结果再次进行分析
  • Matrix:矩阵分析类型

所有的Kibana图表都是基于ES的Aggression实现的。

Bucket聚合

按照一定的规则将分档分配到不同的桶中,达到分类分析的目的。

  • terms:直接按照terms来分桶,如果是text类型,则按照分词后的结果进行分桶
  • ranges:通过指定数值范围来设定分桶规则
  • historgram:直方图。以固定间隔的策略来分割数据

Metric聚合

  • Cardinality意思是集合的势或者基数,是指不同集合的个数,类似SQL中的distinct count的概念。
  • 使用stats可以一次性返回min,max,count,avg,sum。
  • extends_stats是对stats的拓展,包含了方差、标准差等。
  • percentile可以实现百分位数的统计。可以查看数据分布,例如:95%的请求在200ms内返回这样的需求。
  • top hits一般用于分桶后获取该桶内最匹配的顶部文档列表,即详情数据。

Bucket和Metric聚合分析

Bucket聚合分析允许通过添加子分析来进行进一步分析,该子分析可以是Bucket也可以是Metric。这也使得es的聚合分析能力变得异常强大。

作用范围

es默认作用范围是query的结果集,可以通过以下的方式改变其作用范围:

  • filter
  • post_filter
  • global

排序以及精准度

ES Terms排序可能是不准的

不准确的原因在于数据分散在多个Shard上,Coordinating Node无法得悉数据全貌。解决方案:

  1. 设置Shard数为1,可以消除数据分散的问题,但是无法承载大数据量
  2. 合理设置Shard_Size的大小,即每次从Shard上额外多获取数据,以提升准确度

数据规模&准确度&实时性的均衡

ES的聚合分析中Cardinality和Percentile分析使用的是近似统计算法。

数据建模

一个博客文章的索引可以设置如下:

// bash
{
“mappings”:{
“doc”:{
“source”:{
“enabled”:false
},
“properties”:{
“title”:{
“type”:”text”,
“fields”:{
“keyword”:{
“type”:”keyword”
}
},
“store”:true
},
“publish_date”:{
“type”:”date”,
“store”:true
},
“author”:{
“type”:”keyword”,
“store”:true
},
“abstract”:{
“type”:”text”,
“store”:true
},
“url”:{
“doc_values”:false,
“norms”:false,
“ignore_above”:100
“store”:true
}
}
}
}
}

由于文章内容可能非常大,所以关掉_source避免取过大的原始文档,为每个字段加了store属性专门存储字段原始值

  • 文章标题是需要进行全文检索和分词的,所以设置为text,keyword子字段可以对博客的标题进行完全匹配
  • url只需要做展示并不需要搜索

关联关系处理

ES不擅长处理RDBMS中的外键,可以通过Nested Object或者Parent/Child(Join数据类型)变相解决。其中使用Nested Object适用于查询频繁的场景,而Parent/Child父子文档独立更新,为了维护join关系需要占用部分内存并且读取性能差。

Reindex

重建所有数据的过程,一般发生在如下的情况:

  • mapping设置变更,例如字段类型变化、分词器字典更新等
  • index设置变更,例如分片数目更改
  • 迁移数据

es提供了_update_by_query(在现有索引上重建)和_reindex(将source中的数据重建到dest中)。数据重建的时间受源索引文档规模的影响,当规模越大的时候所需要的时间越多,此时需要设定url参数wait_for_completion为false启动异步任务来执行,可以通过_task来查看任务的执行进度和相关数据

集群调优

JVM设定

JVM内存的设定不要超过31GB,预留一般内存给OS,用来做文件缓存。具体大小根据node存储的数据量来估算,为了保证性能,在内存和数据量之间有一个建议的比例:

JVM内存的设定

写性能优化

es写数据存在3个过程:

  1. refresh:segment写入磁盘的过程非常耗时,可以借助fs缓存的特性先将segment在缓存中创建并开放查询来进一步提升实时性。在refresh之前文档会先存储在一个buffer中,refresh时将buffer中的所有文档清空并猩猩segment。es默认每秒执行一次refresh,这也是es被称为Near Real Time的原因。
  2. translog:用来避免segment还没有写入磁盘的时候发生了宕机。写入文档到buffer的同时将该操作写入translog。translog会即时写入磁盘(fsync)。es启动会检查translog并从中恢复数据。
  3. flush:将内存中的segment落盘,主要做如下工作:将translog写入磁盘;将index buffer清空,其中的文档生成一个新的segment,相当于一个refresh操作;更新commit point并写入磁盘;执行fsync操作将内存中的segment落盘;删除旧的translog文件。

写性能优化的目标是增大EPS(Events Per Second),优化方案:

  • 客户端:多线程写,批量写
  • ES:在高质量数据建模的前提下,主要是在refresh,translog和flush之间做文章

调优参数:refresh_interval,indices.memory.index_buffer_sizeindex.translog.durability,index.translog.sync_interval,index.translog.flush_threshold_size

读性能优化

  • 尽量使用filter上下文,减少算分的场景,由于filter有缓存机制,可以极大提高查询性能
  • 尽量不使用script进行字段计算或者算分排序
  • 结合profile、explain api分析慢查询语句,然后优化数据模型

如何设定Shard数

es的性能基本是线性拓展的,例如单个Shard的eps是1W,而线上eps要求是5W,则需要5个Shard(实际还需要考虑副本)。测试一个shard的流程如下:

  1. 搭建和生产环境相同配置的单节点集群
  2. 设定一个单分片零副本的索引
  3. 写入实际生产数据进行测试,获得写性能指标
  4. 针对数据进行查询请求,获取读性能指标

压测工具可以使用esrally,x-pack插件可以对es进行监控

LogStash

入门 & 运行机制

LogStash流程
LogStash架构

可以使用-r命令行参数启动LogStash,可以热重载配置,便于调试。

线程分析

LogStash使用不同的线程处理输入、过滤和输出:

LogStash线程

可以先使用jps -ml命令获得logstash的pid,再使用top -H -p pid查看线程信息。

Queue

LogStash的Queue分为2种:

  • In Memory:无法处理宕机
  • Persistent Queue In Disk:保证不丢失数据;保证数据至少消费一次;充当缓冲区,可以替代Kafka

相比内存中的队列,PQ的EPS下降不是特别严重(5%以内),相关配置

1
2
queue.type:persisted
queue.max_bytes:4gb

线程

  • 每个Input Thread对应一个线程(Input->Codec)
  • 每个pipeline worker Thread对应一个线程(Batcher->Filter->Output)

相关配置有pipeline.workers,pipeline.batch.size,pipeline.batch.delay

线程查看工具可以使用visual vm。

插件

Input

file插件用于从文件读取数据。

  • 通过sincedb解决重启LS时从上次读取的位置开始读取
  • 定时检查文件是否更新读取文件的新内容
  • 定时检查新文件来发现新文件
  • 如果文件发生了rotation操作,则被rotation的文件可以继续被读取(基于inode,和文件名没有关系)
  • 基于Filewatch的ruby库实现

调试小技巧:sincedb_path设置为/dev/null,start_position何止为beginning可以每次从头读取文件。

codec主要负责将数据在原始与Logstash Event之间转换。 multiline可以匹配多行,藏剑的用途是提取错误堆栈信息。

Filter

对Logstash Event进行转换解析,常用:date,grok常用正则,dissect基于分隔符解析数据,解决grok消耗过多CPU的问题。

dissect常用配置:

1
2
3
4
5
6
7
8
9
# / 后面的指定匹配的顺序
原始日志:two three one go
配置: %{+order/2} %{+order/3} %{+order/1} %{+order/4}
结果:{"order":"one two three go"}

# 动态的键值对(例如query参数)
原始日志:a=1&b=2
配置:%{?key1}=%{&key1}&%{?key2}=%{&key2}
结果:{"a":"1","b":"2"}

mutate是使用最频繁的插件,可以对字段进行各种操作:重命名、删除、替换、更新等。常用操作:convert类型转换,gsub字符串替换,split/join/merge,字符串切割/数组合并,rename字段重命名,update/replace字段内容更新或者替换,remove_field删除字段。

调试配置建议

使用http做input方便输入测试数据,并且可以结合reload特性(stdin无法reload)

1
2
3
input {http{port => 7474}}
filter {}
output {stdout{codec => rubydebug}}

@metadata特殊字段其内部不会输出到output中,适合用来存储做条件判断、临时存储的字段,相比remove_field有一定的性能提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
input {http{port => 7474}}
filter {
# 这里相当于一个调试开关
mutate { add_field => {"[@metadata][debug]" => true} }
mutate { add_field => {"show" => "this data will in output"} }
}
output {
if [@metadata][debug] {
stdout {codec => rubydebug}
} else {
stdout {codec => dots}
}
}

实战apache日志

debug.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
input {http{port => 7474}}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
mutate {
remove_field => ["headers","message","timestamp"]
}
ruby {
code => "event.set('@read_timestamp',event.get('@timestamp'))"
}
geoip {
source => "clientip"
fields => ["location","country_name","city_name","region_name"]
}
useragent {
source => "agent"
target => "useragent"
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {stdout{codec => rubydebug}}
1
./bin/logstash -f debug.conf -r # 使用-r参数可以热重载配置

使用POSTMAN发送HTTP请求,注意将请求内容放在body中:

1
218.19.140.242 - - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 200 1933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"

实战CSV文件

csv.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
input {
file {
path => "/tmp/player_data.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
autodetect_column_names => true
autogenerate_column_names => false
convert => {
"uid" => "integer"
"play_count" => "integer"
"hundred_win_rate" => "float"
"landlord_count" => "integer"
"farmer_count" => "integer"
}
}
mutate {
add_field => {"role_count" => "%{landlord_count},%{farmer_count}"}
remove_field => ["landlord_count","farmer_count","message"]
}
date {
match => ["last_play_time","yyyy-MM-dd HH:mm:ss"]
}
}
output {
elasticsearch {
index => "player_data"
}
stdout {codec => rubydebug}
}

player_data.csv:

1
2
3
4
uid,play_count,max_rate,win_count,win_max,win_max_time,"last_play_time","hundred_results",hundred_win_rate,max_keep_win_count,win_max_rate,win_total,lose_total,spring_count,bomb_count,win_point,tnmt_count,tnmt_win_count,tnmt_best_rank,landlord_count,farmer_count,play_time,diamond_play_count
9682421,504,73728,247,460800,1534842968,"2018-10-24 14:33:48","1111111000001110010010111011111001011101001100001110100000010010110111011101111110000010000100101001",0.52,3,24576,3855191,5083567,5,348,0,0,0,99999,219,285,29220,0
9682423,15,6912,5,2832,1531899522,"2018-07-18 15:53:12","110001101000000",0.33,2,3072,9971,18879,NULL,26,0,0,0,99999,0,0,0,0
9682424,1447,6144,795,4822,1536214982,"2018-12-14 11:11:14","0001111010100001100011110000001110100101100001010111111010011010000101111100101100000010110011001101",0.48,9,4608,10230,393207,NULL,15,0,32,20,1,3,11,132,0

Beats

Filebeat

读取日志文件,但是不做数据处理,保证”At least once”至少被读取一次,数据不会丢(但某些情况下数据可能被重复消费)。可以处理多行数据、解析JSON、简单的过滤功能。

FileBeats架构

1
2
3
# -e输出到stderr,默认输出到syslog和logs/filebeat
# 由于6.x之后只能指定一个output,所以加上publish参数指定相关的debug日志
filebeat -e -c filebeat.yml -d "publish"

Elasticsearch Ingest Node

5.x新增的一个节点类型,可以在数据写入ES之前(bulk/index操作)对数据进行处理,可以配置独立的ingest node,(node.ingest:true)专门进行数据处理。api endpoint为pipeline.类似于logstash的filter。

Filebeats Module

filebeats提供了许多开箱即用的module用于数据采集->ES建模->数据处理->存储并分析数据的一整套解决方案。

1
2
filebeat modules list
filebeat modules enable nginx

参见安装目录下的modules目录和fields.yml。按照这个规则可以制定属于我们自己的module。

Metrics Beats

定期收集OS,软件或者服务的指标数据存储在ES中进行实时分析。

  • Logs:记录离散的事件,具有随机性。例如程序的调试信息或者错误信息
  • Metrics:记录度量可以聚合的数据,具有计划性。例如服务的响应时间
1
2
./metricbeat -e -d "publish"
./metricbeat setup --dashboards # 导入dashboard

metricsbeats同filebeats也会向es中导入index template。

1
2
GET _template/metricbeat*
GET metricbeat-6.1.1-2019.01.05/_search

也可以在kibana的discover中替代上面的命令行进行操作,dashboard面板用于展示指标的监控数据。

Packet Beats

抓取并解析网络包数据。packetbeats抓包配置有2种:

  • pcap:基于libcap,跨平台
  • af_packet:仅支持Linux,基于内存映射的嗅探技术,性能更好
1
2
3
4
5
./packetbeat setup --dashboards
sudo ./packetbeat -e -c packetbeat.yml -d "publish"

GET _template/packetbeat*
GET packetbeat-6.0.1-2019.01.10/_search

heartbeats用于检测主机是否存活。此外还有社区提供的beats可以满足需各种求。

Kibana

线上部署推荐专门部署一个Coordinating Only ES Node和Kibana在同一台机器上。

测试数据导入

在Kibana的management界面中创建一个名为logstash-*的index pattern。

Kibana可视化
Buckets-SplitSeries使用Terms aggression
Buckets-SplitCharts使用不同的图表展示不同的数据

注意Buckets中聚合的顺序不同将影响展示的结果。

时间序列图

项目实战

房屋搜索项目

数据源airbnb.csv

创建数据模型

airbnb.settings.json

导入数据

logstash配置文件ls.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
input{
stdin{}
}

filter{
csv{
columns => ["accommodates","bathrooms","bed_type","bedrooms","beds","date_from","date_o","date_rom","date_to","has_availability","host_image","host_name","image","listing_url","location","name","price","property_type","room_type"]
}

mutate{
remove_field=>["message"]
lowercase=>["has_availability"]
}
}

output{
elasticsearch{
index => "testairbnb"
}

stdout{
codec=>rubydebug

}
}

导入:

cat airbnb.csv | bin/logstash -f ls.conf

配置Kibana

  1. Management->Index Patterns->Create Index Pattern创建一个testairbnb的index pattern,Time Filter field name选择I don't want to use the Time Filter
  2. 修改host_image,image和listing_url的format为Url,并且将host_image,image的Type配置为Image,Url Template配置为{{rawValue}},配置成{{value}}会被转义。
  3. 接下来就可以在Discover中发现testairbnb了,由于我们没有选择time_filter,所以右上角不会出现时间选择。步骤2中修改的URL和图片也会在这个面板被正确展示出来。

房屋搜索

一个更加友好的房屋搜索项目

注意ES跨域访问

nginx日志分析

资料下载:

nginx_template.json
access.1w.log
nginx_log.conf

1
bin/logstash -f nginx_log.conf

导入数据后可以使用GET _cat/indices/nginx_logs_*查看创建的索引,在kinana中配置nginx_logs*index pattern。

nginx日志分析结果

北京空气质量分析

air_quality_index.json
air_quality_filebeat.yml

空气质量的CSV数据可以到美大使馆网站下载

导入数据:

1
cat air_quality/*.csv | ./filebeat -e -c filebeat-air_quality.yml -d "publish"

使用Python脚本将小时数据聚合成天数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch(['127.0.0.1:9200'])

search_query = {
"query": {
"range": {
"value": {
"gte": 1
}
}
},
"aggs": {
"days": {
"date_histogram": {
"field": "@timestamp",
"interval": "day",
"time_zone": "+08:00"
},
"aggs": {
"pm25": {
"stats": {
"field": "value"
}
}
}
}
},
"size": 0
}
res = es.search(index='air_quality', body=search_query)

index_name = 'air_quality_days'
index_type = 'doc'
es.indices.delete(index=index_name, ignore=[400, 404])

for info in res['aggregations']['days']['buckets']:
cur_date = datetime.strptime(info['key_as_string'], '%Y-%m-%dT%H:%M:%S.%f+08:00')
new_doc = {
"@timestamp": info['key_as_string'],
'year': cur_date.year,
'month': cur_date.month,
'day': cur_date.day,
"value_max": info['pm25']['max'],
"value_avg": info['pm25']['avg'],
"value_min": info['pm25']['min'],
}
es.index(index=index_name, doc_type=index_type, id=new_doc['@timestamp'], body=new_doc)
print(new_doc)

对于在es中没有的字段,可以在kibana的index pattern中的script fields中添加

2016年的北京雾霾
北京空气质量趋势

常用工具