一、ELK介绍
二、安装ELK
三、启动ELK
四、测试
一、ELK介绍

version: '3'
services:
elasticsearch:
image: elasticsearch:7.17.3
container_name: elasticsearch
networks:
- elkDocker
user: root
environment:
- "cluster.name=elasticsearch" #设置集群名称为elasticsearch
- "discovery.type=single-node" #以单一节点模式启动
- "ES_JAVA_OPTS=-Xms512m -Xmx1024m" #设置使用jvm内存大小
volumes:
- /opt/elk/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
- /opt/elk/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
ports:
- 9200:9200
- 9300:9300
logstash:
image: logstash:7.17.3
container_name: logstash
environment:
- TZ=Asia/Shanghai
volumes:
#挂载logstash的配置文件
- /opt/elk/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
networks:
- elkDocker
depends_on:
- elasticsearch
links:
- elasticsearch:es #可以用es这个域名访问elasticsearch服务
ports:
- 4560:4560
- 4561:4561
- 4562:4562
- 4563:4563
kibana:
image: kibana:7.17.3
container_name: kibana
links:
- elasticsearch:es #可以用es这个域名访问elasticsearch服务
depends_on:
- elasticsearch
networks:
- elkDocker
environment:
- "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
ports:
- 5601:5601
networks:
elkDocker:
driver: bridge复制
chmod -R 777 elasticsearch
chmod -R 777 logstash/复制
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
type => "debug"
}
}
filter{
#对时间转换
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field =>["timestamp"]
}
json {
source => "message"
remove_field => "message"
add_field => { "UserIP" => "%{[MDC][UserIP]}" }
add_field => { "UserName" => "%{[MDC][UserName]}" }
add_field => { "RequestUrl" => "%{[MDC][RequestUrl]}" }
add_field => { "RequestId" => "%{[MDC][RequestId]}" }
add_field => { "HandlerType" => "%{[MDC][HandlerType]}" }
}
}
output {
elasticsearch {
hosts => "es:9200"
index => "%{[appname]}-%{+YYYY.MM.dd}"
}
}复制
input 表示数据的来源,可以配置数据库,消息队列,文件,网络流等,这里我们使用的网络流,使用 tcp 协议监听4560端口(可配置多个)。
filter->json->add_field中,添加了一些字段,可以根据实际需求添加,这些字段来自MDC中的值, 我们将用MDC的值透传至整个服务调用请求链中。
output 的配置代表往es 中输出,且采用自定义字段appname建立appname+yyyy.mm.dd 的索引(索引在es中是非常重要的概念,可参考官方文档)。
如果是docker较新的版本,需要把 ‘docker-compose’ 换成 ‘docker compose’
docker-compose -f docker-elk.yml up -d
复制





四、测试
新建springboot项目,添加依赖
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.1.1</version>
</dependency>复制
在resources目录中新建配置文件logback-spring.xml,自行修改配置文件中有一个logstash的appender,我们配置了logstash服务的ip+port,且为mdc的所有的key添加事件,并自定义了一个字段,用来区分应用名称。
<?xml version="1.0" encoding="UTF-8"?>
<!--
scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。
scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。
debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
-->
<configuration scan="true" scanPeriod="30000" debug="false">
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!--日志级别-->
<property name="LOG_LEVEL" value="INFO" />
<!--日志文件名前缀-->
<property name="APP_NAME" value="nova-api" />
<!--日志文件路径-->
<property name="LOG_PATH" value="logs" />
<!-- 日志文件名格式化 -->
<property name="DATE_FORMAT" value="yyyy-MM-dd" />
<!-- 控制台的输出格式 -->
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!-- 日志文件的输出格式 -->
<property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - User:%mdc{User} - RequestId:%mdc{RequestId} - HandlerType:%mdc{HandlerType} %msg%n" />
<!-- 日志文件保留天数 -->
<property name="LOG_MAX_HISTORY" value="90" />
<!-- 日志文件大小最大限制 -->
<property name="LOG_MAX_FILE_SIZE" value="200MB" />
<!-- 控制台日志输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${FILE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<!--文件日志输出-->
<!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件,并按照每天生成日志文件 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!--文件路径配置-->
<file>${LOG_PATH}/${APP_NAME}.log</file>
<!--日志文件滚动策略配置-->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/${APP_NAME}.%d{${DATE_FORMAT}}-%i.log.gz</FileNamePattern>
<MaxHistory>${LOG_MAX_HISTORY}</MaxHistory>
<TimeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<MaxFileSize>${LOG_MAX_FILE_SIZE}</MaxFileSize>
</TimeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${FILE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<!--输出到logstash的appender-->
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--可以访问的logstash日志收集端口-->
<destination>你的IP:4560</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 将mdc中所有的key-value导入到logEvent中-->
<includeMdc>true</includeMdc>
<!--自定义字段,区分应用名称-->
<customFields>{"AppName":"local-nova-api"}</customFields>
</encoder>
</appender>
<!--异步输出日志-->
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影性能.默认值为256-->
<queueSize>256</queueSize>
<appender-ref ref="logstash" />
</appender>
<!--指定日志等级-->
<!-- <logger name="com.XXX.XXX.XXX.dao" level="debug"/>-->
<logger name="org.apache.kafka.clients.NetworkClient" level="error"/>
<logger name="o.a.k.c.c.internals.ConsumerCoordinator" level="error"/>
<root level="${LOG_LEVEL}">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
<appender-ref ref="ASYNC" />
</root>
</configuration>复制
打开Kibana查询日志
按时间顺序查询日志级别是INFO的日志:
GET _search
{
"sort": [
{
"@timestamp": {
"order":"asc"
}
}
],
"query": {
"match": {
"level": "INFO"
}
}
}复制
按时间顺序查询查询message中包含启动端口8080的日志:
GET _search
{
"sort": [
{
"@timestamp": {
"order":"asc"
}
}
],
"query": {
"match": {
"message": "8080"
}
}
}复制
---THE END---

文章转载自程序员恰恰,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
阿里云 Elasticsearch Serverless 检索增强型 8.17 版来袭!
阿里云大数据AI技术
362次阅读
2025-04-18 10:24:15
拉卡拉 x Apache Doris:统一金融场景 OLAP 引擎,查询提速 15 倍,资源直降 52%
SelectDB
128次阅读
2025-04-02 17:52:59
阿里云 Elasticsearch Serverless 检索增强型8.17版免费邀测!
阿里云大数据AI技术
60次阅读
2025-04-15 13:18:15
2025 XCOPS广州站:故障预测、根因分析全链路实战指南
铭毅天下Elasticsearch
44次阅读
2025-04-09 11:01:27
利用 EDB Postgres AI - WarehousePG 替换 Greenplum 实现数据仓库现代化
新智锦绣
37次阅读
2025-04-18 17:28:36
Elasticsearch 8.X 如何利用嵌入向量提升搜索能力?
铭毅天下Elasticsearch
37次阅读
2025-04-09 11:01:28
【视频上新】Coco AI 部署及使用详解
铭毅天下Elasticsearch
34次阅读
2025-04-09 11:01:26
还在用 ELK?你已经 Out 了
GreptimeDB
33次阅读
2025-04-18 17:28:41
为流复制优化TCP设置
新智锦绣
27次阅读
2025-04-10 11:52:27
深入解析 Coco AI:一款开源、跨平台的智能搜索与协作工具
铭毅天下Elasticsearch
26次阅读
2025-04-07 09:46:42