暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

17_Elasticsearch操作_数据预处理

lin在路上 2020-07-02
250

Elasticsearch 5.0 后,引入的一种新的节点类型(Ingest Node),可用于取代Logstash实现数据预处理。本文档介绍该节点常见用法

1.1 Logstash VS Ingest Node


LogstashIngest Node
数据输入与输出支持从不同的数据源读取,并写入不同的数据源支持从ES REST API获取数据,并且写入Elasticsearch
数据缓冲实现了简单的数据队列,支持重写不支持缓冲
数据处理支持大量的插件,也支持定制开发内置的插件,可用开发Plugin进行扩展(Plugin更新需要重启)
配置和使用增加了一定的架构复杂度无需额外部署

2 Pipeline

  • Pipeline - 管道会对通过的数据(文档),按照顺序进行加工

  • Processor - Elasticsearch 对一些加工的行为进行了抽象包装

    • Elasticsearch有很多内置的Processors。也支持通过插件的方式,实现自己的Processor

2.1 调用simulate API模拟Pipeline

调用Pipeline切分字符串

 POST _ingest/pipeline/_simulate
 {
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You konw, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
 }
 #1 simulate API,模拟Pipeline
 #5 在数组中模拟Processors
 #14 测试文档

为文档添加字段

 #同时为文档,增加一个字段。blog查看量
 POST _ingest/pipeline/_simulate
 {
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
  },
 
  "docs": [
    {
      "_index":"index",
      "_id":"id",
      "_source":{
        "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
      }
    },
    {
      "_index":"index",
      "_id":"idxx",
      "_source":{
        "title":"Introducing cloud computering",
  "tags":"openstack,k8s",
  "content":"You konw, for cloud"
      }
    }
    ]
 }
 #14-16 添加字段views,默认值0

2.2 Pipeline操作

新增pipeline

 # 为ES添加一个 Pipeline
 PUT _ingest/pipeline/blog_pipeline
 {
  "description": "a blog pipeline",
  "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
 
      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
 }

查看pipeline

 #查看Pipleline
 GET _ingest/pipeline/blog_pipeline

测试pipeline

 #测试pipeline
 POST _ingest/pipeline/blog_pipeline/_simulate
 {
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
 }

调用pipleline更新数据

 #不使用pipeline更新数据
 PUT tech_blogs/_doc/1
 {
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
 }
 
 #使用pipeline更新数据
 PUT tech_blogs/_doc/2?pipeline=blog_pipeline
 {
  "title": "Introducing cloud computering",
  "tags": "openstack,k8s",
  "content": "You konw, for cloud"
 }

对已上传数据进行更新

 #查看两条数据,一条被处理,一条未被处理
 POST tech_blogs/_search
 {}
 
 #update_by_query 会导致错误
 POST tech_blogs/_update_by_query?pipeline=blog_pipeline
 {
 }
 
 #增加update_by_query的条件
 POST tech_blogs/_update_by_query?pipeline=blog_pipeline
 {
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "views"
                }
            }
        }
    }
 }

2.3 内置的Processors

  • Split Processor

    • 将给定字段值分为一个数组

  • Remove / Rename Processor

    • 移除/重命名一个字段

  • Append

    • 为商品增加一个新的标签

  • Convert

    • 字符串转换成float,适用于商品价格等

  • Date/JSON

    • 日期格式转换,字符串转JSON对象

  • Date Index Name Processor

    • 将通过该处理器的文档,分配到指定时间格式的索引中

  • Fail Processor

    • 一旦出现异常,该Pipeline指定的错误信息能返回给该用户

  • Foreach Process

    • 数组字段,数组的每个元素都会适用到一个相同的处理器

  • Grok Processor

    • 日志的日期格式切割

  • Gsub/Join/Split

    • 字符串替换/数组转字符串/字符串转数组

  • Lowercase/Upcase

    • 大小写转换


文章转载自lin在路上,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论