Установка logstash для сбора логов Windows и syslog
25 Apr 2017Устанавливаем Java 8
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer
Устанавливаем elasticsearch
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-5.x.list
sudo apt-get update && sudo apt-get install elasticsearch
Редактируем конфиг /etc/elasticsearch/elasticsearch.yml:
cluster.name: logstash.satin-pl.com
node.name: node-1
Добавляем в автозагрузку:
sudo update-rc.d elasticsearch defaults 95 10
Запускаем:
sudo /etc/init.d/elasticsearch start
Проверяем:
sudo curl http://localhost:9200
Правильный вывод:
{
"name" : "node-1",
"cluster_name" : "logstash.satin-pl.com",
"cluster_uuid" : "L_MQUkE0TNKG9IgttanYvQ",
"version" : {
"number" : "5.3.1",
"build_hash" : "5f9cf58",
"build_date" : "2017-04-17T15:52:53.846Z",
"build_snapshot" : false,
"lucene_version" : "6.4.2"
},
"tagline" : "You Know, for Search"
}
Устанавливаем logstash
sudo apt-get install logstash
Создаём INPUT-файл:
sudo vi /etc/logstash/conf.d/input-beats.conf
Содержимое файла:
input {
beats {
port => 5044
}
udp {
port => 8514
type => "syslog"
}
tcp {
port => 8514
type => "syslog"
}
}
Создаём FILTER-файл:
sudo vi /etc/logstash/conf.d/filter-syslog.conf
Содержимое файла:
filter {
#Преобразуем имена Winodws компьютеров к нижнему регистру (полезно для связки с icinga2)
if [type] == "wineventlog" {
mutate {
lowercase => ["computer_name]
}
}
#Преобразуем сообщения syslog различныx моделей оборудования
if [type] == "syslog" {
grok {
patterns_dir => [ "/etc/logstash/patterns" ]
match => [
#D-Link
"message", "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
"message", "<%{POSINT:syslog_pri}> %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
#Linux
"message", "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
#IOS
"message", "<%{POSINT:syslog_pri}>(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}",
"message", "<%{POSINT:syslog_pri}>(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}",
"message", "<%{POSINT:syslog_pri}>(%{NUMBER:log_sequence#})?:( %{IPORHOST:syslog_hostname}:)?( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}",
#ASA
"message", "<%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:log_date} %{IPORHOST:syslog_hostname} : %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}",
#SmallBusiness
"message", "<%{POSINT:syslog_pri}>(%{NUMBER})? %{TIMESTAMP_ISO8601:log_date}( %{IPORHOST:syslog_hostname})? %{CISCO_REASON:facility} - %{CISCO_REASON:facility_mnemonic} - %{GREEDYDATA:syslog_message}",
# Nexus
"message", "<%{POSINT:syslog_pri}>(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}",
"message", "<%{POSINT:syslog_pri}>(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:syslog_message}"
]
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri {
syslog_pri_field_name => "syslog_pri"
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
#Возможно это лишнее ...
mutate {
gsub => [
"severity_level", "0", "0 - Emergency",
"severity_level", "1", "1 - Alert",
"severity_level", "2", "2 - Critical",
"severity_level", "3", "3 - Error",
"severity_level", "4", "4 - Warning",
"severity_level", "5", "5 - Notification",
"severity_level", "6", "6 - Informational",
"severity_level", "7", "7 - Debugging"
]
}
}
Создаём файл c patterns для разбора логов cisco:
sudo vi /etc/logstash/patterns/cisco
Содержимое файла:
#CISCOTIMESTAMPTZ %{CISCOTIMESTAMP}( %{TZ})?
NEXUSTIMESTAMP %{YEAR} %{MONTH} %{MONTHDAY} %{TIME}( %{TZ})?
#Патерн %{TZ} почему-то не сработал, поэтому определим его по другому
CISCOTZ %{WORD}
CISCOTIMESTAMPTZ %{CISCOTIMESTAMP}( %{CISCOTZ})?
Создаём OUTPUT-файл:
sudo vi /etc/logstash/conf.d/output-elasticsearch.conf
Содержимое файла:
output {
if [type] == "syslog" {
elasticsearch {
hosts => ["localhost:9200"]
sniffing => true
manage_template => false
index => "syslog-%{+YYYY.MM.dd}"
document_type => "%{type}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
sniffing => true
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
if [type] == "syslog" and "_grokparsefailure" in [tags] {
file { path => "/var/log/logstash/failed_syslog_events-%{+YYYY-MM-dd}" }
}
}
Проверяем конфиг на ошибки, запускаем, и вносим в автозапуск:
sudo service logstash configtest
sudo service logstash restart
sudo update-rc.d logstash defaults 96 9
Проверяем порт:
netstat -a | grep 5044
Пример успешной работы:
tcp6 0 0 [::]:5044 [::]:* LISTEN
Устанавливаем kibana
sudo apt-get install kibana
sudo update-rc.d kibana defaults 95 10
Редактируем конфиг /etc/kibana/kibana.yml:
server.port: 5601
server.host: "logstash.satin-pl.com"
server.name: "logstash.satin-pl.com"
elasticsearch.url: "http://localhost:9200"
kibana.index: ".kibana"
Запускаем:
service kibana start
Устанавливаем Dashboards:
curl -L -O http://download.elastic.co/beats/dashboards/beats-dashboards-1.3.1.zip
unzip beats-dashboards-1.3.1.zip
cd beats-dashboards-1.3.1/
./load.sh
Создаем файл шаблона индекса beats:
winlogbeat.template.json.
```json
{
"mappings": {
"_default_": {
"_all": {
"norms": false
},
"_meta": {
"version": "5.3.1"
},
"date_detection": false,
"dynamic_templates": [
{
"strings_as_keyword": {
"mapping": {
"ignore_above": 1024,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"activity_id": {
"ignore_above": 1024,
"type": "keyword"
},
"beat": {
"properties": {
"hostname": {
"ignore_above": 1024,
"type": "keyword"
},
"name": {
"ignore_above": 1024,
"type": "keyword"
},
"version": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"computer_name": {
"ignore_above": 1024,
"type": "keyword"
},
"event_data": {
"properties": {}
},
"event_id": {
"type": "long"
},
"fields": {
"properties": {}
},
"keywords": {
"ignore_above": 1024,
"type": "keyword"
},
"level": {
"ignore_above": 1024,
"type": "keyword"
},
"log_name": {
"ignore_above": 1024,
"type": "keyword"
},
"message": {
"norms": false,
"type": "text"
},
"message_error": {
"ignore_above": 1024,
"type": "keyword"
},
"meta": {
"properties": {
"cloud": {
"properties": {
"availability_zone": {
"ignore_above": 1024,
"type": "keyword"
},
"instance_id": {
"ignore_above": 1024,
"type": "keyword"
},
"machine_type": {
"ignore_above": 1024,
"type": "keyword"
},
"project_id": {
"ignore_above": 1024,
"type": "keyword"
},
"provider": {
"ignore_above": 1024,
"type": "keyword"
},
"region": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
},
"opcode": {
"ignore_above": 1024,
"type": "keyword"
},
"process_id": {
"type": "long"
},
"provider_guid": {
"ignore_above": 1024,
"type": "keyword"
},
"record_number": {
"ignore_above": 1024,
"type": "keyword"
},
"related_activity_id": {
"ignore_above": 1024,
"type": "keyword"
},
"source_name": {
"ignore_above": 1024,
"type": "keyword"
},
"tags": {
"ignore_above": 1024,
"type": "keyword"
},
"task": {
"ignore_above": 1024,
"type": "keyword"
},
"thread_id": {
"type": "long"
},
"type": {
"ignore_above": 1024,
"type": "keyword"
},
"user": {
"properties": {
"domain": {
"ignore_above": 1024,
"type": "keyword"
},
"identifier": {
"ignore_above": 1024,
"type": "keyword"
},
"name": {
"ignore_above": 1024,
"type": "keyword"
},
"type": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"user_data": {
"properties": {}
},
"version": {
"type": "long"
},
"xml": {
"norms": false,
"type": "text"
}
}
}
},
"order": 0,
"settings": {
"index.mapping.total_fields.limit": 10000,
"index.refresh_interval": "5s"
},
"template": "winlogbeat-*"
}
```
Создаем файл шаблона индекса syslog:
cisco.template.json.
```json
{
"template" : "syslog-*",
"settings" : {
"index.refresh_interval" : "5s"
},
"aliases": {
"syslog": {}
},
"mappings" : {
"_default_" : {
"_all" : {"enabled" : true, "omit_norms" : true},
"dynamic_templates" : [ {
"message_field" : {
"match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "string", "index" : "analyzed", "omit_norms" : true
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "string", "index" : "analyzed", "omit_norms" : true,
"fields" : {
"raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 256}
}
}
}
} ],
"properties" : {
"@host": { "type": "string", "index": "not_analyzed" },
"@project_environment": { "type": "string", "index": "not_analyzed" },
"@project": { "type": "string", "index": "not_analyzed" },
"@location": { "type": "string", "index": "not_analyzed" },
"@location_rack": { "type": "string", "index": "not_analyzed" },
"@syslog_facility": { "type": "string", "index": "not_analyzed" },
"@syslog_hostname": { "type": "string", "index": "not_analyzed" },
"@syslog_message": { "type": "string", "analyzer": "whitespace" },
"@message": { "type": "string", "analyzer": "whitespace" },
"@syslog_pri": { "type": "integer", "ignore_malformed": true, "index": "not_analyzed" },
"@syslog_program": { "type": "string", "index": "not_analyzed" },
"@syslog_severity": { "type": "string", "index": "not_analyzed" },
"@tags": { "type": "string", "index": "not_analyzed" },
"@version": { "type": "string", "index": "not_analyzed" },
"geoip" : {
"type" : "object",
"dynamic": true,
"properties" : {
"location" : { "type" : "geo_point" }
}
}
}
}
}
}
```
Загружаем шаблоны в elasticsearch:
sudo curl -XPUT 'http://localhost:9200/_template/winlogbeat' -d@winlogbeat.template.json
sudo curl -XPUT 'http://localhost:9200/_template/syslog' -d@syslog.template.json
Установка Windows агента отправки логов
Скачиваем https://download.elastic.co/beats/winlogbeat/winlogbeat-1.3.1-windows.zip. Распаковываем в C:\ и переименовываем в Winlogbeat. Запускаем PowerShell от админа и устанавливаем сервис:
PS C:\Users\Administrator> cd 'C:\Winlogbeat'
PS C:\Winlogbeat> .\install-service-winlogbeat.ps1
Если мы видим сообщение о том что скрипты отключены в системе по умолчанию (а так оно и будет), то мы просто создаём политику для Winlogbeat:
PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-winlogbeat.ps1
Security warning
Run only scripts that you trust. While scripts from the internet can be useful,
this script can potentially harm your computer. If you trust this script, use
the Unblock-File cmdlet to allow the script to run without this warning message.
Do you want to run C:\Program Files\Winlogbeat\install-service-winlogbeat.ps1?
[D] Do not run [R] Run once [S] Suspend [?] Help (default is "D"): R
Status Name DisplayName
------ ---- -----------
Stopped winlogbeat winlogbeat
Перед стартом сервиса правим в конфиге — C:\Winlogbeat\winlogbeat.yml.
output:
#elasticsearch:
# hosts: localhost:9200
logstash:
hosts: ["logstash.satin-pl.com:5044"]
В блоку event_logs перечислены основные журналы системы, которые нужно транспортировать на Logstash:
winlogbeat:
registry_file: C:/ProgramData/winlogbeat/.winlogbeat.yml
event_logs:
- name: Application
- name: Security
- name: System
logging:
to_files: true
files:
path: C:/winlogbeat/winlogbeat/Logs
level: info
В event_logs можно добавить и другие журналы, список которых можно посмотреть так:
PS C:\Users\Administrator> Get-EventLog *
Если система выше Vista, то можно указать каналы:
PS C:\Users\Administrator> Get-WinEvent -ListLog * | Format-List -Property LogName
Далее нам нужно загрузить на сервер индексы для winlogbeat как мы это делали для topbeat, filebeat, packetbeat. Это можно сделать удалённо:
PS C:\Winlogbeat> Invoke-WebRequest -Method Put -InFile winlogbeat.template.json -Uri http://IP_address_elk-server:9200/_template/winlogbeat?pretty
Полезное
При длительной инициализации идекса Kibana можно удалить индексы с состоянием RED.
pslater@logstash:~$ curl 'localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
red open syslog-2017.04.26 jbTbC3HvSv2XfNgznvHk3w 5 1
yellow open syslog-2017.05.04 OGeu1JQHRsu4RkA6VB8jkg 5 1 2397 0 1mb 1mb
yellow open winlogbeat-2017.06.02 x6cdWqfeTCeMrM2THNqjrQ 5 1 269252 0 279.3mb 279.3mb
yellow open winlogbeat-2017.05.28 LswZSmcsRQ6col1yK5eFDA 5 1 215294 0 211mb 211mb
red open winlogbeat-2017.05.02 0GeJzpiiQtmoFm4H5fRXuw 5 1
yellow open syslog-2017.05.22 AJ6gf6w7Tomw-jy3IJdLcA 5 1 2510 0 1.1mb 1.1mb
yellow open winlogbeat-2017.05.31 TvoUIR32T9WB_acb1yZH3Q 5 1 272311 0 282.7mb 282.7mb
red open winlogbeat-2017.04.27 ujAlo1kgQBC9hKlOhqvyCg 5 1
red open syslog-2017.04.30 iyA9wDVRTUu4YR9jOpoGPw 5 1
curl -XDELETE 'localhost:9200/winlog*'
curl -XDELETE 'localhost:9200/syslog*'
Просмотр индексов:
curl 'localhost:9200/_cat/indices?v'
Удаление всех индексов (Внимание! Слетят все настройки Kibana):
curl -XDELETE 'localhost:9200/*'
Вместо * можно указать неугодный индекс, например:
curl -XDELETE 'localhost:9200/winlogbeat-2017.04.25'
Для удаление старых логов не обходимо установить «питоновский» модуль:
pip install elasticsearch-curator
Если pip не установлен, то устанавливаем:
apt-get install python-pip
Настраиваем Cron:
crontab -e
#Удалять индексы если они превысили 5 Гигабайт:
20 0 20 0 * * * root /usr/local/bin/curator --host localhost delete --disk-space 5 >/dev/null
# Удалять индексы старше, например, 30 дней:
20 0 * * * root /usr/local/bin/curator --host localhost delete --older-than 30 >/dev/null
Посмотреть ноды:
curl 'localhost:9200/_cat/nodes?v'
Посмотерть статус работы Elasticsearch:
curl 'localhost:9200/_cat/health?v'
Таблица syslog_pri:
emergency alert critical error warning notice info debug
kernel 0 1 2 3 4 5 6 7
user 8 9 10 11 12 13 14 15
mail 16 17 18 19 20 21 22 23
system 24 25 26 27 28 29 30 31
security 32 33 34 35 36 37 38 39
syslog 40 41 42 43 44 45 46 47
lpd 48 49 50 51 52 53 54 55
nntp 56 57 58 59 60 61 62 63
uucp 64 65 66 67 68 69 70 71
time 72 73 74 75 76 77 78 79
security 80 81 82 83 84 85 86 87
ftpd 88 89 90 91 92 93 94 95
ntpd 96 97 98 99 100 101 102 103
logaudit 104 105 106 107 108 109 110 111
logalert 112 113 114 115 116 117 118 119
clock 120 121 122 123 124 125 126 127
local0 128 129 130 131 132 133 134 135
local1 136 137 138 139 140 141 142 143
local2 144 145 146 147 148 149 150 151
local3 152 153 154 155 156 157 158 159
local4 160 161 162 163 164 165 166 167
local5 168 169 170 171 172 173 174 175
local6 176 177 178 179 180 181 182 183
local7 184 185 186 187 188 189 190 191