Авторские статьи об OpenSource

Журналы Postfix в ElasticSearch.


Многие админы слышали об ElasticSearch - свободной поисковой системе, основанной на Lucene. Несмотря на недоверие хейтеров Java, ElasticSearch реально быстр в плане поиска и анализа информации, которую вы поместили в базу ES. Много лет уже ElasticSearch нам помогает в работе: есть у нас и удалённый syslog сервер и успешно опробованный netflow сервер (коллектор + система обработки и представления данных). Часто требуется в админской практике что-то поискать в журнале Postfix. Даже вспомогательными скриптами тяжело grep'пить multiline журналы, хотелось бы по-современному загонять их в некую базу и потом там легко искать нужное, строить графики, проводить аналитику и т.д.

Постановка задачи

Благодаря грамотной архитектуре почтового сервера Postfix достигается безопасность и надёжность, но небольшой платой за спокойствие является сложность разбора журнала работы Postfix. Различные даемоны Postfix пишут нужную тебе информацию о происходящей SMTP сессии в виде множества строк, которые хотелось красиво разобрать (parse) и проанализировать.

Упрощённый пример SMTP сессии Postfix из текстового журнала.

Apr 7 11:54:54 mail postfix/smtpd[23202]: ACE141C020C: client=pc1.example.com[10.1.2.3]
Apr 7 11:54:54 mail postfix/cleanup[24213]: ACE141C020C: message-id=<f47ee90f5429@example.com>
Apr 7 11:54:54 mail postfix/qmgr[1485]: ACE141C020C: from=<firma@example.com>, size=928, nrcpt=2 (queue active)
Apr 7 11:54:55 mail postfix/smtp[24215]: ACE141C020C: to=<user1@gmail.com>, relay=gmail.1.google.com[173.194.220.1]:25, delay=0.91, delays=0.28/0.2/0.1/0.32, dsn=2.0.0, status=sent (250 2.0.0 OK 1616835295 m14si10145180lfh.102 - gsmtp)
Apr 7 11:54:57 mail postfix/qmgr[1485]: ACE141C020C: removed

Старая попытка занесения журналов Postfix в MySQL

Скажу честно, но в старые времена, до моего знакомства с ElasticSearch, была создана простая php формочка, которая делала запрос к базе данных MySQL куда данные заносились из журнала Postfix топорным bash скриптом. В целом всё работает, но нет желания развивать своё поделие, так как всё равно не приблизиться к мощи стека ElasticSearch + LogStash + Kibana (ELK), к его скорости, к его функционалу.

Через php + MySQL мною реализовано почти всё не правильно. В реальной жизни человек может указать множество адресатов в одном письме и в рамках SMTP сессии вы увидите множество to=. В реляционных базах данных это намекает на создание дополнительной таблицы, связанной с основной таблицей отношением один-ко-многим (One-to-Many). Я совсем не парсил строки и не вычленял нужные поля, чтобы занести их по правилам реляционных БД, а просто выуживал из строки дату-время, номер очереди и всё остальное "превращалось" в сообщение. Путеводной звездой, объединяющей множество строк уже в таблице оставалось имя очереди, которое уникально.

Вот структура единственной таблицы

CREATE TABLE postfixmaillog (
   dateevent datetime NOT NULL,
   namequeue varchar(20) NOT NULL,
   textmsg text NOT NULL
)

Остальное извращение с поиском нужного было уже в скрипте php, который формировал и выводил информацию в более-менее приглядной форме. Это было лучше ползанья grep'ом и/или скриптами на самом сервере, но, в целом, оставалось парашей, за которую не испытываешь гордости. Ни какой аналитики и любых других хотелок не было и нужно было допиливать php скрипт. Даже чужие наработки, найденные на просторах Интернет, классически сковывали своим интерфейсом желание сделать какой-либо свой запрос и построить по нему какой-либо график и т.д.

Занесение журналов Postfix в ElasticSearch

Журналы через LogStash в ElasticSearch и на экран Kibana

ElasticSearch - это по сути своей база данных, но она не собирается сама чудесным способом парсить логи и вычленять из строк нужные поля. Значит вопрос остался открытым - как красиво распарсить множество строк в журнале, где путеводной звездой есть только уникальный номер очереди?

Как Чип и Дейл, нам на помощь спешит плагин Aggregate filter в LogStash, который традиционно занимается приёмом данных от различных источников, парсингом и занесением информации собственно в базу ElasticSearch.

Целью фильтра является, как понятно из его названия, агрегирование информации, доступной среди нескольких событий (обычно строк журнала), принадлежащих одной и той же задаче, и, наконец, формирование агрегированной информации в окончательное событие задачи. Есть масса ньюансов, но на примере обычной SMTP сессии, вы, как человек, понимаете что после подстроки ACE141C020C: client= началась новая сессия и она закончилась после подстроки ACE141C020C: removed. Даже если правильно предположить что строки могут "разбавляться" строками другой SMTP сессии, можно разобраться что и где по уникальному номеру очереди. Такое и позволяет запрограммировать Aggregate filter.

Все файлы-наработки вы найдёте в конце статьи. Далее будут выдержки для понимания как всё работает в данный момент.

Ложка дёгтя

К сожалению, фильтр агрегирования вынуждает вас убить напрочь параллельность от слова совсем. Вы обязаны выставить pipeline.workers: 1 в /etc/logstash/logstash.yml

Дело в том, что если множество потоков LogStash начнут получать строки, то они будут раскиданы по разным потокам, которые не смогут собрать итоговое событие, так как не делят между собой информацию.

Это настолько важный момент, что вам стоит трижды подумать над этим, даже если вас очень заинтересовала тема статьи. Если у вас есть действующий ElasticSearch, то, чтобы не убивать производительность для текущей задачи, придётся, возможно, делать отдельный сервер под приём журналов Postfix.

Отправка и приём логов

Для отправки журналов с почтового сервера достаточно добавить на нём в файле /etc/rsyslog.d/postfix.conf строку
mail.* @10.1.2.3:10555

где:

  • @ - означает использовать UDP протокол
  • 10.1.2.3 - IP адрес вашего сервера ElasticSearch
  • 10555 - выбранный вами произвольный, свободный порт выше 1024

После рестарта службы sudo systemctl restart rsyslog ваш почтовик начнёт слать логи на remote syslog ES, которые пока временно никто не принимает.

На сервере ElasticSearch 10.1.2.3 в файле /etc/logstash/conf.d/postfix-syslog.conf начинаем формировать первую нужную часть input для приёма логов.


Вся часть input (нажмите для раскрытия списка)

# begin input
input {
    syslog {
        type => syslog
        port => 10555
    }
}
# end input

Парсим и фильтрируем

Для правильной работы нам потребуется пара переменных. В файле /etc/logstash/patterns.d/postfix-patterns должны быть строки

SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
POSTFIX_MSG_ID [0-9A-F]{10,11}


Вся часть filter (нажмите для раскрытия списка)

# begin filter
filter {

if [program] =~ "postfix" {

    grok {
        patterns_dir => ["/etc/logstash/patterns.d/"]
        match => {
            "message" => [
            "%{POSTFIX_MSG_ID:taskid}: client=(?<client_hostname>[^\]]+)\[%{IP:client_ip}\]",
            "%{POSTFIX_MSG_ID:taskid}: message-id=<(?<message_id>[^>]+)>.*",
            "%{POSTFIX_MSG_ID:taskid}: (?<milter>milter-reject): .*from=<(?<from_mail>[^>]+)> to=<(?<to_mail>[^>]+)>.*",
            "%{POSTFIX_MSG_ID:taskid}: from=<(?<from_mail>[^>]+)>, size=%{INT:size}, nrcpt=%{INT:nrcpt}.*",
"%{POSTFIX_MSG_ID:taskid}: to=<(?<to_mail>[^>]+)>,.* relay=%{NOTSPACE:relay},.* status=%{NOTSPACE:status} \((?<status_msg>.+)\).*",
            "%{POSTFIX_MSG_ID:taskid}: removed"
                        ]
                }
        }

# если нет имени очереди, то не работаем - drop
    if ![taskid] {
        drop {}
    } else if [program] == "postfix/smtpd" and [client_ip] {
        aggregate {
            task_id => "%{taskid}"
            code => "
                map['client_hostname'] ||= event.get('client_hostname')
                map['client_ip'] ||= event.get('client_ip')
                map['program'] ||= event.get('program')
                event.cancel()
            "
            map_action => create_or_update
        }
    } else if [program] == "postfix/cleanup" and [message_id] and [milter] != "milter-reject" {
        aggregate {
            task_id => "%{taskid}"
            code => "
                map['message_id'] ||= event.get('message_id')
                event.cancel()
            "
            map_action => create_or_update
        }
    } else if [program] == "postfix/qmgr" and [from_mail] {
        aggregate {
            task_id => "%{taskid}"
            code => "
                map['from_mail'] ||= event.get('from_mail')
                map['size'] ||= event.get('size')
                map['nrcpt'] ||= event.get('nrcpt')
                event.cancel()
            "
            map_action => update
        }
    } else if ( [program] == "postfix/smtp" or [program] == "postfix/virtual" ) {
        aggregate {
            task_id => "%{taskid}"
            code => "
                map['to_mail'] ||= [];
                map['to_mail'] << event.get('to_mail');
                map['relay'] ||= [];
                map['relay'] << event.get('relay');
                map['status'] ||= [];
                map['status'] << event.get('status')
                map['status_msg'] ||= [];
                map['status_msg'] << event.get('status_msg')
                event.cancel()
            "
            map_action => update
        }
    } else if ( [program] == "postfix/qmgr" and ![from_mail] ) or ( [program] == "postfix/cleanup" and [milter] == "milter-reject" ) {
        aggregate {
            task_id => "%{taskid}"
            map_action => update
            code => "
                if map['status'].nil?
                    map['status'] = 'reject'
                end

                map['from_mail'] ||= event.get('from_mail')
                map['to_mail'] ||= [];
                map['to_mail'] << event.get('to_mail');

                if map['from_mail'].nil?
                    map['from_mail'] = 'NULL_SENDER_RFC5321'
                end

                event.set('client_hostname',map['client_hostname']);
                event.set('client_ip',map['client_ip']);
                event.set('from_mail',map['from_mail']);
                event.set('size',map['size']);
                event.set('nrcpt',map['nrcpt']);
                event.set('to_mail',map['to_mail']);
                event.set('relay',map['relay']);
                event.set('status',map['status']);
                event.set('status_msg',map['status_msg']);
                event.set('program',map['program']);
                event.set('message_id',map['message_id']);
            "
            end_of_task => true
            timeout_task_id_field => "taskid"
            timeout_timestamp_field  => "@timestamp"
            timeout => 3600
            push_map_as_event_on_timeout => false
            push_previous_map_as_event => false
            timeout_tags => ['_aggregatetimeout']
        }
    }

    mutate {
        convert => [
            "size", "integer",
            "nrcpt", "integer"
        ]
        remove_field => ["host","message","beat.version","beat.hostname","prospector.type","input.type","offset","type"]
        add_tag => [ "postfix" ]
    }
 }
}
# end filter

Важная часть - это фильтр и в нём легендарный grok. Вы можете найти в Интернет массу онлайн ресурсов в помощь, но даже в самой Kibana есть Dev Tools - Management - Grok Debugger, который поможет создать и проверить ваше регулярное выражение на разбор конкретной строки.

Так как тип input это syslog, то вам уже распарсят прибывающие строки на стандартные поля (дата, имя сервера, программа и т.д) и заполнят нужное в будущем JSON документе.

Наша задача распарсить текст message и для этого служит match => message. Grok всего лишь парсит приходящие строки и находит нужное, заполняя соответствующие поля. Магия сбора в единое целое происходит с помощью фильтра агрегирования.

  • Если начнётся SMTP сессия нормального письма, то через grok правило
    %{POSTFIX_MSG_ID:taskid}: client=(?<client_hostname>[^\]]+)\[%{IP:client_ip}\]
    пройдёт строка
    ACE141C020C: client=pc1.example.com[10.1.2.3]
    и сработает ветка if [program] == "postfix/smtpd" and [client_ip]
    мы создаём (map_action => create_or_update) новую карту (map), агрегируя по уникальному номеру очереди taskid.
  • Если отправитель попросил уведомить его о доставке письма, то Postfix напишет встречное письмо, которое
    1) к сожалению для нас НЕ начинается как обычное нормальное письмо со слова client
    2) в качестве отправителя используется по стандарту RFC 5321 from=<> (NULL SENDER) для предотвращения email loop между почтовыми серверами.

    Ответное письмо выглядит так
    D826F1C060C: message-id=<20210330140158.D826F1C060C@mail.example.com>
    D826F1C060C: from=<>, size=5727, nrcpt=1 (queue active)
    D826F1C060C: to=<user@firma.ru>, relay=mx1.firma.ru[1.21.98.74]:25, delay=0.63, delays=0.03/0.01/0.11/0.47, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 6BA9620003)
    D826F1C060C: removed

    Поэтому нам нужна ветка if [program] == "postfix/cleanup" and [message_id] and [milter] != "milter-reject", которая так же создаёт (map_action => create_or_update) новую карту (map), агрегируя по уникальному номеру очереди taskid.

    Эта же ветка поможет обновить карту в рамках нормального письма.

  • В рамках SMTP сессии нормального письма придёт строка
    ACE141C020C: from=<firma@example.com>, size=928, nrcpt=2 (queue active)
    которую пропустит grok правило
    %{POSTFIX_MSG_ID:taskid}: from=<(?<from_mail>[^>]+)>, size=%{INT:size}, nrcpt=%{INT:nrcpt}.*
    и сработает
    if [program] == "postfix/qmgr" and [from_mail]
    где мы просто обновляем карту (map_action => update) дополнительными данными.
  • В рамках SMTP сессии нормального письма придёт строка и не ОДНА!
    ACE141C020C: to=<user1@gmail.com>, relay=gmail.1.google.com[173.194.220.1]:25, delay=0.91, delays=0.28/0.2/0.1/0.32, dsn=2.0.0, status=sent (250 2.0.0 OK 1616835295 m14si10145180lfh.102 - gsmtp)

    которую пропустит grok правило
    %{POSTFIX_MSG_ID:taskid}: to=<(?<to_mail>[^>]+)>,.* relay=%{NOTSPACE:relay},.* status=%{NOTSPACE:status} \((?<status_msg>.+)\).*

    и сработает if ( [program] == "postfix/smtp" or [program] == "postfix/virtual" )
    где мы просто обновляем карту (map_action => update) дополнительными данными.

    Строки кода на языке Ruby
    map['to_mail'] ||= [];
    map['to_mail'] << event.get('to_mail');

    следует читать так - создай если не создано и пополни полученным.

    В данном месте специалист по реляционным базам данных поймёт без слов что на лицо нарушение Первой Нормальной Формы. В одном поле находится не скалярное значение, а массив.

  • Нормальное письмо заканчивается так - ACE141C020C: removed
    а заблокированное почтовым фильтром (mail filter, milter) примерно так (придётся адаптировать под вашу систему фильтрации, если она используется)
    34EBB1C0617: milter-reject: END-OF-MESSAGE from mail.evilsite.com[95.14.15.1]: 5.7.1 Command rejected; from=<badletter@evilsite.com> to=<user@firma.ru> proto=ESMTP helo=<mail.evilsite.com>

    Срабатывает ветка if ( [program] == "postfix/qmgr" and ![from_mail] ) or ( [program] == "postfix/cleanup" and [milter] == "milter-reject" )

    Мы обновляем карту (map_action => update) окончательно формируем событие (event.set) и завершаем задачу (end_of_task => true).

Мы даём 1 час на SMTP сессию (timeout => 3600), что порождает проблемы при длительных SMTP сессиях, например в случае проблем с достижимостью почтовых серверов или работе серых списков сторонних почтовых серверов. На своё усмотрение можно увеличить таймаут на разумное число, но попытки продолжать доставлять письмо по стандартам RFC нужно несколько дней. Так что думайте и решайте.

Мы не разрешаем собирать карту из того что есть, если вышел таймаут (push_map_as_event_on_timeout => false) и не разрешаем пушить в базу предыдущую карту как событие (push_previous_map_as_event => false)

Скидываем в базу ElasticSearch

Последняя и заключительная секция - output!


Вся часть output (нажмите для раскрытия списка)

# begin output
output {
    if "postfix" in [tags] {
        # для отладки - если есть ошибки, то скидываем в отдельный текстовый файл
        if "_grokparsefailure" in [tags] or "_aggregatetimeout" in [tags] {
            file {
                    path => "/var/log/logstash/fail-postfix-%{+YYYY.MM.dd}.log"
                }
        } else {
        # скидываем в базу
            elasticsearch {
                    hosts           => "localhost"
                    manage_template => false
                    index           => "postfix-%{+YYYY.MM.dd}"
                }
        }
    }
}
# end output

Если grok на какие-то строки ругался и добавил свой тег _grokparsefailure ИЛИ в результате сбора карты вышли за пределы таймаута (_aggregatetimeout), то скидываем такие события в файл для дальнейшего разбирательства и улучшения парсинга.

Если всё хорошо, то событие отправляем в базу ElasticSearch.

Итог

Если всё получилось и в Kibana Stack Management => Index patterns вы сделали шаблон индекса на postfix*, то можно в разделе Discover наблюдать поступающие и разобранные события в режиме реального времени. С помощью языка запросов Kibana Query Language (KQL) или Lucene вы теперь не ограничены чьим-либо интерфейсом. Вы можете вести любую аналитику на что хватит вашей фантазии. Строить любые срезы за любое время.

Журнал Postfix в ElasticSearch

Строить свои графики и компоновать свои dashboard. Один раз создав нужные графики, можно быстро получать актуальную информацию.

Dashboard с графиками Postfix в ElasticSearch

Если есть желание, то можно легко добавить geoip в событие и у вас будет поле "страна адресата". Если тема статьи вам понравилась, то берите смело и развивайте дальше под свои нужды.

Дополнительные материалы:
Архив наработки postfix-logs-2-elasticsearch.tar.gz
Logstash Reference - Filter plugins - Aggregate filter plugin.
Kwaio’s blog. Aggregating postfix logs with Logstash.

Дата последней правки: 2021-04-09 08:48:29

    Twitter   


Разделы

Главная
Новости
Ворох бумаг
Видео Linux
Игры в Linux
Безопасность
Статьи о FreeBSD
Статьи об Ubuntu
Статьи о Snappy
Статьи об Ubuntu Phone
Статьи о Kubuntu
Статьи о Xubuntu
Статьи о Lubuntu
Статьи об Open Source
Карта сайта

Лучшее на сайте:

1С под Linux.   Ускорение Ubuntu.   21 пример iptables.   Цикл статей о Ceph.   Убунту в дикой среде.   Ubuntu Linux на SSD.   Ubuntu для блондинок.   Поддержка железа в Linux.   BTSync на службе у админа.   Андроид программы в Ubuntu.   Прокидывание портов для p2p.   Анти СПАМ в Postfix.  



Группа поддержки