Многие админы слышали об ElasticSearch - свободной поисковой системе, основанной на Lucene. Несмотря на недоверие хейтеров Java, ElasticSearch реально быстр в плане поиска и анализа информации, которую вы поместили в базу ES. Много лет уже ElasticSearch нам помогает в работе: есть у нас и удалённый syslog сервер и успешно опробованный netflow сервер (коллектор + система обработки и представления данных). Часто требуется в админской практике что-то поискать в журнале Postfix. Даже вспомогательными скриптами тяжело grep'пить multiline журналы, хотелось бы по-современному загонять их в некую базу и потом там легко искать нужное, строить графики, проводить аналитику и т.д.
Благодаря грамотной архитектуре почтового сервера Postfix достигается безопасность и надёжность, но небольшой платой за спокойствие является сложность разбора журнала работы Postfix. Различные даемоны Postfix пишут нужную тебе информацию о происходящей SMTP сессии в виде множества строк, которые хотелось красиво разобрать (parse) и проанализировать.
Упрощённый пример SMTP сессии Postfix из текстового журнала.
Apr 7 11:54:54 mail postfix/smtpd[23202]: ACE141C020C: client=pc1.example.com[10.1.2.3]
Apr 7 11:54:54 mail postfix/cleanup[24213]: ACE141C020C: message-id=<f47ee90f5429@example.com>
Apr 7 11:54:54 mail postfix/qmgr[1485]: ACE141C020C: from=<firma@example.com>, size=928, nrcpt=2 (queue active)
Apr 7 11:54:55 mail postfix/smtp[24215]: ACE141C020C: to=<user1@gmail.com>, relay=gmail.1.google.com[173.194.220.1]:25, delay=0.91, delays=0.28/0.2/0.1/0.32, dsn=2.0.0, status=sent (250 2.0.0 OK 1616835295 m14si10145180lfh.102 - gsmtp)
Apr 7 11:54:57 mail postfix/qmgr[1485]: ACE141C020C: removed
Скажу честно, но в старые времена, до моего знакомства с ElasticSearch, была создана простая php формочка, которая делала запрос к базе данных MySQL куда данные заносились из журнала Postfix топорным bash скриптом. В целом всё работает, но нет желания развивать своё поделие, так как всё равно не приблизиться к мощи стека ElasticSearch + LogStash + Kibana (ELK), к его скорости, к его функционалу.
Через php + MySQL мною реализовано почти всё не правильно. В реальной жизни человек может указать множество адресатов в одном письме и в рамках SMTP сессии вы увидите множество to=. В реляционных базах данных это намекает на создание дополнительной таблицы, связанной с основной таблицей отношением один-ко-многим (One-to-Many). Я совсем не парсил строки и не вычленял нужные поля, чтобы занести их по правилам реляционных БД, а просто выуживал из строки дату-время, номер очереди и всё остальное "превращалось" в сообщение. Путеводной звездой, объединяющей множество строк уже в таблице оставалось имя очереди, которое уникально.
Вот структура единственной таблицы
CREATE TABLE postfixmaillog (
dateevent datetime NOT NULL,
namequeue varchar(20) NOT NULL,
textmsg text NOT NULL
)
Остальное извращение с поиском нужного было уже в скрипте php, который формировал и выводил информацию в более-менее приглядной форме. Это было лучше ползанья grep'ом и/или скриптами на самом сервере, но, в целом, оставалось парашей, за которую не испытываешь гордости. Ни какой аналитики и любых других хотелок не было и нужно было допиливать php скрипт. Даже чужие наработки, найденные на просторах Интернет, классически сковывали своим интерфейсом желание сделать какой-либо свой запрос и построить по нему какой-либо график и т.д.
ElasticSearch - это по сути своей база данных, но она не собирается сама чудесным способом парсить логи и вычленять из строк нужные поля. Значит вопрос остался открытым - как красиво распарсить множество строк в журнале, где путеводной звездой есть только уникальный номер очереди?
Как Чип и Дейл, нам на помощь спешит плагин Aggregate filter в LogStash, который традиционно занимается приёмом данных от различных источников, парсингом и занесением информации собственно в базу ElasticSearch.
Целью фильтра является, как понятно из его названия, агрегирование информации, доступной среди нескольких событий (обычно строк журнала), принадлежащих одной и той же задаче, и, наконец, формирование агрегированной информации в окончательное событие задачи. Есть масса ньюансов, но на примере обычной SMTP сессии, вы, как человек, понимаете что после подстроки ACE141C020C: client= началась новая сессия и она закончилась после подстроки ACE141C020C: removed. Даже если правильно предположить что строки могут "разбавляться" строками другой SMTP сессии, можно разобраться что и где по уникальному номеру очереди. Такое и позволяет запрограммировать Aggregate filter.
Все файлы-наработки вы найдёте в конце статьи. Далее будут выдержки для понимания как всё работает в данный момент.
К сожалению, фильтр агрегирования вынуждает вас убить напрочь параллельность от слова совсем. Вы обязаны выставить pipeline.workers: 1 в /etc/logstash/logstash.yml
Дело в том, что если множество потоков LogStash начнут получать строки, то они будут раскиданы по разным потокам, которые не смогут собрать итоговое событие, так как не делят между собой информацию.
Это настолько важный момент, что вам стоит трижды подумать над этим, даже если вас очень заинтересовала тема статьи. Если у вас есть действующий ElasticSearch, то, чтобы не убивать производительность для текущей задачи, придётся, возможно, делать отдельный сервер под приём журналов Postfix.
Для отправки журналов с почтового сервера достаточно добавить на нём в файле /etc/rsyslog.d/postfix.conf строку
mail.* @10.1.2.3:10555
где:
После рестарта службы sudo systemctl restart rsyslog
ваш почтовик начнёт слать логи на remote syslog ES, которые пока временно никто не принимает.
На сервере ElasticSearch 10.1.2.3 в файле /etc/logstash/conf.d/postfix-syslog.conf начинаем формировать первую нужную часть input для приёма логов.
Вся часть input (нажмите для раскрытия списка)
# begin input
input {
syslog {
type => syslog
port => 10555
}
}
# end input
Для правильной работы нам потребуется пара переменных. В файле /etc/logstash/patterns.d/postfix-patterns должны быть строки
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} POSTFIX_MSG_ID [0-9A-F]{10,11}
Вся часть filter (нажмите для раскрытия списка)
# begin filter
filter {
if [program] =~ "postfix" {
grok {
patterns_dir => ["/etc/logstash/patterns.d/"]
match => {
"message" => [
"%{POSTFIX_MSG_ID:taskid}: client=(?<client_hostname>[^\]]+)\[%{IP:client_ip}\]",
"%{POSTFIX_MSG_ID:taskid}: message-id=<(?<message_id>[^>]+)>.*",
"%{POSTFIX_MSG_ID:taskid}: (?<milter>milter-reject): .*from=<(?<from_mail>[^>]+)> to=<(?<to_mail>[^>]+)>.*",
"%{POSTFIX_MSG_ID:taskid}: from=<(?<from_mail>[^>]+)>, size=%{INT:size}, nrcpt=%{INT:nrcpt}.*",
"%{POSTFIX_MSG_ID:taskid}: to=<(?<to_mail>[^>]+)>,.* relay=%{NOTSPACE:relay},.* status=%{NOTSPACE:status} \((?<status_msg>.+)\).*",
"%{POSTFIX_MSG_ID:taskid}: removed"
]
}
}
# если нет имени очереди, то не работаем - drop
if ![taskid] {
drop {}
} else if [program] == "postfix/smtpd" and [client_ip] {
aggregate {
task_id => "%{taskid}"
code => "
map['client_hostname'] ||= event.get('client_hostname')
map['client_ip'] ||= event.get('client_ip')
map['program'] ||= event.get('program')
event.cancel()
"
map_action => create_or_update
}
} else if [program] == "postfix/cleanup" and [message_id] and [milter] != "milter-reject" {
aggregate {
task_id => "%{taskid}"
code => "
map['message_id'] ||= event.get('message_id')
event.cancel()
"
map_action => create_or_update
}
} else if [program] == "postfix/qmgr" and [from_mail] {
aggregate {
task_id => "%{taskid}"
code => "
map['from_mail'] ||= event.get('from_mail')
map['size'] ||= event.get('size')
map['nrcpt'] ||= event.get('nrcpt')
event.cancel()
"
map_action => update
}
} else if ( [program] == "postfix/smtp" or [program] == "postfix/virtual" ) {
aggregate {
task_id => "%{taskid}"
code => "
map['to_mail'] ||= [];
map['to_mail'] << event.get('to_mail');
map['relay'] ||= [];
map['relay'] << event.get('relay');
map['status'] ||= [];
map['status'] << event.get('status')
map['status_msg'] ||= [];
map['status_msg'] << event.get('status_msg')
event.cancel()
"
map_action => update
}
} else if ( [program] == "postfix/qmgr" and ![from_mail] ) or ( [program] == "postfix/cleanup" and [milter] == "milter-reject" ) {
aggregate {
task_id => "%{taskid}"
map_action => update
code => "
if map['status'].nil?
map['status'] = 'reject'
end
map['from_mail'] ||= event.get('from_mail')
map['to_mail'] ||= [];
map['to_mail'] << event.get('to_mail');
if map['from_mail'].nil?
map['from_mail'] = 'NULL_SENDER_RFC5321'
end
event.set('client_hostname',map['client_hostname']);
event.set('client_ip',map['client_ip']);
event.set('from_mail',map['from_mail']);
event.set('size',map['size']);
event.set('nrcpt',map['nrcpt']);
event.set('to_mail',map['to_mail']);
event.set('relay',map['relay']);
event.set('status',map['status']);
event.set('status_msg',map['status_msg']);
event.set('program',map['program']);
event.set('message_id',map['message_id']);
"
end_of_task => true
timeout_task_id_field => "taskid"
timeout_timestamp_field => "@timestamp"
timeout => 3600
push_map_as_event_on_timeout => false
push_previous_map_as_event => false
timeout_tags => ['_aggregatetimeout']
}
}
mutate {
convert => [
"size", "integer",
"nrcpt", "integer"
]
remove_field => ["host","message","beat.version","beat.hostname","prospector.type","input.type","offset","type"]
add_tag => [ "postfix" ]
}
}
}
# end filter
Важная часть - это фильтр и в нём легендарный grok. Вы можете найти в Интернет массу онлайн ресурсов в помощь, но даже в самой Kibana есть Dev Tools - Management - Grok Debugger, который поможет создать и проверить ваше регулярное выражение на разбор конкретной строки.
Так как тип input это syslog, то вам уже распарсят прибывающие строки на стандартные поля (дата, имя сервера, программа и т.д) и заполнят нужное в будущем JSON документе.
Наша задача распарсить текст message и для этого служит match => message. Grok всего лишь парсит приходящие строки и находит нужное, заполняя соответствующие поля. Магия сбора в единое целое происходит с помощью фильтра агрегирования.
Ответное письмо выглядит так
D826F1C060C: message-id=<20210330140158.D826F1C060C@mail.example.com>
D826F1C060C: from=<>, size=5727, nrcpt=1 (queue active)
D826F1C060C: to=<user@firma.ru>, relay=mx1.firma.ru[1.21.98.74]:25, delay=0.63, delays=0.03/0.01/0.11/0.47, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 6BA9620003)
D826F1C060C: removed
Поэтому нам нужна ветка if [program] == "postfix/cleanup" and [message_id] and [milter] != "milter-reject", которая так же создаёт (map_action => create_or_update) новую карту (map), агрегируя по уникальному номеру очереди taskid.
Эта же ветка поможет обновить карту в рамках нормального письма.
которую пропустит grok правило
%{POSTFIX_MSG_ID:taskid}: to=<(?<to_mail>[^>]+)>,.* relay=%{NOTSPACE:relay},.* status=%{NOTSPACE:status} \((?<status_msg>.+)\).*
и сработает if ( [program] == "postfix/smtp" or [program] == "postfix/virtual" )
где мы просто обновляем карту (map_action => update) дополнительными данными.
Строки кода на языке Ruby
map['to_mail'] ||= [];
map['to_mail'] << event.get('to_mail');
следует читать так - создай если не создано и пополни полученным.
В данном месте специалист по реляционным базам данных поймёт без слов что на лицо нарушение Первой Нормальной Формы. В одном поле находится не скалярное значение, а массив.
Срабатывает ветка if ( [program] == "postfix/qmgr" and ![from_mail] ) or ( [program] == "postfix/cleanup" and [milter] == "milter-reject" )
Мы обновляем карту (map_action => update) окончательно формируем событие (event.set) и завершаем задачу (end_of_task => true).
Мы даём 1 час на SMTP сессию (timeout => 3600), что порождает проблемы при длительных SMTP сессиях, например в случае проблем с достижимостью почтовых серверов или работе серых списков сторонних почтовых серверов. На своё усмотрение можно увеличить таймаут на разумное число, но попытки продолжать доставлять письмо по стандартам RFC нужно несколько дней. Так что думайте и решайте.
Мы не разрешаем собирать карту из того что есть, если вышел таймаут (push_map_as_event_on_timeout => false) и не разрешаем пушить в базу предыдущую карту как событие (push_previous_map_as_event => false)
Последняя и заключительная секция - output!
Вся часть output (нажмите для раскрытия списка)
# begin output
output {
if "postfix" in [tags] {
# для отладки - если есть ошибки, то скидываем в отдельный текстовый файл
if "_grokparsefailure" in [tags] or "_aggregatetimeout" in [tags] {
file {
path => "/var/log/logstash/fail-postfix-%{+YYYY.MM.dd}.log"
}
} else {
# скидываем в базу
elasticsearch {
hosts => "localhost"
manage_template => false
index => "postfix-%{+YYYY.MM.dd}"
}
}
}
}
# end output
Если grok на какие-то строки ругался и добавил свой тег _grokparsefailure ИЛИ в результате сбора карты вышли за пределы таймаута (_aggregatetimeout), то скидываем такие события в файл для дальнейшего разбирательства и улучшения парсинга.
Если всё хорошо, то событие отправляем в базу ElasticSearch.
Если всё получилось и в Kibana Stack Management => Index patterns вы сделали шаблон индекса на postfix*, то можно в разделе Discover наблюдать поступающие и разобранные события в режиме реального времени. С помощью языка запросов Kibana Query Language (KQL) или Lucene вы теперь не ограничены чьим-либо интерфейсом. Вы можете вести любую аналитику на что хватит вашей фантазии. Строить любые срезы за любое время.
Строить свои графики и компоновать свои dashboard. Один раз создав нужные графики, можно быстро получать актуальную информацию.
Если есть желание, то можно легко добавить geoip в событие и у вас будет поле "страна адресата". Если тема статьи вам понравилась, то берите смело и развивайте дальше под свои нужды.
Дополнительные материалы:
Архив наработки postfix-logs-2-elasticsearch.tar.gz
Logstash Reference - Filter plugins - Aggregate filter plugin.
Kwaio’s blog. Aggregating postfix logs with Logstash.
Главная
Новости
Ворох бумаг
Видео Linux
Игры в Linux
Безопасность
Статьи о FreeBSD
Статьи об Ubuntu
Статьи о Snappy
Статьи об Ubuntu Phone
Статьи о Kubuntu
Статьи о Xubuntu
Статьи о Lubuntu
Статьи об Open Source
Карта сайта
1С под Linux. Ускорение Ubuntu. 21 пример iptables. Цикл статей о Ceph. Убунту в дикой среде. Ubuntu Linux на SSD. Ubuntu для блондинок. Поддержка железа в Linux. BTSync на службе у админа. Андроид программы в Ubuntu. Прокидывание портов для p2p. Анти СПАМ в Postfix.