暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

快速上手ELK日志监控系统

程序员恰恰 2024-05-26
29
  • 一、ELK介绍

  • 二、安装ELK

  • 三、启动ELK

  • 四、测试


一、ELK介绍

ELK(Elasticsearch、Logstash、Kibana)它是一套用于处理大规模日志和实时数据分析的开源解决方案,由以下三个主要组件组成
Elasticsearch:它是一个高度可扩展的分布式搜索和分析引擎。Elasticsearch提供了快速的全文搜索、复杂查询、实时分析和数据可视化的功能。它还具有水平扩展性,可以处理大规模的数据集和高并发查询。
Logstash它是一个用于数据收集、转换和传输的开源工具。它可以从多个来源(如日志文件、消息队列、数据库等)收集数据,并将其转换为统一的格式。Logstash还支持对数据进行过滤、增强和转换,以便更好地进行存储和分析。
 Kibana:它是一个用于数据可视化和交互的开源工具。它提供了丰富的图表、图形和地图等可视化组件,用于展示和分析Elasticsearch中的数据。Kibana还提供了一个用户友好的界面,使用户能够轻松地创建和共享仪表板,并进行数据查询和导出。

二、安装ELK
本文使用dokcer快速安装ELK,不会docker的可以去B站快速上手一下。我的安装位置是/opt/elk,目录结构如下。

新建docker-elk.yml
version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.17.3
    container_name: elasticsearch
    networks:
      -  elkDocker    
    user: root
    environment:
      - "cluster.name=elasticsearch" #设置集群名称为elasticsearch
      - "discovery.type=single-node" #以单一节点模式启动
      - "ES_JAVA_OPTS=-Xms512m -Xmx1024m" #设置使用jvm内存大小
    volumes:
      - /opt/elk/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
      - /opt/elk/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
    ports:
      - 9200:9200
      - 9300:9300
  logstash:
    image: logstash:7.17.3
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    volumes:
       #挂载logstash的配置文件
      - /opt/elk/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf 
    networks:
      -  elkDocker  
    depends_on:
      - elasticsearch
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    ports:
      - 4560:4560
      - 4561:4561
      - 4562:4562
      - 4563:4563
  kibana:
    image: kibana:7.17.3
    container_name: kibana
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    depends_on:
      - elasticsearch
    networks:
      -  elkDocker
    environment:
      - "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
    ports:
      - 5601:5601

networks:
  elkDocker:
    driver: bridge      

复制
在启动 docker 容器之前,建议将数据卷映射(volumes)的目录跟文件提前建好, 而且防止因为写权限引发一些问题,建议把这些目录或者文件改为777权限。然后在数据卷中配置 logstash.conf 文件,此文件是 logstash 的配置文件,它的作用是指定输入源和输出源,以及对数据进行处理、转换、过滤的规则。
chmod -R 777 elasticsearch
chmod -R 777 logstash/

复制
新建目录logstash、elasticsearch。并授权777,新建logstash的配置文件logstash.conf内容如下。
input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    codec => json_lines
    type => "debug"
  }
}
filter{
      #对时间转换
      ruby {
         code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
      }
      ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
      }
      mutate {
        remove_field =>["timestamp"]
      }
    json {
      source => "message"
      remove_field => "message"
      add_field => { "UserIP" => "%{[MDC][UserIP]}" }
      add_field => { "UserName" => "%{[MDC][UserName]}" }
      add_field => { "RequestUrl" => "%{[MDC][RequestUrl]}" }
      add_field => { "RequestId" => "%{[MDC][RequestId]}" }
      add_field => { "HandlerType" => "%{[MDC][HandlerType]}" }
    }
}
output {
  elasticsearch {
    hosts => "es:9200"
    index => "%{[appname]}-%{+YYYY.MM.dd}"
  }
}

复制

input 表示数据的来源,可以配置数据库,消息队列,文件,网络流等,这里我们使用的网络流,使用 tcp 协议监听4560端口(可配置多个)。

filter->json->add_field中,添加了一些字段,可以根据实际需求添加,这些字段来自MDC中的值, 我们将用MDC的值透传至整个服务调用请求链中。

output 的配置代表往es 中输出,且采用自定义字段appname建立appname+yyyy.mm.dd 的索引(索引在es中是非常重要的概念,可参考官方文档)。

三、启动ELK

如果是docker较新的版本,需要把 ‘docker-compose’ 换成 ‘docker compose’

docker-compose -f docker-elk.yml up -d
复制
如果es启动失败,提示如下错误,就是因为持久化目录没有读权限导致的,授权后,重新启动就行了。
验证ES启动成功
启动成功,在浏览器输入:IP:9200,看到"You Know, for Search"代表ES启动成功~
验证Kibana启动成功
IP:5601,就能打开Kibana的管理页面
点击Dev-Tools可以打开查询框
点击Stack Monitoring可以打开ES监控面板

四、测试

新建springboot项目,添加依赖

    <dependency>
      <groupId>net.logstash.logback</groupId>
      <artifactId>logstash-logback-encoder</artifactId>
      <version>7.1.1</version>
    </dependency>

复制

在resources目录中新建配置文件logback-spring.xml,自行修改配置文件中有一个logstash的appender,我们配置了logstash服务的ip+port,且为mdc的所有的key添加事件,并自定义了一个字段,用来区分应用名称。

<?xml version="1.0" encoding="UTF-8"?>
<!--
scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。
scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。
debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
-->
<configuration scan="true" scanPeriod="30000" debug="false">

    <!-- 彩色日志依赖的渲染类 -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />

    <!--日志级别-->
    <property name="LOG_LEVEL" value="INFO" />
    <!--日志文件名前缀-->
    <property name="APP_NAME" value="nova-api" />
    <!--日志文件路径-->
    <property name="LOG_PATH" value="logs" />
    <!-- 日志文件名格式化 -->
    <property name="DATE_FORMAT" value="yyyy-MM-dd" />
    <!-- 控制台的输出格式 -->
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
    <!-- 日志文件的输出格式 -->
    <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - User:%mdc{User} - RequestId:%mdc{RequestId} - HandlerType:%mdc{HandlerType} %msg%n" />
    <!-- 日志文件保留天数 -->
    <property name="LOG_MAX_HISTORY" value="90" />
    <!-- 日志文件大小最大限制 -->
    <property name="LOG_MAX_FILE_SIZE" value="200MB" />

    <!-- 控制台日志输出 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>${FILE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>

    <!--文件日志输出-->
    <!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件,并按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!--文件路径配置-->
        <file>${LOG_PATH}/${APP_NAME}.log</file>
        <!--日志文件滚动策略配置-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_PATH}/${APP_NAME}.%d{${DATE_FORMAT}}-%i.log.gz</FileNamePattern>
            <MaxHistory>${LOG_MAX_HISTORY}</MaxHistory>
            <TimeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <MaxFileSize>${LOG_MAX_FILE_SIZE}</MaxFileSize>
            </TimeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>${FILE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>


    <!--输出到logstash的appender-->
    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">

        <!--可以访问的logstash日志收集端口-->
        <destination>你的IP:4560</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder">
            <!-- 将mdc中所有的key-value导入到logEvent中-->
            <includeMdc>true</includeMdc>
            <!--自定义字段,区分应用名称-->
            <customFields>{"AppName":"local-nova-api"}</customFields>
        </encoder>
    </appender>

    <!--异步输出日志-->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
        <discardingThreshold>0</discardingThreshold>
        <!-- 更改默认的队列的深度,该值会影性能.默认值为256-->
        <queueSize>256</queueSize>
        <appender-ref ref="logstash" />
    </appender>

    <!--指定日志等级-->
    <!--    <logger name="com.XXX.XXX.XXX.dao" level="debug"/>-->
    <logger name="org.apache.kafka.clients.NetworkClient" level="error"/>
    <logger name="o.a.k.c.c.internals.ConsumerCoordinator" level="error"/>


    <root level="${LOG_LEVEL}">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
        <appender-ref ref="ASYNC" />
    </root>
</configuration>

复制

打开Kibana查询日志

按时间顺序查询日志级别是INFO的日志:

GET _search
{
  "sort": [
    {
      "@timestamp": {
        "order":"asc"
      }
    }
  ],
  "query": {
    "match": {
      "level": "INFO"
    }
  }
}

复制

按时间顺序查询查询message中包含启动端口8080的日志:

GET _search
{
  "sort": [
    {
      "@timestamp": {
        "order":"asc"
      }
    }
  ],
  "query": {
    "match": {
      "message": "8080"
    }
  }
}

复制

---THE END---


文章转载自程序员恰恰,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论