Logstash介绍要使用logstash收集到Elasticsearch的方式,需确保logstash版本与es版本一致。
由于我也是刚刚研究使用,所以本文暂不会出现原理性的东西。
Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。
【资料图】
inputs(输入阶段)会生成事件。包括:file、kafka、beats等
filters(过滤器阶段)可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等
outputs(输出阶段)将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等
使用方式下载地址 :https://www.elastic.co/fr/downloads/logstash
下载之后随便解压到某个目录,会得到以下这些目录和文件,我们需要注意的就三个目录,bin、config、logs,下面一个一个说。
先来看config文件夹,进入后会有这几个文件:
查看logstash-sample.conf配置文件
# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input { #输入插件beats,轻量化 beats { #监听端口 port => 5044 }}output { #es连接地址及索引配置 elasticsearch { hosts => ["http://localhost:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" #user => "elastic" #password => "changeme" }}
下来稍微修改一下,我们启动试试。
# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input { beats { port => 5044 } #添加file插件 file { #测试环境中我们一般是nohup后台启动jar包,默认日志追加到nohup文件中,这里我们用插件读取这个日志发送到es上试试 path => "/home/mm/mmm/nohup.out" mode => "read" }}output { elasticsearch { #配置自己的es连接,这里是使用es默认模板 hosts => ["http://localhost:9200"] index => "ceshi" #user => "elastic" #password => "changeme" }}
退回到bin目录下,启动
./logstash -f /自己路径下的配置文件/logstash/config/logstash-sample.conf
这样子就是启动成功了。
下面就是我们收集到的日志,大家可以看看默认都有什么字段。
{ "_index" : "console-analysis", "_type" : "_doc", "_id" : "_DDNH4kBVvgVIOGHRiop", "_score" : 1.0, "_source" : { "port" : 57910, "thread_name" : "main", "host" : "172.17.0.5", "logger_name" : "com.alibaba.nacos.client.naming", "@version" : "1", "level_value" : 20000, "message" : "[BEAT] adding beat: BeatInfo{port=17007, ip="192.168.1.59", weight=1.0, serviceName="DEFAULT_GROUP@@amcp-analysis", cluster="DEFAULT", metadata={preserved.register.source=SPRING_CLOUD}, scheduled=false, period=5000, stopped=false} to beat map.", "level" : "INFO", "logHost" : "192.168.1.59:5044", "appname" : "analysis" }
Springboot集成logstash+elasticsearch加入依赖 net.logstash.logback logstash-logback-encoder 7.1.1
配置在resources目录下创建一个logback-spring.xml的xml文件。
如果是使用nacos来获取配置的话,文件名字不能是logback-spring.xml,因为会导致logback-spring.xml文件被加载两次,这样在logback-spring.xml文件中如果想读取nacos上的配置的话是拿不到的。
在yml或者properties文件中添加配置
logging: config: classpath:logback-nacos.xmllogstash: host: localhost:5044
logback-spring.xml配置文件 ${logHost} 5000 5000 {"appname":"analysis"} %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger{36} : %msg%n
logstash配置logstash-sample.conf配置文件input { #配置监听端口,也就是destination tcp { mode => "server" host => "localhost" port => 5044 codec => json_lines }}filter { #判断appname,在配置中声明一个变量,不同的appname赋予不同的值,这里其实就是根据我自定义的字段来给不同的es索引名称 if [appname] == "analysis"{ mutate { add_field => { "[@metadata][index]" => "console-analysis" } } } #判断日志级别,收集到的日志默认字段level会记录日志级别,这个时候我们可以根据需要对日志进行操作。下面这个操作是将日志级别,日志记录到自定义字段,以及将记录的时间进行转换记录到time字段,默认记录的时间是带时区的 if [level] =~ /DEBUG/ { mutate { add_field => { "type" => "DEBUG" "details" => "%{message}" } } ruby { code => " event.set("time", event.timestamp.time.localtime.strftime("%Y-%m-%d %H:%M:%S")) " } mutate { remove_field => ["[@timestamp]"] } }}output { #这里判断message字段中如果不包含HiddenHorzOCR就记录,只是演示一下这里面也可以进行逻辑判断 if !([message] =~ /HiddenHorzOCR/) { if [@metadata][index] { elasticsearch { hosts => ["http://192.168.1.59:9200"] #这个索引就是我们在filter中判断appname时赋的值 index => "%{[@metadata][index]}" #指定es要使用的模板,也可以使用默认的 template => "/home/collect.json" #模板名称 template_name => "collect" #加载模板是否覆盖之前的模板 template_overwrite => true } } }}
es模板{ "index_patterns": ["console*"], "settings": { "number_of_shards": 5, "max_result_window": "500000000" }, "mappings": { //自定义几个字段 "properties": { "type": { "type": "keyword" }, "details": { "type": "text" }, "time": { "type": "keyword" } } }}
所有配置添加完成之后,启动logstash和自己的应用程序,这个时候就可以上es或者kibana上查看创建出的索引以及收集到的日志。
这是指定模板后收集到的日志。
{ "_index" : "console-analysis", "_type" : "_doc", "_id" : "_jDNH4kBVvgVIOGHRiop", "_score" : 1.0, "_source" : { "port" : 57910, "details" : "Scanning for api listing references", "type" : "INFO", "time" : "2023-07-04 15:28:13", "thread_name" : "main", "host" : "172.17.0.5", "logger_name" : "springfox.documentation.spring.web.scanners.ApiListingReferenceScanner", "@version" : "1", "level_value" : 20000, "message" : "Scanning for api listing references", "level" : "INFO", "logHost" : "192.168.1.59:5044", "appname" : "analysis" } }