企业级logstash简单使用(ELK)

博客园   2023-07-05 15:42:36

企业级logstash简单使用(ELK)

要使用logstash收集到Elasticsearch的方式,需确保logstash版本与es版本一致。

由于我也是刚刚研究使用,所以本文暂不会出现原理性的东西。

Logstash介绍

Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。


【资料图】

inputs(输入阶段)

会生成事件。包括:file、kafka、beats等

filters(过滤器阶段)

可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等

outputs(输出阶段)

将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等

使用方式

下载地址 :https://www.elastic.co/fr/downloads/logstash

下载之后随便解压到某个目录,会得到以下这些目录和文件,我们需要注意的就三个目录,bin、config、logs,下面一个一个说。

先来看config文件夹,进入后会有这几个文件:

查看logstash-sample.conf配置文件

# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input {  #输入插件beats,轻量化  beats {     #监听端口    port => 5044  }}output {  #es连接地址及索引配置  elasticsearch {    hosts => ["http://localhost:9200"]    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"    #user => "elastic"    #password => "changeme"  }}

下来稍微修改一下,我们启动试试。

# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input {  beats {    port => 5044  }  #添加file插件  file {    #测试环境中我们一般是nohup后台启动jar包,默认日志追加到nohup文件中,这里我们用插件读取这个日志发送到es上试试    path => "/home/mm/mmm/nohup.out"    mode => "read"  }}output {  elasticsearch {    #配置自己的es连接,这里是使用es默认模板    hosts => ["http://localhost:9200"]    index => "ceshi"    #user => "elastic"    #password => "changeme"  }}

退回到bin目录下,启动

./logstash -f /自己路径下的配置文件/logstash/config/logstash-sample.conf

这样子就是启动成功了。

下面就是我们收集到的日志,大家可以看看默认都有什么字段。

{                "_index" : "console-analysis",        "_type" : "_doc",        "_id" : "_DDNH4kBVvgVIOGHRiop",        "_score" : 1.0,        "_source" : {          "port" : 57910,          "thread_name" : "main",          "host" : "172.17.0.5",          "logger_name" : "com.alibaba.nacos.client.naming",          "@version" : "1",          "level_value" : 20000,          "message" : "[BEAT] adding beat: BeatInfo{port=17007, ip="192.168.1.59", weight=1.0, serviceName="DEFAULT_GROUP@@amcp-analysis", cluster="DEFAULT", metadata={preserved.register.source=SPRING_CLOUD}, scheduled=false, period=5000, stopped=false} to beat map.",          "level" : "INFO",          "logHost" : "192.168.1.59:5044",          "appname" : "analysis"        }
Springboot集成logstash+elasticsearch加入依赖
                    net.logstash.logback            logstash-logback-encoder            7.1.1        
配置

在resources目录下创建一个logback-spring.xml的xml文件。

如果是使用nacos来获取配置的话,文件名字不能是logback-spring.xml,因为会导致logback-spring.xml文件被加载两次,这样在logback-spring.xml文件中如果想读取nacos上的配置的话是拿不到的。

在yml或者properties文件中添加配置

logging: config: classpath:logback-nacos.xmllogstash: host: localhost:5044
logback-spring.xml配置文件
                        ${logHost}                 5000        5000                                {"appname":"analysis"}                                        %d{yyyy-MM-dd HH:mm:ss.SSS}  %-5level --- [%thread] %logger{36} : %msg%n                                         
logstash配置logstash-sample.conf配置文件
input {  #配置监听端口,也就是destination  tcp {    mode => "server"    host => "localhost"    port => 5044    codec => json_lines  }}filter {  #判断appname,在配置中声明一个变量,不同的appname赋予不同的值,这里其实就是根据我自定义的字段来给不同的es索引名称  if [appname] == "analysis"{    mutate {      add_field => {        "[@metadata][index]" => "console-analysis"      }    }  }  #判断日志级别,收集到的日志默认字段level会记录日志级别,这个时候我们可以根据需要对日志进行操作。下面这个操作是将日志级别,日志记录到自定义字段,以及将记录的时间进行转换记录到time字段,默认记录的时间是带时区的  if [level] =~ /DEBUG/ {    mutate {      add_field => {        "type" => "DEBUG"        "details" => "%{message}"      }    }    ruby {      code => "        event.set("time", event.timestamp.time.localtime.strftime("%Y-%m-%d %H:%M:%S"))      "    }    mutate {     remove_field => ["[@timestamp]"]    }  }}output { #这里判断message字段中如果不包含HiddenHorzOCR就记录,只是演示一下这里面也可以进行逻辑判断 if !([message] =~ /HiddenHorzOCR/) {   if [@metadata][index] {     elasticsearch {       hosts => ["http://192.168.1.59:9200"]       #这个索引就是我们在filter中判断appname时赋的值       index => "%{[@metadata][index]}"       #指定es要使用的模板,也可以使用默认的       template => "/home/collect.json"       #模板名称       template_name => "collect"       #加载模板是否覆盖之前的模板       template_overwrite => true     }   } }}
es模板
{  "index_patterns": ["console*"],  "settings": {    "number_of_shards": 5,    "max_result_window": "500000000"  },  "mappings": {    //自定义几个字段    "properties": {      "type": { "type": "keyword" },      "details": { "type": "text" },      "time": {                 "type": "keyword"       }    }  }}

所有配置添加完成之后,启动logstash和自己的应用程序,这个时候就可以上es或者kibana上查看创建出的索引以及收集到的日志。

这是指定模板后收集到的日志。

{        "_index" : "console-analysis",        "_type" : "_doc",        "_id" : "_jDNH4kBVvgVIOGHRiop",        "_score" : 1.0,        "_source" : {          "port" : 57910,          "details" : "Scanning for api listing references",          "type" : "INFO",          "time" : "2023-07-04 15:28:13",          "thread_name" : "main",          "host" : "172.17.0.5",          "logger_name" : "springfox.documentation.spring.web.scanners.ApiListingReferenceScanner",          "@version" : "1",          "level_value" : 20000,          "message" : "Scanning for api listing references",          "level" : "INFO",          "logHost" : "192.168.1.59:5044",          "appname" : "analysis"        }      }