Elasticsearch 5.0 introduced a new node type, the Ingest Node, which can replace Logstash for data pre-processing. This document covers common usage of this node type.
1.1 Logstash VS Ingest Node
| | Logstash | Ingest Node |
|---|---|---|
| Input & output | Reads from and writes to many different data sources | Receives data through the ES REST API and writes only to Elasticsearch |
| Buffering | Implements a simple data queue with retry support | No buffering |
| Processing | Large plugin ecosystem, custom plugins supported | Built-in processors, extensible via plugins (a plugin update requires a restart) |
| Deployment & usage | Adds some architectural complexity | No extra deployment required |
2 Pipeline
Pipeline - a pipeline processes the documents that pass through it, applying each step in order
Processor - Elasticsearch's abstraction over a single processing step
Elasticsearch ships with many built-in processors, and you can also implement your own processor as a plugin
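The pipeline model is easy to sketch in a few lines of Python. This is purely illustrative: the helper names (`split_processor`, `set_processor`, `run_pipeline`) are made up and this is not how Elasticsearch implements processors, but it captures the "ordered chain of document transformations" idea.

```python
# A processor is a function that takes a document (dict) and returns it modified.
def split_processor(field, separator):
    def apply(doc):
        doc[field] = doc[field].split(separator)
        return doc
    return apply

def set_processor(field, value):
    def apply(doc):
        doc[field] = value
        return doc
    return apply

def run_pipeline(processors, doc):
    # Processors run strictly in order; each one sees the previous one's output.
    for processor in processors:
        doc = processor(doc)
    return doc

pipeline = [split_processor("tags", ","), set_processor("views", 0)]
doc = {"title": "Introducing big data", "tags": "hadoop,elasticsearch,spark"}
print(run_pipeline(pipeline, doc))
# {'title': 'Introducing big data', 'tags': ['hadoop', 'elasticsearch', 'spark'], 'views': 0}
```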
2.1 Simulating a Pipeline with the _simulate API
Using a pipeline to split a string field
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
#1 The _simulate API runs a pipeline definition without storing it
#5 The processors to simulate, listed in order as an array
#14 The test documents
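The split processor above behaves like an ordinary string split on the separator. A quick Python analogue (not the ES implementation):

```python
# Splitting the tags field on "," turns the string into an array,
# which is what the simulate response shows under _source.tags.
tags = "hadoop,elasticsearch,spark"
print(tags.split(","))  # ['hadoop', 'elasticsearch', 'spark']
```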
Adding a field to documents
#Also add a field to each document: a blog view count
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
#14-16 Add a views field with a default value of 0
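The set processor assigns a fixed value and, by default, overwrites the field if it already exists. A rough Python analogue (`set_field` is a made-up helper name, not an ES API):

```python
def set_field(doc, field, value):
    # Like the set processor with its default override behavior:
    # the value is written whether or not the field already exists.
    doc[field] = value
    return doc

print(set_field({"title": "Introducing cloud computing"}, "views", 0))
print(set_field({"views": 42}, "views", 0))  # an existing value is overwritten
```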
2.2 Pipeline Operations
Creating a pipeline
# Add a pipeline to Elasticsearch
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}
Viewing a pipeline
#View the pipeline
GET _ingest/pipeline/blog_pipeline
Testing a pipeline
#Test the stored pipeline against sample documents
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
Indexing data through a pipeline
#Index a document without the pipeline
PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data"
}
#Index a document through the pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computing",
  "tags": "openstack,k8s",
  "content": "You know, for cloud"
}
Updating already-indexed data
#Check both documents: one was processed by the pipeline, the other was not
POST tech_blogs/_search
{}
#Running update_by_query across all documents causes an error: doc 2 was already processed, so its tags field is now an array, and the split processor fails on non-string input
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}
#Restrict update_by_query to documents that have not been processed yet (no views field)
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}
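Why does the unfiltered update_by_query fail? Once a document has been through the pipeline, its tags field is an array, and feeding an array to the split processor is an error. The same shape of failure can be reproduced in Python (an analogy only; the actual ES error message differs):

```python
already_processed = {"tags": ["openstack", "k8s"]}
try:
    # A list has no split(); likewise the split processor rejects non-string input.
    already_processed["tags"].split(",")
except AttributeError as err:
    print("re-running the pipeline on a processed doc fails:", err)
```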
2.3 Built-in Processors
Split Processor
Splits a field value into an array
Remove / Rename Processor
Removes / renames a field
Append Processor
Appends values to an array field, e.g. adding a new tag to a product
Convert Processor
Converts a field's type, e.g. a price string to float
Date / JSON Processor
Converts date formats / parses a string into a JSON object
Date Index Name Processor
Routes each document into an index whose name is built from a date pattern
Fail Processor
Raises an exception so that a specified error message is returned to the caller
Foreach Processor
Applies the same processor to every element of an array field
Grok Processor
Pattern-based parsing of log lines into structured fields
Gsub / Join / Split Processor
String replacement / array to string / string to array
Lowercase / Uppercase Processor
Case conversion
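Several of these string processors correspond to familiar operations. An illustrative Python analogue (not how ES implements them):

```python
import re

print(re.sub(r"-", "_", "big-data-blog"))  # Gsub: regex replacement -> big_data_blog
print(",".join(["hadoop", "spark"]))       # Join: array to string  -> hadoop,spark
print("openstack,k8s".split(","))          # Split: string to array -> ['openstack', 'k8s']
print("Hadoop".lower(), "K8s".upper())     # Lowercase / Uppercase  -> hadoop K8S
```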
This article is reprinted from lin在路上.