暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ELK排错——logstash pipiline id 引发的血案

耶喝运维 2021-09-18
795

软件版本:
logstash-oss:7.12.1
错误事件描述:
由于logstash 有配置变更,变更后发现另一个管道中的索引(a.conf)字段message被无缘无故的删掉了
错误原因:
经过一些列的排查,定位到了原因在配置文件,新增配置文件如下

b.conf: |
    input {
      kafka {
        bootstrap_servers => "xxx"
        group_id => "xxx"
        topics_pattern => "xxx"
        consumer_threads => 3
        decorate_events => true
        auto_offset_reset => "latest"
        codec => "json"
      }
    }
    
    filter {
      mutate {
        remove_field => ["message""@version""agent""fields""ecs""input,tags"]
      }
    }
    
    output {
      elasticsearch {
        hosts => "xxx"
        ssl => true
        user => "xxx"
        password => "xxx"
        ilm_enabled => false
        index => "xxx"
      }
    }

复制

由于新增了b.conf后,发现a.conf中的message字段被删掉了a.conf配置如下

  a.conf: |
    input {
      kafka {
        bootstrap_servers => "xxx"
        group_id => "xxx"
        topics_pattern => "xxx"
        consumer_threads => 3
        decorate_events => true
        auto_offset_reset => "latest"
        codec => "json"
      }
    }

    filter {
      kv {
        source => "message"
        field_split => ","
        value_split => ": "
      }
      mutate {
        remove_field => ["message""@version""agent""fields""ecs""input"]
      }
    }

    output {
      elasticsearch {
        hosts => "xxx"
        ssl => true
        user => "xxx"
        password => "xxx"
        ilm_enabled => false
        index => "xxx"
      }
    }

复制

经过一些列复杂的排查,定位到原因是因为logstash pipeline使用了一个id即配置为

 pipelines.yml: |
    # This file is where you define your pipelines. You can define multiple.
    # For more information on multiple pipelines, see the documentation:
    #   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
    - pipeline.id: main
      path.config: "/usr/share/logstash/pipeline/"

复制

这么写是有问题的,具体原因通过分析源码可以得到一个结论,logstash在pipeline的流程中使用了一个生产者+消费者模型,生产者为写客户端inputQueueClient(input),消费者为读客户端filterQueueClient(filter+output)

@JRubyMethod
@SuppressWarnings("unchecked")
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
    IRubyObject size) {
    int typedSize = ((RubyNumeric)size).getIntValue();
    this.queue = new ArrayBlockingQueue<>(typedSize);
    return this;
}

@Override
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
    return JrubyMemoryWriteClientExt.create(queue);
}

@Override
protected QueueReadClientBase getReadClient() {
    // batch size and timeout are currently hard-coded to 125 and 50ms as values observed
    // to be reasonable tradeoffs between latency and throughput per PR #8707
    return JrubyMemoryReadClientExt.create(queue, 125, 50);
}

复制

所以如果在同一个pipelineid 中那么你的input,output,filter会被合并到一起 (这个是经过测试后得到的答案,源码没找到,太乱了一会java 一会ruby)
解决办法:
修改pipeline配置文件,这样就完美解决了,后续还需要统一对logstash 的pipeline 做一下规则整理,实现pipeline-to-pipeline等功能

pipelines.yml: |
    # This file is where you define your pipelines. You can define multiple.
    # For more information on multiple pipelines, see the documentation:
    #   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
    - pipeline.id: a
      path.config: "/usr/share/logstash/pipeline/a.conf"
    - pipeline.id: b
      path.config: "/usr/share/logstash/pipeline/b.conf"

复制


文章转载自耶喝运维,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论