分享交流
合作共赢!

ELK基础使用方法总结【二】——Logstash

一、Logstash简介

(简介内容摘自网络)

1、官网

https://www.elastic.co/products/logstash

2、软件介绍

官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。

Logstash常用于日志关系系统中做日志采集设备;

Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成:inputs –> filters –> outputs:

  • inpust:必须,负责产生事件(Inputs generate events),常用:File、syslog、redis、beats(如:Filebeats)
  • filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
  • outpus:必须,负责数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、statsd

其中inputs和outputs支持codecs(coder&decoder)在1.3.0 版之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,所以完整的数据流程应该是:input | decode | filter | encode | output;codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如:graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等 

二、Logstash安装部署

1.安装logstash程序包

方法一:RPM包安装方法

1.下载logstash RPM安装包

下载地址:https://www.elastic.co/downloads/logstash

下载程序包:logstash-7.9.1.rpm

安装

]# yum install logstash-7.9.1.rpm

方法二:YUM安装方法【推荐】

1.下载并安装公共签名密钥

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

2.yum源创建repo文件

~]# cat > /etc/yum.repos.d/elastic.repo << EOF
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

3.yum安装

]# sudo yum install logstash 

2.配置环境变量

1.在/etc/profile.d目录中创建文件logstash.sh,并写入export PATH=/usr/share/logstash/bin:$PATH内容

]# cat > /etc/profile.d/logstash.sh << EOF
export PATH=/usr/share/logstash/bin:\$PATH
EOF

2.使其生效:

]# source /etc/profile.d/logstash.sh

3.查看是否生效:

]# echo $PATH
/usr/share/logstash/bin:/usr/share/logstash/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin

3.安装java环境包【若没有jdk需要安装

]# yum install java-11-openjdk

三、Logstash配置和使用

官方使用文档:https://www.elastic.co/guide/en/logstash/current/index.html

1.常用参数:

参数 说明 举例
-e 立即执行,使用命令行里的配置参数启动实例 logstash -e ‘input {stdin {}} output {stdout {}}’
-f 指定启动实例的配置文件 logstash -f config/test.conf
-t 测试配置文件的正确性 logstash-f config/test.conf -t
-l 指定日志文件名称 logstash-f config/test.conf -l logs/test.log
-w 指定filter线程数量,默认线程数是5 logstash-f config/test.conf -w 8

2.Logstash常用插件

1.input插件:

File: 从指定的文件中读取事件流(FileWatch监听文件的变化;.sincedb记录了每个被监听的文件的inode,major,number,minor,number,pos);

参数名称 类型 默认值 描述信息
add_field hash {} 用于向Event中添加字段
close_older number 3600 设置文件多久秒内没有更新就关掉对文件的监听
codec string “plain” 输入数据之后对数据进行解码
delimiter string “\n” 文件内容的行分隔符,默认按照行进行Event封装
discover_interval number 15 间隔多少秒查看一下path匹配对路径下是否有新文件产生
enable_metric boolean true  
exclude array path匹配的文件中指定例外,如:path => “/var/log/“;exclude =>”.gz”
id string 区分两个相同类型的插件,比如两个filter,在使用Monitor API监控是可以区分,建议设置上ID
ignore_older number 忽略历史修改,如果设置3600秒,logstash只会发现一小时内被修改过的文件,一小时之前修改的文件的变化不会被读取,如果再次修改该文件,所有的变化都会被读取,默认被禁用
max_open_files number logstash可以同时监控的文件个数(同时打开的file_handles个数),如果你需要处理多于这个数量多文件,可以使用“close_older”去关闭一些文件
path array 必须设置项,用于匹配被监控的文件,如“/var/log/.log”或者“/var/log/*/*.log”,必须使用绝对路径
sincedb_path string 文件读取记录,必须指定一个文件而不是目录,文件中保存没个被监控的文件等当前inode和byteoffset,默认存放位置“$HOME/.sincedb*”
sincedb_write_interval number 15 间隔多少秒写一次sincedb文件
start_position “beginning”,“end” ” end” 从文件等开头还是结尾读取文件内容,默认是结尾,如果需要导入文件中的老数据,可以设置为“beginning”,该选项只在第一次启动logstash时有效,如果文件已经存在于sincedb的记录内,则此配置无效
stat_interval number 1 间隔多少秒检查一下文件是否被修改,加大此参数将降低系统负载,但是增加了发现新日志的间隔时间
tags array 可以在Event中增加标签,以便于在后续的处理流程中使用
type string  

Event的type字段,如果采用elasticsearch做store,在默认情况下将作为elasticsearch

的type

 

file使用实例:

1.在/etc/logstash/conf.d目录下创建文件fiels.conf,并写入如下内容:

input {
file {
path => ["/var/log/messages"]
type => "system"
start_position => "beginning"
}
}


output {
stdout {
codec => rubydebug
}
}

2.设置/var/log/messages文件的访问权限

]# chmod 644 /var/log/messages

3.执行logstash开始测试,如下即以行的形式获取到messages的数据信息

]# logstash -f /etc/logstash/conf.d/fiels.conf
......
{
"@version" => "1",
"@timestamp" => 2020-09-23T02:17:40.973Z,
"path" => "/var/log/messages",
"type" => "system",
"message" => "Sep 23 10:17:02 master01 logstash: \"message\" => \"Sep 23 07:49:29 master01 kernel: SRAT: PXM 0 -> APIC 0x48 -> Node 0\",",
"host" => "master01"
}
......

udp: 通过udp协议从网络连接来读取Message,其必须参数为port,用于指明自己监听的端口,host则指明自己监听的IP地址;

udp使用示例:

实现目的:让另一个主机的日志收集工具(这里使用collectd演示)将日志数据通过udp协议传递到logstash的input插件中,logstash收集后进行输出。

1.下载epel源

[root@node02 ~]# yum install wget -y
]# wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

2.安装collectd

]# yum install collectd -y

3.配置collectd:在/etc/collectd.conf配置文件中配置如下选项项:

]# grep -v "^#" /etc/collectd.conf | grep -v "^$"
Hostname "node02"
LoadPlugin syslog
LoadPlugin cpu
LoadPlugin interface
LoadPlugin load
LoadPlugin memory
LoadPlugin network
<Plugin network>
<Server "192.168.239.130" "50051">
# 192.168.239.130是对应logstash主机的IP地址,50051是其监听的udp端口
</Server>
</Plugin>

4.启动collectd

]# systemctl start collectd

5.在logstash上创建配置文件:

]# more udp.conf 
input {
udp {
port => 50051
codec => collectd {}
type => "colletcd"
}
}

output {
stdout {
codec => rubydebug
}
}

6.logstash运行udp配置文件

]# logstash -f /etc/logstash/conf.d/udp.conf
......
{
"plugin" => "interface",
"@version" => "1",
"plugin_instance" => "lo",
"rx" => 0,
"type" => "colletcd",
"@timestamp" => 2020-09-23T06:28:46.548Z,
"collectd_type" => "if_octets",
"tx" => 0,
"host" => "node02"
}
......

redis从redis读取数据,支持redis channel和lists两种方式;

2.filter插件

用于在将event通过output发出之前对其实现某些处理功能。

grok:用于分析并结构化文本数据,目前是logstash中将非结构化日志转化为结构化可查询数据的最好选择。

自定义grok模式:

grok模式是基于正则表达式编写,其元字符与其它用到正则表达式的工具awk/sed/grep/pcre差别不大。

grok pattren的语法为:%{SYNTAX:semantic},”:” 前面是grok-pattrens中定义的变量,后面可以自定义变量的名称。(?:%{SYNTAX:semantic}|-)这种形式是条件判断。

如果有双引号””或者中括号[],需要加 \ 进行转义。

grok-patterns默认路径:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

 自定义NGINX的access.log日志的grok模式:

# nginx access log
NGUSERNAME [a-zA-Z\.\@\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} - %{NOTSPACE:remote_user} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{NOTSPACE:http_x_forwarded_for}

配置文件:

input {
file {
path => "/var/log/nginx/access.log"
type => "nginxaccess"
start_position => "beginning"
}
}

filter {
grok {
patterns_dir => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patter
ns/grok-patterns"
match => { "message" => "%{NGINXACCESS}" }
}
}

output {
stdout {
codec => rubydebug
}
}

输出结果:

{
"message" => "192.168.239.1 - - [25/Sep/2020:18:13:41 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36\" \"-\"",
"host" => "master01",
"remote_user" => "-",
"path" => "/var/log/nginx/access.log",
"bytes" => "0",
"httpversion" => "1.1",
"http_x_forwarded_for" => "\"-\"",
"@timestamp" => 2020-09-25T10:14:06.088Z,
"agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36\"",
"timestamp" => "25/Sep/2020:18:13:41 +0800",
"response" => "304",
"type" => "nginxaccess",
"referrer" => "\"-\"",
"clientip" => "192.168.239.1",
"request" => "/",
"verb" => "GET",
"@version" => "1"
}

未完待续……

赞(3) 打赏
未经允许不得转载:琼杰笔记 » ELK基础使用方法总结【二】——Logstash

评论 抢沙发

评论前必须登录!

 

分享交流,合作共赢!

联系我们加入QQ群

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏