diff --git a/journal-postfix-doc/20191127_pyugat_talk.html b/journal-postfix-doc/20191127_pyugat_talk.html
new file mode 100644
index 0000000..5bebe03
--- /dev/null
+++ b/journal-postfix-doc/20191127_pyugat_talk.html
@@ -0,0 +1,378 @@

journal-postfix - A log parser for Postfix

+

Experiences from applying Python to the domain of bad old email.

+

Email ✉

 * old technology (starting in the 1970s)
 * store-and-forward: sent != delivered to recipient
 * non-delivery reasons: recipient over quota, inexistent destination, malware, spam, server problem, ...
 * permanent / non-permanent failure (DSN ~ 5.X.Y / 4.X.Y)
 * non-delivery modes: immediate reject on SMTP level; delayed bounce messages by the reporting MTA (queueing, e.g. ~5d, before the delivery failure notification); discarding
 * read receipts (email tracking)

SMTP

+

SMTP session example: the envelope sender and envelope recipient may differ from the From: and To: headers
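For illustration, an abridged SMTP dialogue (C: client, S: server; hosts and addresses invented) in which the envelope sender differs from the From: header:

    S: 220 mail.example.org ESMTP Postfix
    C: EHLO client.example.net
    S: 250 mail.example.org
    C: MAIL FROM:<bounces-42@lists.example.net>
    S: 250 2.1.0 Ok
    C: RCPT TO:<bob@example.org>
    S: 250 2.1.5 Ok
    C: DATA
    S: 354 End data with <CR><LF>.<CR><LF>
    C: From: Alice <alice@example.net>
    C: To: Bob <bob@example.org>
    C: Subject: hello
    C:
    C: (body)
    C: .
    S: 250 2.0.0 Ok: queued as 47NQzY13DbzNWNQG
    C: QUIT
    S: 221 2.0.0 Bye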

+

Lists of error codes:

 * SMTP and ESMTP error codes: https://www.inmotionhosting.com/support/email/email-troubleshooting/smtp-and-esmtp-error-code-list
 * SMTP errors: https://serversmtp.com/smtp-error/
 * SMTP status codes: https://info.webtoolhub.com/kb-a15-smtp-status-codes-smtp-error-codes-smtp-reply-codes.aspx

Example of an error within a bounced email (Subject: Mail delivery failed: returning message to sender)

+
SMTP error from remote server for TEXT command, host: smtpin.rzone.de (81.169.145.97) reason: 550 5.7.1 Refused by local policy. No SPAM please!
+
 * email users are continually asking about the fate of their emails (or of those from their correspondents which should have arrived)

Postfix

 * popular MTA (message transfer agent)
 * written in C
 * logging to files / journald
 * example log messages for a (non-)delivery + stats:
Nov 27 16:19:22 mail postfix/smtpd[18995]: connect from unknown[80.82.79.244]
+Nov 27 16:19:22 mail postfix/smtpd[18995]: NOQUEUE: reject: RCPT from unknown[80.82.79.244]: 454 4.7.1 <spameri@tiscali.it>: Relay access denied; from=<spameri@tiscali.it> to=<spameri@tiscali.it> proto=ESMTP helo=<WIN-G7CPHCGK247>
+Nov 27 16:19:22 mail postfix/smtpd[18995]: disconnect from unknown[80.82.79.244] ehlo=1 mail=1 rcpt=0/1 rset=1 quit=1 commands=4/5
+
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection rate 1/60s for (smtp:80.82.79.244) at Nov 27 16:19:22
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection count 1 for (smtp:80.82.79.244) at Nov 27 16:19:22
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max cache size 1 at Nov 27 16:19:22
+
+Nov 27 16:22:48 mail postfix/smtpd[18999]: connect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
+Nov 27 16:22:49 mail postfix/smtpd[18999]: 47NQzY13DbzNWNQG: client=mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
+Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: info: header Subject: Re: test from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]; from=<ibu@cosmopool.net> to=<ibu@multiname.org> proto=ESMTP helo=<mail.cosmopool.net>
+Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: message-id=<d5154432-b984-d65a-30b3-38bde7e37af8@cosmopool.net>
+Nov 27 16:22:49 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: from=<ibu@cosmopool.net>, size=1365, nrcpt=2 (queue active)
+Nov 27 16:22:49 mail postfix/smtpd[18999]: disconnect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107] ehlo=1 mail=1 rcpt=2 data=1 quit=1 commands=6
+Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=<ibu2@multiname.org>, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 <ibu2@multiname.org> nV9iJ9mi3l0+SgAAZU03Dg Saved)
+Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=<ibu@multiname.org>, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 <ibu@multiname.org> nV9iJ9mi3l0+SgAAZU03Dg:2 Saved)
+Nov 27 16:22:50 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: removed
+
 * involved Postfix components (see http://www.postfix.org/OVERVIEW.html):
   * smtpd (port 25: smtp, port 587: submission)
   * cleanup
   * smtp/lmtp
 * missing: a log parser

Idea

 * follow the log stream and write summarized delivery information to a database
 * goal: spot delivery problems, collect delivery stats
 * a GUI could then display the current delivery status to users

Why Python?

 * simple and fun language, clear and concise
 * well suited for text processing
 * libs available for systemd, PostgreSQL
 * huge standard library (used here: datetime, re, yaml, argparse, select)
 * speed sufficient?

Development iterations

 * hmm, easy task, might take a few days
 * PoC: reading and polling from the journal works as expected
 * used postfix logfiles in syslog format and iteratively wrote regexps matching them
 * separated parsing messages from extracting delivery information
 * created a delivery table
 * hmm, this is very slow; processing the log messages of a few days takes hours (on a server with little traffic)
 * introduced a polling timeout and SQL transactions handling several messages at once ... much faster
 * looks fine, but wait... did I catch all syntax variants of Postfix log messages?
 * looked into the Postfix sources and almost got lost
 * weeks of hard work identifying the relevant log output directives
 * completely rewrote the parser to deal with the rich log message syntax
 * oh, there are even more Postfix components... limit scope to certain Postfix configurations, in particular virtual mailboxes, not local ones
 * mails may have multiple recipients... split the delivery table into delivery_from and delivery_to
 * decide which delivery information is relevant
 * cleanup and polish (config mgmt, logging)
 * write an ansible role

Structure

[blockdiag figure: the input iterables (journal_since, journal_follow, logfile) feed run_loop, which chains parse -> extract_delivery -> store into the output tables delivery_from, delivery_to and noqueue]

blockdiag {
  default_fontsize = 20;
  node_height = 80;
  journal_since -> run_loop;
  journal_follow -> run_loop;
  logfile -> run_loop;
  run_loop -> parse -> extract_delivery -> store;
  store -> delivery_from;
  store -> delivery_to;
  store -> noqueue;

  group { label="input iterables"; journal_since; journal_follow; logfile; };
  group { label="output tables"; delivery_from; delivery_to; noqueue; };
}

Iterables

+
def iter_journal_messages_since(timestamp: Union[int, float]):
+    """
+    Yield False and message details from the journal since *timestamp*.
+
+    This is the loading phase (loading messages that already existed
+    when we start).
+
+    Argument *timestamp* is a UNIX timestamp.
+
+    Only journal entries for systemd unit UNITNAME with loglevel
+    INFO and above are retrieved.
+    """
+    ...
+
+def iter_journal_messages_follow(timestamp: Union[int, float]):
+    """
+    Yield commit and message details from the journal through polling.
+
+    This is the polling phase (after we have read pre-existing messages
+    in the loading phase).
+
+    Argument *timestamp* is a UNIX timestamp.
+
+    Only journal entries for systemd unit UNITNAME with loglevel
+    INFO and above are retrieved.
+
+    *commit* (bool) tells the caller whether it is time to store the
+    delivery information obtained from the messages yielded so far.
+    It is set to True once max_delay_before_commit has elapsed;
+    to be exact, the effective delay may exceed this value by up to
+    one journal_poll_interval.
+    """
+    ...
+
+def iter_logfile_messages(filepath: str, year: int,
+                          commit_after_lines=max_messages_per_commit):
+    """
+    Yield messages and a commit flag from a logfile.
+
+    Loop through all lines of the file with given *filepath* and
+    extract the time and log message. If the log message starts
+    with 'postfix/', then extract the syslog_identifier, pid and
+    message text.
+
+    Since syslog lines do not contain the year, the *year* to which
+    the first log line belongs must be given.
+
+    Yield a commit flag and a dict with these keys:
+        't': timestamp
+        'message': message text
+        'identifier': syslog identifier (e.g., 'postfix/smtpd')
+        'pid': process id
+
+    The commit flag will be set to True for every
+    (commit_after_lines)-th filtered message and serves
+    as a signal to the caller to commit this chunk of data
+    to the database.
+    """
+    ...
+
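The function bodies are elided on the slides. A minimal sketch of the loading-phase iterator using the python3-systemd bindings (UNITNAME as in the docstrings above; error handling omitted, not the actual implementation):

    from systemd import journal  # Debian package: python3-systemd

    def iter_journal_messages_since(timestamp):
        """Loading phase: yield (False, msg_details) since *timestamp* (sketch)."""
        j = journal.Reader()
        j.log_level(journal.LOG_INFO)        # loglevel INFO and above
        j.add_match(_SYSTEMD_UNIT=UNITNAME)
        j.seek_realtime(timestamp)
        for entry in j:
            yield False, {
                't': entry['__REALTIME_TIMESTAMP'],  # a datetime; convert as needed
                'message': entry['MESSAGE'],
                'identifier': entry.get('SYSLOG_IDENTIFIER', ''),
                'pid': entry.get('SYSLOG_PID'),
            }

For the polling phase, journal.Reader.wait(timeout) can be used to check whether new entries have arrived.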

Running loops

+
def run(dsn, verp_marker=False, filepath=None, year=None, debug=[]):
+    """
+    Determine loop(s) and run them within a database context.
+    """
+    init(verp_marker=verp_marker)
+    with psycopg2.connect(dsn) as conn:
+        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs:
+            if filepath:
+                run_loop(iter_logfile_messages(filepath, year), curs, debug=debug)
+            else:
+                begin_timestamp = get_latest_timestamp(curs)
+                run_loop(iter_journal_messages_since(begin_timestamp), curs, debug=debug)
+                begin_timestamp = get_latest_timestamp(curs)
+                run_loop(iter_journal_messages_follow(begin_timestamp), curs, debug=debug)
+
+def run_loop(iterable, curs, debug=[]):
+    """
+    Loop over log messages obtained from *iterable*.
+
+    Parse the message, extract delivery information from it and store
+    that delivery information.
+
+    For performance reasons delivery items are collected in a cache
+    before writing them (i.e., committing a database transaction).
+    """
+    cache = []
+    msg_count = max_messages_per_commit
+    for commit, msg_details in iterable:
+        ...
+
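A sketch of how the elided loop body can continue, assuming the parse, extract_delivery and store_deliveries functions from the following slides (a simplification, not the exact code):

    for commit, msg_details in iterable:
        parsed = parse(msg_details, debug=debug)
        if parsed is not None:
            errors, delivery = extract_delivery(msg_details, parsed)
            if not errors and delivery:
                cache.append(delivery)
                msg_count -= 1
        if commit or msg_count <= 0:
            # flush the cache within one database transaction
            store_deliveries(curs, cache, debug=debug)
            cache = []
            msg_count = max_messages_per_commit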

Parsing

+

Parse what you can. (But only msg_info-level messages in Postfix, and only the relevant components.)

+
def parse(msg_details, debug=False):
+    """
+    Parse a log message returning a dict.
+
+    *msg_details* is assumed to be a dict with these keys:
+
+      * 'identifier' (syslog identifier),
+      * 'pid' (process id),
+      * 'message' (message text)
+
+    The syslog identifier and process id are copied to the resulting dict.
+    """
+    ...
+
+def _parse_branch(comp, msg, res):
+    """
+    Parse a log message string *msg*, adding results to dict *res*.
+
+    Depending on the component *comp* we branch to functions
+    named _parse_{comp}.
+
+    Add parsing results to dict *res*. Always add key 'action'.
+    Try to parse every syntactical element.
+    Note: We parse what we can. Assessment of parsing results relevant
+    for delivery is done in :func:`extract_delivery`.
+    """
+    ...
+
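A hypothetical round trip for the final qmgr line of the example log above (the keys follow parser.py, which appears later in this diff):

    >>> parse({'identifier': 'postfix/qmgr', 'pid': '29349',
    ...        'message': '47NQzY13DbzNWNQG: removed'})
    {'comp': 'qmgr', 'pid': '29349', 'queue_id': '47NQzY13DbzNWNQG',
     'action': 'removed', 'parsed': True}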

Extracting

+

Extract what is relevant.

+
def extract_delivery(msg_details, parsed):
+    """
+    Compute delivery information from parsing results.
+
+    Basically this means that we map the parsed fields to
+    a type ('from' or 'to') and to the database
+    fields for table 'delivery_from' or 'delivery_to'.
+
+    We branch to functions _extract_{comp} where comp is the
+    name of a Postfix component.
+
+    Return a list of error strings and a dict with the
+    extracted information. Keys with None values are removed
+    from the dict.
+    """
+    ...
+
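For one of the lmtp lines in the example log, the extracted 'to' item could look roughly like this (illustrative field names, not the exact delivery_to columns):

    {'type': 'to',
     'queue_id': '47NQzY13DbzNWNQG',
     'recipient': 'ibu2@multiname.org',
     'relay': 'lmtp',
     'dsn': '2.0.0',
     'delivery_status': 'sent',
     'dovecot_id': 'nV9iJ9mi3l0+SgAAZU03Dg'}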

Regular expressions

 * see sources
 * Stackoverflow: How to validate an email address (with an FSM diagram): https://stackoverflow.com/questions/201323/how-to-validate-an-email-address-using-a-regular-expression
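As a taste of these regexps, the relay pattern from parser.py (later in this diff) applied to one of the lmtp log lines above:

    >>> import re
    >>> relay_re = re.compile(r'relay=(none|local|virtual|([^ ]+)\[([^\]]+)\](:([0-9]+))?)')
    >>> relay_re.search('relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2').groups()
    ('mail.multiname.org[private/dovecot-lmtp]', 'mail.multiname.org', 'private/dovecot-lmtp', None, None)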

BTW: email.utils.parseaddr

+
>>> from email.utils import parseaddr
+>>> parseaddr('Ghost <"hello@nowhere"@pyug.at>')
+('Ghost', '"hello@nowhere"@pyug.at')
+>>> print(parseaddr('"more\"fun\"\\"hello\\"@nowhere"@pyug.at')[1])
+"more"fun"\"hello\"@nowhere"@pyug.at
+>>> print(parseaddr('""@pyug.at')[1])
+""@pyug.at
+

Storing

+
def store_deliveries(cursor, cache, debug=[]):
+    """
+    Store cached delivery information into the database.
+
+    Find queue_ids in *cache* and group delivery items by
+    them, but separately for delivery types 'from' and 'to'.
+    In addition, collect delivery items with queue_id is None.
+
+    After grouping we merge all items within a group into a
+    single item. This lets us combine several SQL queries into
+    a single one, which improves performance significantly.
+
+    Then store the merged items and the deliveries with
+    queue_id is None.
+    """
+    ...
+
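A sketch of the grouping step described in the docstring (helper name and item layout are assumptions):

    from collections import defaultdict
    from psycopg2.extras import execute_values

    def _group_deliveries(cache):
        grouped = defaultdict(dict)  # (type, queue_id) -> merged item
        noqueue_items = []
        for item in cache:
            if item.get('queue_id') is None:
                noqueue_items.append(item)
            else:
                # later items complete/overwrite earlier ones in the group
                grouped[(item['type'], item['queue_id'])].update(item)
        return grouped, noqueue_items

The merged rows can then be written in few round trips, e.g. with psycopg2.extras.execute_values for the plain INSERTs into noqueue.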

Database schema: 3 tables:

 * delivery_from: smtpd, milters, qmgr
 * delivery_to: smtp, virtual, bounce, error
 * noqueue: rejected by smtpd before even getting a queue_id

Table noqueue contains all the spam; for it we only use plain SQL INSERT (no ON CONFLICT ... DO UPDATE), which is faster.

+

Demo

+
...
+
+

Questions / Suggestions

 * Could you enhance speed by using prepared statements?
 * Will old data be deleted (as required by GDPR)?

Both were implemented after the talk.
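Regarding the first question: psycopg2 has no dedicated prepared-statement API, but server-side PREPARE/EXECUTE works; a sketch with assumed column names:

    curs.execute(
        "PREPARE ins_noqueue (timestamptz, text, text) AS "
        "INSERT INTO noqueue (t, client, error) VALUES ($1, $2, $3)"
    )
    # then, per rejected delivery:
    curs.execute("EXECUTE ins_noqueue (%s, %s, %s)", (t, client, error))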

diff --git a/journal-postfix-doc/20191127_pyugat_talk.md b/journal-postfix-doc/20191127_pyugat_talk.md
new file mode 100644
index 0000000..7243d17
--- /dev/null
+++ b/journal-postfix-doc/20191127_pyugat_talk.md
@@ -0,0 +1,340 @@
+# journal-postfix - A log parser for Postfix
+
+Experiences from applying Python to the domain of bad old email.
+
+## Email ✉
+
+ * old technology (starting in the 1970s)
+ * [store-and-forward](https://en.wikipedia.org/wiki/Store_and_forward): sent != delivered to recipient
+ * non-delivery reasons:
+   * recipient over quota
+   * inexistent destination
+   * malware
+   * spam
+   * server problem
+   * ...
+ * permanent / non-permanent failure ([DSN ~ 5.X.Y / 4.X.Y](https://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml))
+ * non-delivery modes
+   * immediate reject on SMTP level
+   * delayed [bounce messages](https://en.wikipedia.org/wiki/Bounce_message) by the [reporting MTA](https://upload.wikimedia.org/wikipedia/commons/a/a2/Bounce-DSN-MTA-names.png) - queueing (e.g., ~5d) before the delivery failure notification
+   * discarding
+ * read receipts
+   * [Wikipedia: email tracking](https://en.wikipedia.org/wiki/Email_tracking)
+
+## [SMTP](https://en.wikipedia.org/wiki/SMTP)
+
+[SMTP session example](https://en.wikipedia.org/wiki/Simple_Mail_Transfer_Protocol#SMTP_transport_example):
+the envelope sender and envelope recipient may differ from the From: and To: headers
+
+Lists of error codes:
+
+ * [SMTP and ESMTP](https://www.inmotionhosting.com/support/email/email-troubleshooting/smtp-and-esmtp-error-code-list)
+ * [SMTP](https://serversmtp.com/smtp-error/)
+ * [SMTP](https://info.webtoolhub.com/kb-a15-smtp-status-codes-smtp-error-codes-smtp-reply-codes.aspx)
+
+Example of an error within a bounced email (Subject: Mail delivery failed: returning message to sender)
+
+    SMTP error from remote server for TEXT command, host: smtpin.rzone.de (81.169.145.97) reason: 550 5.7.1 Refused by local policy. No SPAM please!
+
+ * email users are continually asking about the fate of their emails (or of those from their correspondents which should have arrived)
+
+## [Postfix](http://www.postfix.org)
+
+ * popular [MTA](https://en.wikipedia.org/wiki/Message_transfer_agent)
+ * written in C
+ * logging to files / journald
+ * example log messages for a (non-)delivery + stats
+
+```
+Nov 27 16:19:22 mail postfix/smtpd[18995]: connect from unknown[80.82.79.244]
+Nov 27 16:19:22 mail postfix/smtpd[18995]: NOQUEUE: reject: RCPT from unknown[80.82.79.244]: 454 4.7.1 <spameri@tiscali.it>: Relay access denied; from=<spameri@tiscali.it> to=<spameri@tiscali.it> proto=ESMTP helo=<WIN-G7CPHCGK247>
+Nov 27 16:19:22 mail postfix/smtpd[18995]: disconnect from unknown[80.82.79.244] ehlo=1 mail=1 rcpt=0/1 rset=1 quit=1 commands=4/5
+
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection rate 1/60s for (smtp:80.82.79.244) at Nov 27 16:19:22
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection count 1 for (smtp:80.82.79.244) at Nov 27 16:19:22
+Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max cache size 1 at Nov 27 16:19:22
+
+Nov 27 16:22:48 mail postfix/smtpd[18999]: connect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
+Nov 27 16:22:49 mail postfix/smtpd[18999]: 47NQzY13DbzNWNQG: client=mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
+Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: info: header Subject: Re: test from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]; from=<ibu@cosmopool.net> to=<ibu@multiname.org> proto=ESMTP helo=<mail.cosmopool.net>
+Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: message-id=<d5154432-b984-d65a-30b3-38bde7e37af8@cosmopool.net>
+Nov 27 16:22:49 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: from=<ibu@cosmopool.net>, size=1365, nrcpt=2 (queue active)
+Nov 27 16:22:49 mail postfix/smtpd[18999]: disconnect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107] ehlo=1 mail=1 rcpt=2 data=1 quit=1 commands=6
+Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=<ibu2@multiname.org>, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 <ibu2@multiname.org> nV9iJ9mi3l0+SgAAZU03Dg Saved)
+Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=<ibu@multiname.org>, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 <ibu@multiname.org> nV9iJ9mi3l0+SgAAZU03Dg:2 Saved)
+Nov 27 16:22:50 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: removed
+```
+
+ * [involved postfix components](http://www.postfix.org/OVERVIEW.html)
+   * smtpd (port 25: smtp, port 587: submission)
+   * cleanup
+   * smtp/lmtp
+ * missing: a log parser
+
+## Idea
+
+ * follow the log stream and write summarized delivery information to a database
+ * goal: spot delivery problems, collect delivery stats
+ * a GUI could then display the current delivery status to users
+
+## Why Python?
+
+ * simple and fun language, clear and concise
+ * well suited for text processing
+ * libs available for systemd, PostgreSQL
+ * huge standard library (used here: datetime, re, yaml, argparse, select)
+ * speed sufficient?
+
+## Development iterations
+
+ * hmm, easy task, might take a few days
+ * PoC: reading and polling from the journal works as expected
+ * used postfix logfiles in syslog format and iteratively wrote regexps matching them
+ * separated parsing messages from extracting delivery information
+ * created a delivery table
+ * hmm, this is very slow; processing the log messages of a few days takes hours (on a server with little traffic)
+ * introduced a polling timeout and SQL transactions handling several messages at once ... much faster
+ * looks fine, but wait... did I catch all syntax variants of Postfix log messages?
+ * looked into Postfix sources and almost got lost
+ * weeks of hard work identifying relevant log output directives
+ * completely rewrote parser to deal with the rich log msg syntax, e.g.:
+ `def _strip_pattern(msg, res, pattern_name, pos='l', target_names=None)` + * oh, there are even more Postfix components... limit to certain Postfix configurations, in particular virtual mailboxes and not local ones + * mails may have multiple recipients... split delivery table into delivery_from and delivery_to + * decide which delivery information is relevant + * cleanup and polish (config mgmt, logging) + * write ansible role + +## Structure + +```blockdiag +blockdiag { + default_fontsize = 20; + node_height = 80; + journal_since -> run_loop; + journal_follow -> run_loop; + logfile -> run_loop; + run_loop -> parse -> extract_delivery -> store; + store -> delivery_from; + store -> delivery_to; + store -> noqueue; + + group { label="input iterables"; journal_since; journal_follow; logfile; }; + group { label="output tables"; delivery_from; delivery_to; noqueue; }; +} +``` + +## Iterables + +```python +def iter_journal_messages_since(timestamp: Union[int, float]): + """ + Yield False and message details from the journal since *timestamp*. + + This is the loading phase (loading messages that already existed + when we start). + + Argument *timestamp* is a UNIX timestamp. + + Only journal entries for systemd unit UNITNAME with loglevel + INFO and above are retrieved. + """ + ... + +def iter_journal_messages_follow(timestamp: Union[int, float]): + """ + Yield commit and message details from the journal through polling. + + This is the polling phase (after we have read pre-existing messages + in the loading phase). + + Argument *timestamp* is a UNIX timestamp. + + Only journal entries for systemd unit UNITNAME with loglevel + INFO and above are retrieved. + + *commit* (bool) tells whether it is time to store the delivery + information obtained from the messages yielded by us. + It is set to True if max_delay_before_commit has elapsed. + After this delay delivery information will be written; to be exact: + the delay may increase by up to one journal_poll_interval. + """ + ... + +def iter_logfile_messages(filepath: str, year: int, + commit_after_lines=max_messages_per_commit): + """ + Yield messages and a commit flag from a logfile. + + Loop through all lines of the file with given *filepath* and + extract the time and log message. If the log message starts + with 'postfix/', then extract the syslog_identifier, pid and + message text. + + Since syslog lines do not contain the year, the *year* to which + the first log line belongs must be given. + + Return a commit flag and a dict with these keys: + 't': timestamp + 'message': message text + 'identifier': syslog identifier (e.g., 'postfix/smtpd') + 'pid': process id + + The commit flag will be set to True for every + (commit_after_lines)-th filtered message and serves + as a signal to the caller to commit this chunk of data + to the database. + """ + ... +``` + +## Running loops + +```python +def run(dsn, verp_marker=False, filepath=None, year=None, debug=[]): + """ + Determine loop(s) and run them within a database context. 
+ """ + init(verp_marker=verp_marker) + with psycopg2.connect(dsn) as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs: + if filepath: + run_loop(iter_logfile_messages(filepath, year), curs, debug=debug) + else: + begin_timestamp = get_latest_timestamp(curs) + run_loop(iter_journal_messages_since(begin_timestamp), curs, debug=debug) + begin_timestamp = get_latest_timestamp(curs) + run_loop(iter_journal_messages_follow(begin_timestamp), curs, debug=debug) + +def run_loop(iterable, curs, debug=[]): + """ + Loop over log messages obtained from *iterable*. + + Parse the message, extract delivery information from it and store + that delivery information. + + For performance reasons delivery items are collected in a cache + before writing them (i.e., committing a database transaction). + """ + cache = [] + msg_count = max_messages_per_commit + for commit, msg_details in iterable: + ... +``` + +## Parsing + +Parse what you can. (But only msg_info in Postfix, and only relevant components.) + +```python +def parse(msg_details, debug=False): + """ + Parse a log message returning a dict. + + *msg_details* is assumed to be a dict with these keys: + + * 'identifier' (syslog identifier), + * 'pid' (process id), + * 'message' (message text) + + The syslog identifier and process id are copied to the resulting dict. + """ + ... + +def _parse_branch(comp, msg, res): + """ + Parse a log message string *msg*, adding results to dict *res*. + + Depending on the component *comp* we branch to functions + named _parse_{comp}. + + Add parsing results to dict *res*. Always add key 'action'. + Try to parse every syntactical element. + Note: We parse what we can. Assessment of parsing results relevant + for delivery is done in :func:`extract_delivery`. + """ + ... +``` + +## Extracting + +Extract what is relevant. + +```python +def extract_delivery(msg_details, parsed): + """ + Compute delivery information from parsing results. + + Basically this means that we map the parsed fields to + a type ('from' or 'to') and to the database + fields for table 'delivery_from' or 'delivery_to'. + + We branch to functions _extract_{comp} where comp is the + name of a Postfix component. + + Return a list of error strings and a dict with the + extracted information. Keys with None values are removed + from the dict. + """ + ... +``` + +## Regular expressions + + * see sources + + * [Stackoverflow: How to validate an email address](https://stackoverflow.com/questions/201323/how-to-validate-an-email-address-using-a-regular-expression) [FSM](https://i.stack.imgur.com/YI6KR.png) + +### BTW: [email.utils.parseaddr](https://docs.python.org/3/library/email.utils.html#email.utils.parseaddr) + +```python +>>> from email.utils import parseaddr +>>> parseaddr('Ghost <"hello@nowhere"@pyug.at>') +('Ghost', '"hello@nowhere"@pyug.at') +>>> print(parseaddr('"more\"fun\"\\"hello\\"@nowhere"@pyug.at')[1]) +"more"fun"\"hello\"@nowhere"@pyug.at +>>> print(parseaddr('""@pyug.at')[1]) +""@pyug.at +``` + +## Storing + +```python +def store_deliveries(cursor, cache, debug=[]): + """ + Store cached delivery information into the database. + + Find queue_ids in *cache* and group delivery items by + them, but separately for delivery types 'from' and 'to'. + In addition, collect delivery items with queue_id is None. + + After grouping we merge all items withing a group into a + single item. So we can combine several SQL queries into + a single one, which improves performance significantly. 
+ + Then store the merged items and the deliveries with + queue_id is None. + """ + ... +``` + + +Database schema: 3 tables: + + * delivery_from: smtpd, milters, qmgr + * delivery_to: smtp, virtual, bounce, error + * noqueue: rejected by smtpd before even getting a queue_id + +Table noqueue contains all the spam; for this we only use SQL INSERT, no ON CONFLICT ... UPDATE; it's faster. + +## Demo + + ... + +## Questions / Suggestions + + * Could you enhance speed by using prepared statements? + * Will old data be deleted (as required by GDPR)? + +Both were implemented after the talk. diff --git a/journal-postfix.yml b/journal-postfix.yml new file mode 100644 index 0000000..3fa8a78 --- /dev/null +++ b/journal-postfix.yml @@ -0,0 +1,34 @@ +# Deploy journal-postfix + +# This will install a service that writes mail delivery information +# obtained from systemd-journal (unit postfix@-.service) to a +# PostgreSQL database. +# +# You can configure the database connection parameters (and optionally +# a verp_marker) as host vars like this: +# +# mailserver: +# postgresql: +# host: 127.0.0.1 +# port: 5432 +# dbname: mailserver +# username: mailserver +# password: !vault | +# $ANSIBLE_VAULT;1.1;AES256 +# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# postfix: +# verp_marker: rstxyz +# +# If you do not, then you must edit /etc/journal-postfix/main.yml +# on the destination hosts and run systemctl start journal-postfix +# manually. + +- name: install journal-postfix + user: root + hosts: mail + roles: + - journal-postfix diff --git a/journal-postfix/files/journal-postfix.service b/journal-postfix/files/journal-postfix.service new file mode 100644 index 0000000..08561c8 --- /dev/null +++ b/journal-postfix/files/journal-postfix.service @@ -0,0 +1,17 @@ +# this file is part of ansible role journal-postfix + +[Unit] +Description=Extract postfix message delivery information from systemd journal messages\ +and store them in a PostgreSQL database. Configuration is in /etc/journal-postfix/main.yml +After=multi-user.target + +[Service] +Type=simple +ExecStart=/srv/journal-postfix/run.py +User=journal-postfix +WorkingDirectory=/srv/journal-postfix/ +Restart=on-failure +RestartPreventExitStatus=97 + +[Install] +WantedBy=multi-user.target diff --git a/journal-postfix/files/srv/README.md b/journal-postfix/files/srv/README.md new file mode 100644 index 0000000..e61ef18 --- /dev/null +++ b/journal-postfix/files/srv/README.md @@ -0,0 +1,85 @@ +Parse postfix entries in systemd journal and collect delivery information. + +The information on mail deliveries is written to tables in a PostgreSQL +database. The database can then be queried by a UI showing delivery status +to end users. The UI is not part of this package. + +This software is tailor-made for debian buster with systemd as init system. +It is meant to run on the same system on which Postfix is running, +or on a system receiving the log stream of a Postfix instance in its +systemd journal. + +Prerequisites / Postfix configuration: + + - Access to a PostgreSQL database. + - Postfix: Only virtual mailboxes are supported. 
+ - Postfix: You can use short or long queue_ids (see + http://www.postfix.org/postconf.5.html#enable_long_queue_ids), + but since the uniqueness of short queue_ids is very limited, + usage of long queue_ids is *strongly recommended*. + +Installation: + + - apt install python3-psycopg2 python3-systemd python3-yaml + - Edit /etc/journal-postfix/main.yml + - Output is written to the journal (unit journal-postfix). READ IT! + +Side effects (database): + + - The configured database user will create the tables + - delivery_from + - delivery_to + - noqueue + in the configured database, if they do not yet exist. + These tables will be filled with results from parsing the journal. + Table noqueue contains deliveries rejected by smtpd before they + got a queue_id. Deliveries with queue_id are in tables delivery_from + and delivery_to, which are separate, because an email can have only + one sender, but more than one recipient. Entries in both tables are + related through the queue_id and the approximate date; note that + short queue_ids are not unique for a delivery transaction, so + consider changing your Postfix configuration to long queue_ids. + - Log output is written to journald, unit journal-postfix. + +Configuration: + + - Edit the config file in YAML format located at + /etc/journal-postfix/main.conf + +Limitations: + + - The log output of Postfix may contain messages not primarily relevant + for delivery, namely messages of levels panic, fatal, error, warning. + They are discarded. + - The postfix server must be configured to use virtual mailboxes; + deliveries to local mailboxes are ignored. + - Parsing is specific to a Postfix version and only version 3.4.5 + (the version in Debian buster) is supported; it is intended to support + Postfix versions in future stable Debian releases. + - This script does not support concurrency; we assume that there is only + one process writing to the database tables. Thus clustered postfix + setups are not supported. + +Options: + + - If you use dovecot as lmtpd, you will also get dovecot_ids upon + successful delivery. + - If you have configured Postfix to store VERP-ids of outgoing mails + in table 'mail_from' in the same database, then bounce emails can + be associated with original emails. The VERP-ids must have a certain + format. + - The subject of emails will be extracted from log messages starting + with "info: header Subject:". To enable these messages configure + Postfix like this: Enabled header_checks in main.cf ( + header_checks = regexp:/etc/postfix/header_checks + ) and put this line into /etc/postfix/header_checks: + /^Subject:/ INFO + - You can also import log messages from a log file in syslog format: + Run this script directly from command line with options --file + (the path to the file to be parsed) and --year (the year of the + first message in this log file). + Note: For the name of the month to be recognized correctly, the + script must be run with this locale. + Attention: When running from the command line, log output will + not be sent to unit journal-postfix; use this command instead: + journalctl --follow SYSLOG_IDENTIFIER=python3 diff --git a/journal-postfix/files/srv/parser.py b/journal-postfix/files/srv/parser.py new file mode 100644 index 0000000..3cc21f4 --- /dev/null +++ b/journal-postfix/files/srv/parser.py @@ -0,0 +1,1514 @@ +r""" +Parse postfix log messages. + +The parser (:func:`parse_entry`) uses regular expressions to produce a dict. 
+This dict is used by :func:`extract_delivery` to extract relevant mail +delivery information. + +If VERP id parsing is to be used, call :func:`init` before using +:func:`parse`. + +Comments like 'smtpd/smtpd.c:1663-1664' refer to a postfix-3.4.7 source file +and line numbers within that file. + +We assume that verbose logging is off, which is the default (util/msg.c:177). +We do not parse verbose messages (variable "msg_verbose", usually). +Useful command for searching postfix sources (in subdirs global, smtpd, qmgr, +...): + + rg 'msg_info\("%s:' -B2 -n *.c + +We only parse messages of level info and some of level warning +(see util/msg.c:67-71, also cf. postlog/postlog.c:140-150); +messages of levels 'error', 'fatal', 'panic' are discarded. + +We only parse messages with queue_id (considering "NOQUEUE" as +queue_id == None). + +Coverage of postfix daemon components: + + * smtpd: yes (includes submission) + * trivial-rewrite: yes (no relevant messages of level msg_info) + * cleanup: yes + * qmgr: yes + * smtp, lmtp: mostly + * bounce: mostly + * virtual: mostly + * error: partially + +These components are not covered: + + * anvil + * discard + * dnsblog + * flush + * local + * oqmgr + * pickup + * pipe + * postlogd + * postscreen + * proxymap + * qmqpd + * scache + * showq + * tlsmgr + * tlsproxy + * verify + +Note: In particular, local delivery is not supported! + +For lmtp we try to extract a dovecot_id from the delivery status text, +see :func:`_get_delivery_status`. +""" + +import re +from pprint import pprint +from traceback import format_exc +from typing import List, Optional, Tuple, Union + + +ignore_identifiers = ( + 'postfix/master', + 'postfix/postfix-script', + 'configure-instance.sh', + 'postmulti', +) +""" +Syslog identifiers to ignore. +""" + + +where = ( + 'CONNECT', + 'DATA content', + 'BDAT content', + 'END-OF-MESSAGE', + 'HELO', + 'EHLO', + 'STARTTLS', + 'AUTH', + 'MAIL', + 'RCPT', + 'DATA', + 'BDAT', + 'RSET', + 'NOOP', + 'VRFY', + 'ETRN', + 'QUIT', + 'XCLIENT', + 'XFORWARD', + 'UNKNOWN', +) +""" +Possible smtpd "where" values from smtpd/smtpd.c:235-260. +""" + + +smtpd_whatsup_actions = { + 'reject': 'reject', + 'hangup': 'reject', + 'info': None, + 'warn': None, + 'filter': None, + 'hold': 'hold', + 'delay': 'delay', + 'discard': 'discard', + 'redirect': 'redirect', + 'bcc': None, + 'permit': None, + 'reject_warning': 'reject', +} +""" +Keys are from `rg log_whatsup smtpd/*.c` and smtpd/s,tpd_check.c:998,1038. + +Map the possible smtpd whatsup value to our action. +""" + + +cleanup_actions = { + 'reject': 'reject', + 'warning': None, + 'info': None, + 'filter': None, + 'pass': None, + 'discard': 'discard', + 'hold': 'hold', + 'delay': 'delay', + 'prepend': None, + 'replace': None, + 'redirect': 'redirect', + 'bcc': None, + 'strip': None, +} +""" +Possible cleanup "actions" and a mapping to our action. +""" + + +smtp_hbc_actions = ['warning', 'info', 'replace', 'prepend', 'strip'] +""" +SMTP header body check actions. +""" + + +cleanup_contexts = ('header', 'body', 'content') + + +mime_error_texts = [ + 'MIME nesting exceeds safety limit', + 'message header length exceeds safety limit', + 'improper use of 8-bit data in message header', + 'improper use of 8-bit data in message body', + 'invalid message/* or multipart/* encoding domain', +] +""" +MIME error texts in cleanup. +""" + + +# rfc3464_actions = ['failed', 'delayed', 'delivered', 'relayed', 'expanded'] + + +address_pattern = r'([^">]*|"([^ "\\]|\\[^ ])*"@[^>]*)' +""" +Email address pattern. 
+ +Either match any number of characters not containing '"' or '>', or +match a local part followed by '@' and a domain part, where the +domain part is arbitrary, but must not contain '>', and the local +part begins and ends with a '"' and contains 1) any char except +space, tab, '"', r'\' or 2) any char except tab prepended by r'\'. + +Note: + + * email addresses are by default logged in 'external' format by Postfix >=3.5: + http://www.postfix.org/postconf.5.html#info_log_address_format + * https://stackoverflow.com/questions/201323/how-to-validate-an-email-address-using-a-regular-expression +""" + + +regexp_patterns = { + 'queue_id_short': r'([0-9A-F]{12})', + 'queue_id_long': r'([0-9BCDFGHIJKLMNPQRSTVWXYZbcdfghijklmnpqrstvwxyz]{10,15}z[0-9BCDFGHIJKLMNPQRSTVWXYZbcdfghijklmnpqrstvwxy]{5,10})', + 'from': 'from=<' + address_pattern + '>', + 'to': 'to=<' + address_pattern + '>', + 'orig_to': 'orig_to=<' + address_pattern + '>', + 'message-id': r'message-id=<([^>]*)>', + 'nrcpt': r'nrcpt=([0-9]+)', + 'relay': r'relay=(none|local|virtual|([^ ]+)\[([^\]]+)\](:([0-9]+))?)', # matches 5 values + 'delay': r'delay=([0-9\.]+)', + 'delays': r'delays=([0-9\./]+)', + 'dsn': r'dsn=([0-9\.]+)', + 'proto': r'proto=(SMTP|ESMTP)', + 'helo': r'helo=<([^>]*)>', + 'host_ip': r'([^ ]+)\[([^\]]+)\]', # matches 2 values + 'none': r'none', + 'dovecot_id': r'([A-za-z0-9/\+]{22}) Saved', + 'sasl_username': r'sasl_username=?', + 'sasl_method': r'sasl_method=(.*)', + 'size': r'size=([0-9]+)', + 'orig_client': r'orig_client=(.*)', + 'orig_queue_id': r'orig_queue_id=(.*)', + 'sasl_sender': r'sasl_sender=(.*)', + 'cleanup_context': '(' + '|'.join(cleanup_contexts) + ')', +} +""" +regexp patterns, usually for matching one expression. +""" + + +cleanup_milter_apply_events = ( + 'END-OF-MESSAGE', + 'CONNECT', + 'EHLO', + 'MAIL', + 'RCPT', + 'DATA', +) +""" +cleanup milter stages. +""" + + +regexps = { + 'cleanup_optional_text': re.compile(' helo=<[^>]*>: '), + 'failed_mail': re.compile( + r'^([^\[]+)\[([^\]]+)\]: (.*); proto=[^ ]+ helo=<([^>]+)>$' + ), + 'failed_rcpt': re.compile( + r'^([^\[]+)\[([^\]]+)\]: (.*); from=<([^>]*)> to=<([^>]+)> proto=[^ ]+ helo=<([^>]+)>$' + ), +} +""" +Special regular expressions for matching expressions which are harder to parse. +""" + + +for label, pattern in regexp_patterns.items(): + regexps['l_' + label] = re.compile(r'(^' + pattern + r'(: |, | |$))') + regexps['r_' + label] = re.compile(r'( ?' + pattern + r'$)') + + +def _strip_queue_id(msg, res, pos='l', target_name='queue_id'): + """ + Strip a postfix queue_id at the left or right end of *msg*. + + The queue_id can either be a short one, or a long one. + If none is matched, return *msg*. + """ + m = regexps[pos + '_queue_id_short'].search(msg) + if not m: + m = regexps[pos + '_queue_id_long'].search(msg) + if not m: + return msg + res[target_name] = m.group(2) + l_ = len(m.group(1)) + return msg[l_:] if pos == 'l' else msg[:-l_] + + +def _strip_pattern(msg, res, pattern_name, pos='l', target_names=None): + """ + Strip a pattern at the left/right end of *msg* and store fields. + + Matching at the left (right) end is chosen be pos='l' (pos='r'). + *pattern_name* is a key from regexp_patterns. + If target_names is set, it must be an iterable. Each name in it + is used as a key in *res* and the values are set to values matched + by the pattern. Most patterns in regexp_patterns only match one + value; in this case the by default (target_names=None) the + *pattern_name* will be used as key in *res*. 
+ """ + m = regexps[pos + '_' + pattern_name].search(msg) + if m: + if target_names is None: + target_names = (pattern_name,) + for ind, target_name in enumerate(target_names): + res[target_name] = m.group(2 + ind) + l_ = len(m.group(1)) + msg = msg[l_:] if pos == 'l' else msg[:-l_] + return msg + + +def _rstrip_smtpd_whatsup(msg, res): + """ + Strip and store smtpd_whatsup fields at the right end of *msg*. + + Return the msg part in front of them. + """ + # from=<{sender}> to=<{recipient}> proto=ESMTP helo=<{helo}> sasl_username=<{sasl_username}> + for field_name in ('sasl_username', 'helo', 'proto', 'to', 'from'): + msg = _strip_pattern(msg, res, field_name, pos='r') + return msg + + +def _strip_host_ip(msg, res, pos='l', allow_none=False): + """ + Strip a hostname followed by an IP address in brackets from *msg*. + + *pos* determines whether the pattern is matched on the left or + right hand side of *msg*. + + The hostname is put into res['host'], the IP address into res['ip']. + If the hostname equals "unknown", it is set to None. + + If allow_none evaluates to True and msg == 'none', this results in + res['host'] = None and res['ip'] = None. + """ + if allow_none: + msg_ = _strip_pattern(msg, res, 'none', pos=pos) + if msg_ != msg: # "none" was matched + res['host'] = None + res['ip'] = None + return msg_ + msg = _strip_pattern( + msg, res, 'host_ip', pos=pos, target_names=('host', 'ip') + ) + if res.get('host') == 'unknown': + res['host'] = None + return msg + + +def _strip_relay(msg, res, pos='l'): + """ + Strip a relay pattern from *msg*. + + If "relay=none" or "relay=local" or "relay=virtual" we + set host,destination,port in res to None. + The destination can be an IP address or text + (e.g., "private/dovecot-lmtp"). + """ + msg = _strip_pattern( + msg, + res, + 'relay', + pos=pos, + target_names=('relay_full', 'host', 'destination', 'port_', 'port'), + ) + if res.get('relay_full') in ('none', 'local', 'virtual'): + res['relay'] = res.get('relay_full') + res['host'] = None + res['destination'] = None + res['port'] = None + else: + res['relay'] = 'external' + if res['destination'] == 'private/dovecot-lmtp': + res['relay'] = 'lmtp' + return msg + + +def _lstrip_where(msg, res): + """ + Strip and store 'where' string at the left end of *msg*. + + Return *msg*, where the 'where' string was strippped, if found. + See global variable 'where'. + """ + for where_ in where: + if msg.startswith(where_): + res['where'] = where_ + return msg[len(where_):] + return msg + + +def _strip(text, part, pos): + """ + Strip *part* from *text* on the left (pos='l') or right (pos='r') hand side. + """ + if pos == 'l': + if text.startswith(part): + return text[len(part):] + if pos == 'r': + if text.endswith(part): + return text[:-len(part)] + return text + + +def _parse_log_adhoc(msg, res): + """ + Parse a log message formatted in global/log_adhoc.c:82-214 (log_adhoc). + + The message has one of these two formats: + + to=<{to}>, relay={none_or_host_ip_port}, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text}) + to=<{to}>, orig_to=<{orig_to}>, relay={none_or_host_ip_port}, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text}) + + The resulting action will be 'delivery_status'. 
+ """ + if msg.startswith('to=<'): + msg_ = _strip_pattern(msg, res, 'to', pos='l') + msg_ = _strip_pattern(msg_, res, 'orig_to', pos='l') + msg_ = _strip_relay(msg_, res, pos='l') + msg_ = _strip_pattern(msg_, res, 'delay', pos='l') + msg_ = _strip_pattern(msg_, res, 'delays', pos='l') + msg_ = _strip_pattern(msg_, res, 'dsn', pos='l') + _get_delivery_status(msg_, res) + if 'delivery_status' not in res: + res['parsed'] = False + return msg + + +def _get_delivery_status(msg, res): + """ + Extract the delivery status from the beginning of *msg* into *res*. + + A delivery status looks like this: + + status={status} ({text}) + + Here {status} is a word (has no spaces). + + The result is put into res['delivery_status'] and res['delivery_status_text']. + """ + if msg.startswith('status='): + res['action'] = 'delivery_status' + if ' ' in msg: + status, detail = msg.split(' ', 1) + res['delivery_status'] = status[7:] + res['delivery_status_text'] = detail.lstrip('(').rstrip(')') + # also try to extract a dovecot_id; example: + # 250 2.0.0 9y4WEP+qV11uBgAAZU03Dg Saved + _strip_pattern( + res['delivery_status_text'], res, 'dovecot_id', pos='r' + ) + else: # can this happen at all? + res['delivery_status'] = msg[7:] + return msg + + +def init_parser(verp_marker: Optional[str] = None): + """ + Init the module. + + *verp_marker* is the VERP marker. + """ + global regexps + if verp_marker: + regexps['verp_id'] = re.compile( + r'(.*\+.*)' + verp_marker + r'-(\d+)(@[^@]+$)' + ) + + +def parse_entry( + msg_details: dict, debug: Union[bool, List[str]] = False +) -> Optional[dict]: + """ + Parse a log message returning a dict. + + *msg_details* is assumed to be a dict with these keys: + + * 'identifier' (syslog identifier), + * 'pid' (process id), + * 'message' (message text) + + The syslog identifier and process id are copied to the resulting dict. + """ + identifier = msg_details['identifier'] + if identifier in ignore_identifiers: + return None + pid = msg_details['pid'] + message = msg_details['message'].strip() + # postfix component + component = ( + identifier[8:] if identifier.startswith('postfix/') else identifier + ) + res = {'comp': component, 'pid': pid} + # do we have a postfix queue identifer? + if message.startswith('NOQUEUE: '): + res['queue_id'] = None + msg_ = message[9:] + else: + # do not put key 'queue_id' into res, if not found + msg_ = _strip_queue_id(message, res) + try: + if 'queue_id' in res: + _parse_branch(component, msg_, res) + except Exception: + res['parsed'] = False + print('PARSING FAILED:', message) + print(format_exc()) + res['parsed'] = not res.get('parsed') is False + if debug: + if not res['parsed']: + print('-' * 20, message) + print(component) + elif 'queue_id' in res and res.get('action') == 'ignore': + print('I' * 20, message) + print(component) + else: + print(message) + pprint(res) + print('_' * 100) + return res + + +def _parse_branch(comp: str, msg: str, res: dict) -> None: + """ + Parse a log message string *msg*, adding results to dict *res*. + + Depending on the component *comp* we branch to functions + named _parse_{comp}. + + Add parsing results to dict *res*. Always add key 'action'. + Try to parse every syntactical element. + Note: We parse what we can. Assessment of parsing results relevant + for delivery is done in :func:`extract_delivery`. 
+ """ + if ( + msg.startswith('warning: ') + or msg.startswith('error: ') + or msg.startswith('fatal: ') + or msg.startswith('panic: ') + or msg.startswith('using backwards-compatible default setting') + ): + res['action'] = 'ignore' + return + if comp == 'smtpd' or comp.endswith( + '/smtpd' + ): # includes 'submission/smtpd' + _parse_smtpd(msg, res) + res['submission'] = comp.startswith('submission') + elif comp == 'qmgr': + _parse_qmgr(msg, res) + elif comp == 'cleanup': + _parse_cleanup(msg, res) + elif comp == 'trivial-rewrite': + _parse_trivial_rewrite(msg, res) + elif comp in ('smtp', 'lmtp') or comp.endswith( + '/smtp' + ): # includes 'relay/smtp' + _parse_smtp(msg, res) + res['smtp_relay'] = comp.startswith('relay') + elif comp == 'bounce': + _parse_bounce(msg, res) + elif comp == 'virtual': + _parse_virtual(msg, res) + elif comp == 'error': + _parse_error(msg, res) + else: + res['parsed'] = False + + # extract a possible verp_id from orig_to + if 'orig_to' in res: + res['verp_id'], res['orig_to'] = _parse_verp_id(res['orig_to']) + elif 'from' in res: + res['verp_id'], res['from'] = _parse_verp_id(res['from']) + + +def _parse_verp_id(email): + """ + Return th VERP id and the original email. + """ + if 'verp_id' in regexps: + m = regexps['verp_id'].match(email) + if m: + verp_id = m.group(2) + orig_email = m.group(1).rstrip('+') + m.group(3) + return verp_id, orig_email + return None, email + + +def _parse_smtpd(msg, res): + """ + Parse log messages of the smtpd component, including submission. + """ + # smtpd/smtpd.c:2229-2246 smtpd_sasl_auth_cmd_wrapper + # client={hostname_or_unknown}[{ip_address}] + # client={hostname_or_unknown}[{ip_address}], sasl_method={sasl_method}, sasl_username={sasl_username}, sasl_sender={sasl_sender}, orig_queue_id={orig_queue_id}, orig_client={orig_client} + # (sasl_* and orig_* are individually optional) + if msg.startswith('client='): + msg_ = _strip_host_ip(msg[7:], res, pos='l') + msg_ = _strip_pattern(msg_, res, 'orig_client', pos='r') + msg_ = _strip_pattern(msg_, res, 'orig_queue_id', pos='r') + msg_ = _strip_pattern(msg_, res, 'sasl_sender', pos='r') + msg_ = _strip_pattern(msg_, res, 'sasl_username', pos='r') + msg_ = _strip_pattern(msg_, res, 'sasl_method', pos='r') + res['action'] = 'connect' + + # smtpd/smtpd_check.c:949-967 log_whatsup + # {smtpd_whatsup}: {where} from {hostname_or_unknown}[{ip_address}]: {error}; from=<{from}> to=<{to}> proto={proto} helo=<{helo}> + # smtpd/smtpd.c:5411-5414 smtpd_proto + # reject: {where} from {hostname_or_unknown}[{ip_address}]: 421 4.3.0 {myhostname} Server local data error + # reject: {where} from {hostname_or_unknown}[{ip_address}]: {error} + elif ': ' in msg and msg.split(': ', 1)[0] in smtpd_whatsup_actions.keys(): + smtpd_whatsup, msg_ = msg.split(': ', 1) + res['smtpd_whatsup'] = smtpd_whatsup + if smtpd_whatsup_actions.get(smtpd_whatsup): + res['action'] = smtpd_whatsup_actions.get(smtpd_whatsup) + else: + res['action'] = 'ignore' + msg_ = _lstrip_where(msg_, res) + msg_ = _strip(msg_, ' from ', 'l') + msg_ = _strip_host_ip(msg_, res, pos='l') + msg_ = _strip(msg_, ': ', 'l') + msg_ = _rstrip_smtpd_whatsup(msg_, res) + msg_ = _strip(msg_, ';', 'r') + res['error'] = msg_ + + # smtpd/smtpd.c:1663-1664 check_milter_reply + # {milter_action}: {where} from {hostname_or_unknown}[{ip_address}]: {error}; from=<{from}> to=<{to}> proto={proto} helo=<{helo}> + elif msg.startswith('milter-hold: '): + msg_ = _strip(_rstrip_smtpd_whatsup(msg[13:], res), ';', 'r') + msg_ = _strip(_lstrip_where(msg_, res), ' 
from ', 'l') + msg_ = _strip(_strip_host_ip(msg_, res, pos='l'), ': ', 'l') + res['error'] = msg_ + res['action'] = 'hold' + elif msg.startswith('milter-discard: '): + msg_ = _strip(_rstrip_smtpd_whatsup(msg[16:], res), ';', 'r') + msg_ = _strip(_lstrip_where(msg_, res), ' from ', 'l') + msg_ = _strip(_strip_host_ip(msg_, res, pos='l'), ': ', 'l') + res['error'] = msg_ + res['action'] = 'discard' + elif msg.startswith('milter-reject: '): + msg_ = _strip(_rstrip_smtpd_whatsup(msg[15:], res), ';', 'r') + msg_ = _strip(_lstrip_where(msg_, res), ' from ', 'l') + msg_ = _strip(_strip_host_ip(msg_, res, pos='l'), ': ', 'l') + res['error'] = msg_ + res['action'] = 'reject' + + # smtpd/smtpd.c:5099-5102 smtpd_start_tls + # abort: TLS from {hostname_or_unknown}[{ip_address}]: {error} + elif ( + 'queue_id' in res + and res['queue_id'] is None + and msg.startswith('abort: TLS from ') + ): + msg_ = msg[16:] + msg_ = _strip_host_ip(msg_, res, pos='l') + msg_ = _strip(msg_, ': ', 'l') + res['error'] = msg_ + res['action'] = 'reject' + + # smtpd/smtpd.c:1850 ehlo_cmd + # discarding EHLO keywords: {keywords} + elif msg.startswith('discarding EHLO keywords: '): + res['action'] = 'ignore' + + # smtpd/smtpd.c:5624 smtpd_proto + # replacing command {command1} with {command2} + # without queue_id + # -> ignore + + else: + res['action'] = 'ignore' + res['parsed'] = False + + +def _parse_qmgr(msg, res): + """ + Parse log messages of the qmgr component. + """ + # qmgr/qmgr_active.c:441 qmgr_active_done_25_generic + # from=<{from}>, status=expired, returned to sender + if msg.startswith('from=<') and msg.endswith( + ', status=expired, returned to sender' + ): + res['from'] = msg[6:-36].rstrip('>') + res['action'] = 'expired' + + # qmgr/qmgr_active.c:520 qmgr_active_done_3_generic + elif msg == 'removed': + res['action'] = 'removed' + + # qmgr/qmgr.c:680 pre_accept + # %s: %s feedback type %d value at %d: %g + # without queue_id + # -> ignore + + # qmgr/qmgr_feedback.c:170 + # "%s: %s feedback type %d value at %d: %g" + # without queue_id + # -> ignore + + # qmgr/qmgr_message.c:642-644 qmgr_message_read + # global/opened.c:64,70,76,84-86 opened,vopened + # from=<{from}>, size={size}, nrcpt={nrcpt} (queue {queue_name}) + elif msg.startswith('from=<') and ', nrcpt=' in msg: + msg_ = _strip_pattern(msg, res, 'from', pos='l') + msg_ = _strip_pattern(msg_, res, 'size', pos='l') + msg_ = _strip_pattern(msg_, res, 'nrcpt', pos='l') + msg_ = _strip(msg_, '(queue ', 'l').rstrip(')') + if msg_: + res['queue_name'] = msg_ + res['action'] = 'queued' + + # qmgr/qmgr_message.c:1493 qmgr_message_alloc - create in-core message structure + elif msg == 'skipped, still being delivered': + res['action'] = 'skipped' + + # qmgr/qmgr_queue.c:124-132 + # "%s: feedback %g" + # "%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g" + # without queue_id + # -> ignore + + else: + res['action'] = 'ignore' + res['parsed'] = False + + +def _parse_cleanup(msg, res): + """ + Parse log messages of the cleanup component. 
+ """ + if msg.startswith('info: '): + msg_ = msg[6:] + if msg_.lower().startswith('header subject: '): + msg_ = _rstrip_smtpd_whatsup(msg_[16:], res).rstrip(';') + res['subject'] = _strip( + _strip_host_ip(msg_, res, pos='r'), ' from', 'r' + ) + res['action'] = 'subject' + else: + res['action'] = 'ignore' + + # cleanup/cleanup.c:584 + elif msg.startswith('table ') and msg.endswith( + ' has changed -- restarting' + ): + res['action'] = 'ignore' + + # cleanup/cleanup_masquerade.c:111 + # "%s: %s map lookup problem -- " + # -> ignore + + # cleanup/cleanup_message.c:1025 cleanup_mime_error_callback + # reject: mime-error {mime_error}: {mime_error_detail} from {unknown_or_host_ip}; from=<{from}> to=<{to}> + elif msg.startswith('reject: mime-error '): + res['action'] = 'reject' + msg_ = msg[19:] + for mime_error_text in mime_error_texts: + if msg_.startswith(mime_error_text): + res['mime_error'] = mime_error_text + msg_ = msg_[len(mime_error_text):] + for field_name in ('to', 'from'): + msg_ = _strip_pattern(msg_, res, field_name, pos='r') + if msg_.endswith(' unknown'): + msg_ = msg_[:-8] + else: + msg_ = _strip_host_ip(msg_, res, pos='r') + msg_ = _strip(msg_, ' from', 'r') + msg_ = _strip(msg_, ': ', 'l') + if 'mime_error' in res: + res['mime_error_detail'] = msg_ + + # cleanup/cleanup_message.c:255-276 cleanup_act_log + # {cleanup_action}: {class} {content} from {attr}; from=<{from}> to=<{to}> proto={proto} helo=<{helo}>: {optional_text} + elif ': ' in msg and msg.split(': ', 1)[0] in cleanup_actions.keys(): + cleanup_action, msg_ = msg.split(': ', 1) + res['action'] = 'cleanup' + res['cleanup_action'] = cleanup_action + if cleanup_actions.get(cleanup_action): + res['action'] = cleanup_actions.get(cleanup_action) + msg_ = _strip_pattern(msg_, res, 'cleanup_context', pos='l') + parts = regexps['cleanup_optional_text'].split(msg_) + if len(parts) > 1: # {optional_text} is present + res['cleanup_optional_text'] = parts[1] + msg_ = msg_[: -len(parts[1]) - 2] + for field_name in ('helo', 'proto', 'to', 'from'): + msg_ = _strip_pattern(msg_, res, field_name, pos='r') + msg_ = _strip(msg_, ';', 'r') + if msg_.endswith('unknown'): + msg_ = msg_[:-8] + else: + msg_ = _strip_host_ip(msg_, res, pos='r') + msg_ = _strip(msg_, ' from', 'r') + res['text'] = msg_ + + # cleanup/cleanup_message.c:626,724-731 cleanup_header_callback + elif msg.startswith('message-id='): + res['message_id'] = msg[11:].lstrip('<').rstrip('>') + res['action'] = 'message_id' + + # cleanup/cleanup_message.c:628,724-731 cleanup_header_callback + elif msg.startswith('resent-message-id='): + res['resent_message_id'] = msg[18:].lstrip('<').rstrip('>') + res['action'] = 'resent_message_id' + + # cleanup/cleanup_milter.c:2066 cleanup_milter_apply + # {milter_action}: {where} from {hostname}[{ip}]: {text}; from=<{from}> to=<{to}> proto={proto} helo=<{helo}> + elif ( + msg.startswith('milter-reject: ') + or msg.startswith('milter-discard: ') + or msg.startswith('milter-hold: ') + ): + act_, msg_ = msg.split(': ', 1) + res['action'] = 'milter_action' + res['milter_action'] = act_[7:] + for event in cleanup_milter_apply_events: + if msg_.startswith(event): + res['milter_event'] = event + msg_ = msg_[len(event):] + break + for field_name in ('helo', 'proto', 'to', 'from'): + msg_ = _strip_pattern(msg_, res, field_name, pos='r') + msg_ = _strip(msg_, ';', 'r') + msg_ = _strip(msg_, ' from ', 'l') + msg_ = _strip_host_ip(msg_, res, pos='l') + msg_ = _strip(msg_, ': ', 'l') + res['text'] = msg_ + + # cleanup/cleanup_milter.c:252 
cleanup_milter_hbc_log + # milter-{where}-{cleanup_action}: {where} {text} from {hostname}[{ip}]; from=<{from}> to=<{to}> proto={proto} helo=<{helo}>: {optional_text} + elif msg.startswith('milter-'): + msg_ = msg[7:] + for wh in where: + if msg_.startswith(wh): + res['where'] = wh + break + if where in res: + msg_ = msg_[1:] + for cleanup_action in cleanup_actions.keys(): + if msg_.startswith(cleanup_action): + res['cleanup_action'] = cleanup_action + if cleanup_actions.get(cleanup_action): + res['action'] = cleanup_actions.get(cleanup_action) + break + if 'cleanup_action' in res: + msg_ = _strip(msg_, ': ' + res['where'], 'l') + parts = regexps['cleanup_optional_text'].split(msg_) + if len(parts) > 1: # {optional_text} is present + res['cleanup_optional_text'] = parts[1] + msg_ = msg_[: -len(parts[1]) - 2] + for field_name in ('helo', 'proto', 'to', 'from'): + msg_ = _strip_pattern(msg_, res, field_name, pos='r') + msg_ = _strip(msg_, ';', 'r') + msg_ = _strip_host_ip(msg_, res, post='r') + msg_ = _strip(msg_, ' from', 'r') + res['text'] = msg_ + if 'action' not in res: + res['action'] = 'milter_cleanup' + else: + res['parsed'] = False + else: + res['parsed'] = False + + # cleanup/cleanup_milter.c:2538 + # closing: {text} + # -> ignore (because no queue_id) + + # cleanup/cleanup_milter.c:2557 + # ignoring: {text} + # -> ignore (because no queue_id) + + # cleanup/cleanup_milter.c:2684 + # flags = {text} + # -> ignore (because no queue_id) + + # cleanup/cleanup_milter.c:2686 + # errs = {text} + # -> ignore (because no queue_id) + + else: + res['action'] = 'ignore' + res['parsed'] = False + + +def _parse_trivial_rewrite(msg, res): + """ + Parse log messages of the trivial-rewrite component. + + Currently there is no relevant logging with level msg_info, + so we ignore all messages. + """ + res['action'] = 'ignore' + + +def _parse_smtp(msg, res): + """ + Parse log messages of the smtp component. + """ + # Logging information is often added to a DSB (delivery status buffer), + # more precisely to dsb->why and text to dsb->why->reason; adding is + # done with functions like dsb_simple, dsb_formal, dsb_update from + # global/dsn_buf.c; use `rg -A3 -B3 dsb_` to find this stuff. 
+ # + # Other logging is done by calling log_adhoc, let's handle this and + # special cases first: + + # global/log_adhoc.c:82-214 log_adhoc + # to=<{rcpt}>, relay={none_or_host_ip_port}, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text}) + # to=<{rcpt}>, orig_to=<{orig_to}>, relay={none_or_host_ip_port}, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text}) + # 1) global/defer.c:267 status=deferred + # example text: lost connection with host.example.com[2606:2800:220:1:248:1893:25c8:1946] while receiving the initial server greeting + # (defer_append() is called in smtp/smtp_trouble.c:264-267,404-407) + # 2) global/bounce.c:322 status=SOFTBOUNCE + # example text: unknown user: "peter" + # 3) global/bounce.c:322,512 status=bounced + # example text: host host.example.com[2606:2800:220:1:248:1893:25c8:1946] said: 500 need some sleep (in reply to DATA command) + # (bounce_append() is called in smtp/smtp_trouble.c:264-267,404-407) + # 4) global/sent.c:162 status=sent + # example text: 250 2.0.0 Ok: queued as D9A33901180 + # (sent() is called in smtp/smtp_rcpt.c:175-177) + if msg.startswith('to=<'): + _parse_log_adhoc(msg, res) # sets action to 'delivery_status' + + # smtp/smtp_proto.c:417 + # enabling PIX workarounds: %s for %s + elif msg.startswith('enabling PIX workarounds: '): + res['action'] = 'ignore' + + # smtp/smtp_connect.c:1057 + + # smtp/smtp_proto.c:1144-1175 smtp_hbc_logger (hbc = header body checks) + # smtp/smtp_proto.c:266-269 smtp_hbc_callbacks + # smtp/smtp.c:1326-1333 hbc_header_checks_create,hbc_body_checks_create + # global/header_body_checks.c:366-415 return hbc where hbc->call_backs contains the logging callback + # the hbc struct is used in :1245-1274(smtp_header_rewrite) and the + # smtp main protocol loop smtp_loop (line 2250 calling mime_state_alloc) + # by calling hbc_header_checks + # global/header_body_checks.c:230-305 hbc_action contains the actual call of the logging callback + # here we have these actions (corresponding to some of the ACTIONS in man 5 header_checks): + # 255: warning, 259: info, 272: replace, 284: prepend, 290: strip + # (see global variable smtp_hbc_actions) + # {smtp_hbc_action}: {header_or_body} {content}: {text} + # {smtp_hbc_action}: {header_or_body} {content} + # -> ignore + elif msg.split(': ', 1)[0] in smtp_hbc_actions: + res['action'] = 'ignore' + + elif msg.startswith('host ') or msg.startswith('Protocol error: host '): + if msg.startswith('Protocol error: '): + msg_ = msg[21:] + res['smtp_protocol_error'] = True + else: + msg_ = msg[5:] + sep1 = ' refused to talk to me: ' + sep2 = ' said: ' + if sep1 in msg: + # smtp/smtp_proto.c:375-378,472-475,484-487,493-496 + # host {hostname_or_unknown}[{ip_address}] refused to talk to me: {error_detail} + host_ip, smtp_error = msg_.split(sep1, 1) + res['smtp_error'] = smtp_error + _strip_host_ip(host_ip, res, pos='l') + res['action'] = 'smtp_error' + elif sep2 in msg: + # smtp/smtp_proto.c:1940-1944,2013-2017,2037-2041,2084-2088,2111-2115 + # host {hostname_or_unknown}[{ip_address}] said: {error_text} (in reply to {command}) + # (calls smtp_rcpt_fail or smtp_mesg_fail in smtp/smtp_trouble.c + # where vsmtp_fill_dsn fills state->why->reason with the + # formatted string, possibly prepending 'Protocol error: ') + host_ip, smtp_error = msg_.split(sep2, 1) + res['smtp_error'] = smtp_error + _strip_host_ip(host_ip, res, pos='l') + res['action'] = 'smtp_error' + else: + res['parsed'] = False + + # smtp/smtp_connect.c:1057 
+    # smtp/smtp_connect.c:997,195 (call to smtp_connect_addr)
+    # network address conversion failed: %m
+    # -> ignore
+
+    # smtp/smtp_connect.c:1057
+    # smtp/smtp_connect.c:974 (call to smtp_tls_policy_cache_query) TODO
+
+    # smtp/smtp_proto.c:839
+    # smtp/smtp_sasl_proto.c:173-175
+    # SASL authentication failed: server %s offered no compatible authentication mechanisms for this type of connection security
+    # -> ignore
+
+    # smtp/smtp_sasl_glue.c:403-405
+    # SASL authentication failed; authentication protocol loop with server %s
+    # -> ignore
+
+    # smtp/smtp_addr.c:216-220
+    # unable to look up host %s: %s
+    # -> ignore
+
+    # smtp/smtp_addr.c:236-237
+    # {host}: host not found
+    # -> ignore
+
+    # smtp/smtp_addr.c:609-610
+    # unable to find primary relay for %s
+    # -> ignore
+
+    # smtp/smtp_addr.c:612-613,677
+    # mail for %s loops back to myself
+    # -> ignore
+
+    # smtp/smtp_connect.c:141
+    # Server configuration error
+    # -> ignore
+
+    # smtp/smtp_connect.c:195
+    # network address conversion failed: %m
+    # -> ignore
+
+    # smtp/smtp_connect.c:308-309
+    # connect to %s[%s]:%d: %m
+    # -> ignore
+
+    # smtp/smtp_connect.c:311
+    # connect to %s[%s]: %m
+    # -> ignore
+
+    # smtp/smtp_connect.c:782
+    # all network protocols are disabled
+    # -> ignore
+
+    # smtp/smtp_connect.c:1085-1086
+    # server unavailable or unable to receive mail
+    # -> ignore
+
+    # smtp/smtp_tls_policy.c:168
+    # client TLS configuration problem
+    # -> ignore
+
+    # smtp/smtp_tls_policy.c:209
+    # Temporary lookup error
+    # -> ignore
+
+    # smtp/smtp_tls_policy.c:861-862
+    # TLSA lookup error for %s:%u
+    # -> ignore
+
+    # # dsb_simple, vdsb_simple
+
+    # calls to smtp/smtp_tls_policy.c:769 dane_incompat (vdsb_simple in line 789): TODO
+
+    # # dsb_update
+
+    # smtp/smtp_tls_policy.c:714-718 TODO
+
+    # smtp/smtp_sasl_glue.c:359-363
+    # SASL authentication failed; cannot authenticate to server %s: %s
+    elif msg.startswith(
+        'SASL authentication failed; cannot authenticate to server '
+    ):
+        res['action'] = 'ignore'
+
+    # smtp/smtp_sasl_glue.c:422-426
+    # SASL authentication failed; non-empty initial %s challenge from server %s: %s
+    elif msg.startswith('SASL authentication failed; non-empty initial '):
+        res['action'] = 'ignore'
+
+    # smtp/smtp_sasl_glue.c:433-437
+    # SASL authentication failed; cannot authenticate to server %s: %s
+    # -> already done (same message prefix as smtp_sasl_glue.c:359-363)
+
+    # smtp/smtp_sasl_glue.c:460-464
+    # SASL authentication failed; server %s said: %s
+    elif (
+        msg.startswith('SASL authentication failed; server ')
+        and ' said: ' in msg
+    ):
+        res['action'] = 'ignore'
+
+    # smtp/smtp_sasl_glue.c:342-345
+    # SASL [CACHED] authentication failed; server %s said: %s
+    # and
+    # smtp/smtp_sasl_proto.c:192-193 smtp_sasl_helo_login calling smtp/smtp_sasl_glue.c:311 smtp_sasl_authenticate
+    # SASL [CACHED] authentication failed; server %s said: %s
+    elif msg.startswith('SASL [CACHED] authentication failed; server '):
+        res['action'] = 'ignore'
+
+    # # smtp_sess_fail
+
+    # smtp/smtp_sasl_proto.c:173-175 smtp_sasl_helo_login
+    # SASL authentication failed: server %s offered no compatible authentication mechanisms for this type of connection security
+    elif msg.startswith(
+        'SASL authentication failed: server '
+    ) and msg.endswith(
+        ' offered no compatible authentication mechanisms '
+        'for this type of connection security'
+    ):
+        res['action'] = 'ignore'
+
+    # smtp/smtp_sasl_proto.c:192-193 smtp_sasl_helo_login calling smtp/smtp_sasl_glue.c:311 
smtp_sasl_authenticate + # SASL [CACHED] authentication failed; server %s said: %s + # -> already done + + # smtp/smtp_connect.c:1196 prevents smtp/smtp_trouble.c:224-225 and thus + # defer_append or bounce_append get called in lines 264-267, which both + # call log_adhoc and are covered above + # -> already done + + # # smtp_bulk_fail + + # smtp/smtp_trouble.c:435,461 (calling smtp_bulk_fail calling msg_info in line 226) + # lost connection with {hostname_or_unknown}[{ip_address}] while {text} + # example text: receiving the initial server greeting + # example text: performing the EHLO handshake + # example text: performing the HELO handshake + # example text: performing the LHLO handshake + # example text: sending MAIL FROM + # example text: sending RCPT TO + # example text: receiving the STARTTLS response + elif msg.startswith('lost connection with '): + res['action'] = 'ignore' + + # smtp/smtp_trouble.c:443,461 (calling smtp_bulk_fail calling msg_info in line 226) + # conversation with {hostname_or_unknown}[{ip_address}] timed out while {text} + elif msg.startswith('conversation with '): + res['action'] = 'ignore' + + # smtp/smtp_trouble.c:452,461 (calling smtp_bulk_fail calling msg_info in line 226) + # local data error while talking to {hostname_or_unknown}[{ip_address}] + elif msg.startswith('local data error while talking to '): + res['action'] = 'ignore' + + # # smtp_site_fail + + # smtp/smtp_proto.c:388-390 + # client TLS configuration problem + # -> ignore + + # smtp/smtp_proto.c:555-558,560-563 + # mail for {nexthop} loops back to myself + # -> ignore + + # smtp/smtp_proto.c:803-806 + # TLS is required, but host %s refused to start TLS: %s + # -> ignore + + # smtp/smtp_proto.c:819-822 + # TLS is required, but was not offered by host %s + # -> ignore + + # smtp/smtp_proto.c:824-826 + # TLS is required, but our TLS engine is unavailable + # -> ignore + + # smtp/smtp_proto.c:830-832 + # TLS is required, but unavailable + # -> ignore + + # smtp/smtp_proto.c:1124-1126 + # Server certificate not trusted + # -> ignore + + # smtp/smtp_proto.c:1129-1131 + # Server certificate not verified + # -> ignore + + # smtp/smtp_proto.c:1960-1962 + # unexpected server message + # -> ignore + + # # smtp_mesg_fail + + # smtp/smtp_proto.c:650-654 + # SMTPUTF8 is required, but was not offered by host %s + # -> ignore + + # smtp/smtp_proto.c:1386-1388 + # %s + # -> ignore + + # smtp/smtp_proto.c:2312-2314 + # unreadable mail queue entry + # -> ignore + + # smtp/smtp_proto.c:2388-2392 + # message size %lu exceeds size limit %.0f of server %s + # -> ignore + + else: + res['action'] = 'ignore' + res['parsed'] = False + + +def _parse_bounce(msg, res): + """ + Parse log messages of the bounce component. + """ + # Logging is mainly done with msg_info. 
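+    #
+    # A hedged sketch of the expected outcome for a typical message
+    # (the queue id is hypothetical):
+    #
+    #   res = {}
+    #   _parse_bounce('sender non-delivery notification: 47ABCDEF0123', res)
+    #   # now res['action'] == 'bounce_final' and
+    #   # res['bounce_id'] == '47ABCDEF0123' (set by _strip_queue_id)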
+
+    # bounce/bounce_notify_service.c:202-203,290-291
+    # bounce/bounce_notify_verp.c:239-240
+    # bounce/bounce_one_service.c:168-169,244-245
+    # postmaster non-delivery notification: {queue_id}
+    if msg.startswith('postmaster non-delivery notification: '):
+        _strip_queue_id(msg, res, pos='r', target_name='bounce_id')
+        res['action'] = 'bounce_final'
+
+    # bounce/bounce_notify_service.c:243-244
+    # bounce/bounce_notify_verp.c:187-188
+    # bounce/bounce_one_service.c:206-207
+    # sender non-delivery notification: {queue_id}
+    elif msg.startswith('sender non-delivery notification: '):
+        _strip_queue_id(msg, res, pos='r', target_name='bounce_id')
+        res['action'] = 'bounce_final'
+
+    # bounce/bounce_warn_service.c:191-192,274-275
+    # postmaster delay notification: {queue_id}
+    elif msg.startswith('postmaster delay notification: '):
+        _strip_queue_id(msg, res, pos='r', target_name='bounce_id')
+        res['action'] = 'bounce_delay'
+
+    # bounce/bounce_warn_service.c:230-231
+    # sender delay notification: {queue_id}
+    elif msg.startswith('sender delay notification: '):
+        _strip_queue_id(msg, res, pos='r', target_name='bounce_id')
+        res['action'] = 'bounce_delay'
+
+    # bounce/bounce_trace_service.c:107-108
+    # not sending trace/success notification for double-bounce message
+    elif (
+        msg
+        == 'not sending trace/success notification for double-bounce message'
+    ):
+        res['action'] = 'ignore'
+
+    # bounce/bounce_trace_service.c:115-116
+    # not sending trace/success notification for single-bounce message
+    elif (
+        msg
+        == 'not sending trace/success notification for single-bounce message'
+    ):
+        res['action'] = 'ignore'
+
+    # bounce/bounce_trace_service.c:196-197
+    # sender delivery status notification: {queue_id}
+    elif msg.startswith('sender delivery status notification: '):
+        _strip_queue_id(msg, res, pos='r', target_name='bounce_id')
+        res['action'] = 'bounce_notify_sender'
+
+    # bounce/bounce_notify_verp.c:129
+    # -> ignore
+
+    # global/bounce.c:368,403
+    # status=deferred (bounce failed)
+    elif msg == 'status=deferred (bounce failed)':
+        res['action'] = 'bounce_failed'
+
+    else:
+        res['action'] = 'ignore'
+        res['parsed'] = False
+
+
+def _parse_virtual(msg, res):
+    """
+    Parse log messages of the virtual component.
+    """
+    # to=<{to}>, relay=virtual, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text})
+    if msg.startswith('to=<'):
+        _parse_log_adhoc(msg, res)  # sets action to 'delivery_status'
+
+    # virtual/unknown.c:61-62
+    # unknown user: "{user}"
+    # -> ignore
+
+    # virtual/maildir.c:103
+    # delivers to maildir
+    # -> ignore
+
+    # virtual/maildir.c:186-187,211-212
+    # create maildir file %s: %m
+    # -> ignore
+
+    # virtual/maildir.c:242
+    # delivered to maildir
+    # -> ignore
+
+    # virtual/mailbox.c:101
+    # delivers to mailbox
+    # -> ignore
+
+    # virtual/mailbox.c:131,208,234,249,257
+    # mail system configuration error
+    # -> ignore
+
+    # virtual/mailbox.c:134-135
+    # destination %s is not owned by recipient
+    # -> ignore
+
+    # virtual/mailbox.c:163
+    # delivered to mailbox
+    # -> ignore
+
+    else:
+        res['parsed'] = False
+
+
+def _parse_error(msg, res):
+    """
+    Parse log messages of the error component.
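+
+    The error component logs in the same log_adhoc format as real
+    deliveries (with relay=none), so the generic to=<...> parser is
+    reused for it.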
+ """ + # to=<{to}>, relay=none, delay={delay}, delays={delays}, dsn={dsn}, status={delivery_status} ({delivery_status_text}) + if msg.startswith('to=<'): + _parse_log_adhoc(msg, res) # sets action to 'delivery_status' + else: + res['action'] = 'ignore' + res['parsed'] = False + + +def extract_delivery( + msg_details: dict, parsed: dict +) -> Tuple[Optional[List[str]], Optional[dict]]: + """ + Compute a delivery item from a parsed entry. + + Basically this means that we map the parsed fields to + a type ('from' or 'to') and to the database + fields for table 'delivery_from' or 'delivery_to'. + + We branch to functions _extract_{comp} where comp is the + name of a Postfix component. + + Return a list of error strings and a dict with the + extracted information. Keys with None values are removed + from the dict. + """ + comp = parsed['comp'] + delivery = { + 'type': 'to', + 't': msg_details['t'], + 'queue_id': parsed.get('queue_id'), + 'message': msg_details['message'], + 'identifier': msg_details['identifier'], + 'pid': msg_details['pid'], + 'comp': comp, + } + action = parsed.get('action') + if action == 'ignore' or action is None: + return None, None + elif action == 'subject': + delivery['type'] = 'from' + delivery['subject'] = parsed.get('subject') + elif comp == 'smtpd' or comp.endswith('/smtpd'): + delivery = _extract_smtpd(msg_details, parsed, delivery, action) + delivery['comp'] = 'smtpd' + elif comp == 'qmgr': + delivery = _extract_qmgr(msg_details, parsed, delivery, action) + elif comp == 'cleanup': + delivery = _extract_cleanup(msg_details, parsed, delivery, action) + elif comp == 'trivial-rewrite': + delivery = _extract_trivial_rewrite( + msg_details, parsed, delivery, action + ) + elif comp in ('smtp', 'lmtp') or comp.endswith('/smtp'): + delivery = _extract_smtp(msg_details, parsed, delivery, action) + if delivery: + delivery['comp'] = 'smtp' + elif comp == 'bounce': + delivery = _extract_bounce(msg_details, parsed, delivery, action) + elif comp == 'virtual': + delivery = _extract_virtual(msg_details, parsed, delivery, action) + elif comp == 'error': + delivery = _extract_error(msg_details, parsed, delivery, action) + else: + return ['Cannot extract_delivery'], None + if 'verp_id' in parsed and delivery and delivery['type'] == 'from': + try: + delivery['verp_id'] = int(parsed['verp_id']) + except Exception: + pass + # remove keys with None values + if delivery: + delivery = {k: v for k, v in delivery.items() if v is not None} + return None, delivery + + +def _extract_smtpd(msg_details, parsed, delivery, action): + delivery['type'] = 'from' + if action == 'connect': + delivery['host'] = parsed.get('host') + delivery['ip'] = parsed.get('ip') + delivery['sasl_username'] = parsed.get('sasl_username') + delivery['orig_queue_id'] = parsed.get('orig_queue_id') + elif action in ('reject', 'hold', 'delay', 'discard', 'redirect'): + # Note: Here we may have both sender and recipient. 
+        delivery['host'] = parsed.get('host')
+        delivery['ip'] = parsed.get('ip')
+        delivery['status'] = action
+        delivery['sasl_username'] = parsed.get('sasl_username')
+        delivery['sender'] = parsed.get('from')
+        delivery['recipient'] = parsed.get('to')
+        delivery['phase'] = parsed.get('where')
+        delivery['error'] = parsed.get('error')
+    else:
+        return None
+    return delivery
+
+
+def _extract_qmgr(msg_details, parsed, delivery, action):
+    delivery['type'] = 'from'
+    delivery['status'] = action
+    if action == 'expired':
+        delivery['done'] = True
+    elif action == 'removed':
+        delivery['done'] = True
+    elif action == 'queued':
+        delivery['accepted'] = True
+        delivery['sender'] = parsed.get('from')
+        delivery['size'] = parsed.get('size')
+        delivery['nrcpt'] = parsed.get('nrcpt')
+    elif action == 'skipped':
+        delivery['accepted'] = False
+        delivery['done'] = True
+    else:
+        return None
+    return delivery
+
+
+def _extract_cleanup(msg_details, parsed, delivery, action):
+    delivery['type'] = 'from'
+    if action == 'reject':
+        delivery['status'] = action
+        delivery['accepted'] = False
+        if 'mime_error' in parsed:
+            delivery['phase'] = 'mime'
+            delivery['error'] = (
+                parsed.get('mime_error', '?')
+                + ': '
+                + parsed.get('mime_error_detail', '?')
+            )
+    elif action in ('cleanup', 'hold', 'delay', 'discard', 'redirect'):
+        delivery['status'] = action
+        delivery['from'] = parsed.get('from')
+        delivery['to'] = parsed.get('to')
+    elif action == 'message_id':
+        delivery['message_id'] = parsed.get('message_id')
+    elif action == 'resent_message_id':
+        delivery['resent_message_id'] = parsed.get('resent_message_id')
+    elif action == 'milter_action':
+        return None
+    elif action == 'milter_cleanup':
+        return None
+    else:
+        return None
+    return delivery
+
+
+def _extract_trivial_rewrite(msg_details, parsed, delivery, action):
+    return None
+
+
+def _extract_smtp(msg_details, parsed, delivery, action):
+    if action == 'delivery_status':
+        delivery['recipient'] = parsed.get('to')
+        delivery['orig_recipient'] = parsed.get('orig_to')
+        delivery['relay'] = parsed.get('relay')
+        delivery['host'] = parsed.get('host')
+        delivery['destination'] = parsed.get('destination')
+        delivery['port'] = parsed.get('port')
+        delivery['delay'] = parsed.get('delay')
+        delivery['delays'] = parsed.get('delays')
+        delivery['dsn'] = parsed.get('dsn')
+        delivery['status'] = parsed.get('delivery_status')
+        delivery['status_text'] = parsed.get('delivery_status_text')
+    elif action == 'smtp_error':
+        delivery['host'] = parsed.get('host')
+        delivery['destination'] = parsed.get('ip')
+        delivery['status'] = 'smtp_error'
+        delivery['status_text'] = parsed.get('smtp_error')
+    else:
+        return None
+    return delivery
+
+
+def _extract_bounce(msg_details, parsed, delivery, action):
+    if action == 'bounce_delay':
+        delivery['status'] = 'bounce_delay'
+        delivery['status_text'] = parsed.get('bounce_id')
+    elif action == 'bounce_final':
+        delivery['status'] = 'bounce_final'
+        delivery['status_text'] = parsed.get('bounce_id')
+    elif action == 'bounce_notify_sender':
+        delivery['status'] = 'bounce_notify_sender'
+        delivery['status_text'] = parsed.get('bounce_id')
+    elif action == 'bounce_failed':
+        delivery['status'] = 'bounce_failed'
+    else:
+        return None
+    return delivery
+
+
+def _extract_virtual(msg_details, parsed, delivery, action):
+    if action == 'delivery_status':
+        delivery['recipient'] = parsed.get('to')
+        delivery['orig_recipient'] = parsed.get('orig_to')
+        delivery['relay'] = parsed.get('relay')
+        delivery['host'] = parsed.get('host')
+        delivery['destination'] = parsed.get('destination')
+        delivery['port'] = parsed.get('port')
+        delivery['delay'] = parsed.get('delay')
+        delivery['delays'] = parsed.get('delays')
+        delivery['dsn'] = parsed.get('dsn')
+        delivery['status'] = parsed.get('delivery_status')
+        delivery['status_text'] = parsed.get('delivery_status_text')
+    else:
+        return None
+    return delivery
+
+
+def _extract_error(msg_details, parsed, delivery, action):
+    if action == 'delivery_status':
+        delivery['recipient'] = parsed.get('to')
+        delivery['orig_recipient'] = parsed.get('orig_to')
+        delivery['relay'] = parsed.get('relay')
+        delivery['host'] = parsed.get('host')
+        delivery['destination'] = parsed.get('destination')
+        delivery['port'] = parsed.get('port')
+        delivery['delay'] = parsed.get('delay')
+        delivery['delays'] = parsed.get('delays')
+        delivery['dsn'] = parsed.get('dsn')
+        delivery['status'] = parsed.get('delivery_status')
+        delivery['status_text'] = parsed.get('delivery_status_text')
+    else:
+        return None
+    return delivery
diff --git a/journal-postfix/files/srv/run.py b/journal-postfix/files/srv/run.py
new file mode 100755
index 0000000..6af95d1
--- /dev/null
+++ b/journal-postfix/files/srv/run.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+
+"""
+Main script to be run as a systemd unit or manually.
+"""
+
+import argparse
+import datetime
+import os
+import sys
+from pprint import pprint
+from typing import Iterable, List, Optional, Tuple, Union
+import psycopg2
+import psycopg2.extras
+from systemd import journal
+import settings
+from parser import init_parser, parse_entry, extract_delivery
+from sources import (
+    iter_journal_messages_since,
+    iter_journal_messages_follow,
+    iter_logfile_messages,
+)
+from storage import (
+    init_db,
+    init_session,
+    get_latest_timestamp,
+    delete_old_deliveries,
+    store_delivery_items,
+)
+
+
+exit_code_without_restart = 97
+
+
+def run(
+    dsn: str,
+    verp_marker: Optional[str] = None,
+    filepath: Optional[str] = None,
+    year: Optional[int] = None,
+    debug: List[str] = [],
+) -> None:
+    """
+    Determine loop(s) and run them within a database context.
+    """
+    init_parser(verp_marker=verp_marker)
+    with psycopg2.connect(dsn) as conn:
+        with conn.cursor(
+            cursor_factory=psycopg2.extras.RealDictCursor
+        ) as curs:
+            init_session(curs)
+            if filepath and year:
+                run_loop(
+                    iter_logfile_messages(filepath, year), curs, debug=debug
+                )
+            else:
+                begin_timestamp = get_latest_timestamp(curs)
+                run_loop(
+                    iter_journal_messages_since(begin_timestamp),
+                    curs,
+                    debug=debug,
+                )
+                begin_timestamp = get_latest_timestamp(curs)
+                run_loop(
+                    iter_journal_messages_follow(begin_timestamp),
+                    curs,
+                    debug=debug,
+                )
+
+
+def run_loop(
+    iterable: Iterable[Tuple[bool, Optional[dict]]],
+    curs: psycopg2.extras.RealDictCursor,
+    debug: List[str] = []
+) -> None:
+    """
+    Loop over log entries obtained from *iterable*.
+
+    Parse the message, extract delivery information from it and store
+    that delivery information.
+
+    For performance reasons delivery items are collected in a cache
+    before writing them (i.e., committing a database transaction).
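+
+    *iterable* yields pairs (commit, msg_details); a True commit flag
+    (or a full message cache) triggers writing the cached items.
+    A typical invocation, with a hypothetical logfile path, is::
+
+        run_loop(iter_logfile_messages('/var/log/mail.log', 2019), curs)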
+ """ + cache = [] + msg_count = settings.max_messages_per_commit + last_delete = None + for commit, msg_details in iterable: + parsed_entry = None + if msg_details: + parsed_entry = parse_entry(msg_details) + if 'all' in debug or ( + parsed_entry and parsed_entry.get('comp') in debug + ): + print('_' * 80) + print('MSG_DETAILS:', msg_details) + print('PARSED_ENTRY:', parsed_entry) + if parsed_entry: + errors, delivery = extract_delivery(msg_details, parsed_entry) + if not errors and delivery: + if 'all' in debug or parsed_entry.get('comp') in debug: + print('DELIVERY:') + pprint(delivery) + # it may happen that a delivery of type 'from' has + # a recipient; in this case add a second delivery + # of type 'to' to the cache, but only for deliveries + # with queue_id + if ( + delivery['type'] == 'from' + and 'recipient' in delivery + and delivery.get('queue_id') + ): + delivery2 = delivery.copy() + delivery2['type'] = 'to' + cache.append(delivery2) + del delivery['recipient'] + cache.append(delivery) + msg_count -= 1 + if msg_count == 0: + commit = True + elif errors: + msg = ( + f'Extracting delivery from parsed entry failed: ' + f'errors={errors}; msg_details={msg_details}; ' + f'parsed_entry={parsed_entry}' + ) + journal.send(msg, PRIORITY=journal.LOG_CRIT) + if 'all' in debug or parsed_entry.get('comp') in debug: + print('EXTRACTION ERRORS:', errors) + if commit: + if 'all' in debug: + print('.' * 40, 'committing') + # store cache, clear cache, reset message counter + store_delivery_items(curs, cache, debug=debug) + cache = [] + msg_count = settings.max_messages_per_commit + now = datetime.datetime.utcnow() + if last_delete is None or last_delete < now - settings.delete_interval: + delete_old_deliveries(curs) + last_delete = now + if 'all' in debug: + print('.' * 40, 'deleting old deliveries') + else: + store_delivery_items(curs, cache, debug=debug) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + '--debug', + help='Comma-separated list of components to be debugged; ' + 'valid component names are the Postfix components ' + 'plus "sql" plus "all".', + ) + parser.add_argument( + '--file', + help='File path of a Postfix logfile in syslog ' + 'format to be parsed instead of the journal', + ) + parser.add_argument( + '--year', + help='If --file is given, we need to know ' + 'the year of the first line in the logfile', + ) + args = parser.parse_args() + + config = settings.get_config() + if config: + # check if startup is enabled or fail + msg = None + if 'startup' not in config: + msg = 'Parameter "startup" is not configured.' + elif not config['startup']: + msg = 'Startup is not enabled in the config file.' + if msg: + journal.send(msg, PRIORITY=journal.LOG_CRIT) + sys.exit(exit_code_without_restart) + # check more params and call run + try: + verp_marker = config['postfix']['verp_marker'] + except Exception: + verp_marker = None + debug: List[str] = [] + if args.debug: + debug = args.debug.split(',') + filepath = None + year = None + if args.file: + filepath = args.file + if not args.year: + print( + 'If --file is given, we need to know the year' + ' of the first line in the logfile. Please use --year.' 
+                )
+                sys.exit(1)
+            else:
+                year = int(args.year)
+        dsn = init_db(config)
+        if dsn:
+            run(
+                dsn,
+                verp_marker=verp_marker,
+                filepath=filepath,
+                year=year,
+                debug=debug,
+            )
+        else:
+            print('Config invalid, see journal.')
+            sys.exit(exit_code_without_restart)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/journal-postfix/files/srv/settings.py b/journal-postfix/files/srv/settings.py
new file mode 100755
index 0000000..4fde5df
--- /dev/null
+++ b/journal-postfix/files/srv/settings.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+"""
+Settings for journal-postfix.
+"""
+
+import os
+import datetime
+from typing import Union, Optional
+from systemd import journal
+from yaml import safe_load
+
+
+main_config_file: str = '/etc/journal-postfix/main.yml'
+"""
+Filepath to the main config file.
+
+Can be overridden by environment variable JOURNAL_POSTFIX_MAIN_CONF.
+"""
+
+
+systemd_unitname: str = 'postfix@-.service'
+"""
+Name of the systemd unit running the postfix service.
+"""
+
+
+journal_poll_interval: Union[float, int] = 10.0
+"""
+Poll timeout in seconds for fetching messages from the journal.
+
+Will be overridden if set in the main config.
+
+If the poll times out, it is checked whether the last commit
+lies more than max_delay_before_commit seconds in the past;
+if so, the current database transaction will be committed.
+"""
+
+
+max_delay_before_commit: datetime.timedelta = datetime.timedelta(seconds=30)
+"""
+How much time may pass before committing a database transaction?
+
+Will be overridden if set in the main config.
+
+(The actual maximal delay can be one journal_poll_interval in addition.)
+"""
+
+
+max_messages_per_commit: int = 1000
+"""
+How many messages to cache at most before committing a database transaction?
+
+Will be overridden if set in the main config.
+"""
+
+
+delete_deliveries_after_days: int = 0
+"""
+After how many days shall deliveries be deleted from the database?
+
+A value of 0 means that data are never deleted.
+"""
+
+
+delete_interval: datetime.timedelta = datetime.timedelta(seconds=3600)
+"""
+Time interval between two deletions of old deliveries.
+
+Will be overridden if set in the main config.
+
+Used by run.py when deciding whether to trigger a deletion.
+"""
+
+
+def get_config() -> Optional[dict]:
+    """
+    Load config from the main config file and return it.
+
+    The default main config file path (global main_config_file)
+    can be overridden with environment variable
+    JOURNAL_POSTFIX_MAIN_CONF.
+    """
+    try:
+        filename = os.environ['JOURNAL_POSTFIX_MAIN_CONF']
+        global main_config_file
+        main_config_file = filename
+    except Exception:
+        filename = main_config_file
+    try:
+        with open(filename, 'r') as config_file:
+            config_raw = config_file.read()
+    except Exception:
+        msg = f'ERROR: cannot read config file {filename}'
+        journal.send(msg, PRIORITY=journal.LOG_CRIT)
+        return None
+    try:
+        config = safe_load(config_raw)
+    except Exception as err:
+        msg = f'ERROR: invalid yaml syntax in {filename}: {err}'
+        journal.send(msg, PRIORITY=journal.LOG_CRIT)
+        return None
+    # override some global variables
+    _global_value_from_config(config['postfix'], 'systemd_unitname', str)
+    _global_value_from_config(config, 'journal_poll_interval', float)
+    _global_value_from_config(config, 'max_delay_before_commit', 'seconds')
+    _global_value_from_config(config, 'max_messages_per_commit', int)
+    _global_value_from_config(config, 'delete_deliveries_after_days', int)
+    _global_value_from_config(config, 'delete_interval', 'seconds')
+    return config
+
+
+def _global_value_from_config(
+    config, name: str, type_: Union[type, str]
+) -> None:
+    """
+    Set a global variable to the value obtained from *config*.
+
+    Also cast to *type_*.
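+    The special *type_* value 'seconds' converts the configured
+    number of seconds into a datetime.timedelta.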
+ """ + try: + value = config.get(name) + if type_ == 'seconds': + value = datetime.timedelta(seconds=float(value)) + else: + value = type_(value) # type: ignore + globals()[name] = value + except Exception: + if value is not None: + msg = f'ERROR: configured value of {name} is invalid.' + journal.send(msg, PRIORITY=journal.LOG_ERR) + + +if __name__ == '__main__': + print(get_config()) diff --git a/journal-postfix/files/srv/setup.cfg b/journal-postfix/files/srv/setup.cfg new file mode 100644 index 0000000..ce35f2a --- /dev/null +++ b/journal-postfix/files/srv/setup.cfg @@ -0,0 +1,5 @@ +[pycodestyle] +max-line-length = 200 + +[mypy] +ignore_missing_imports = True diff --git a/journal-postfix/files/srv/sources.py b/journal-postfix/files/srv/sources.py new file mode 100755 index 0000000..37d2329 --- /dev/null +++ b/journal-postfix/files/srv/sources.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +""" +Data sources. + +Note: python-systemd journal docs are at +https://www.freedesktop.org/software/systemd/python-systemd/journal.html +""" + +import datetime +import select +from typing import Iterable, Optional, Tuple, Union +from systemd import journal +import settings + + +def iter_journal_messages_since( + timestamp: Union[int, float] +) -> Iterable[Tuple[bool, dict]]: + """ + Yield False and message details from the journal since *timestamp*. + + This is the loading phase (loading messages that already existed + when we start). + + Argument *timestamp* is a UNIX timestamp. + + Only journal entries for systemd unit settings.systemd_unitname with + loglevel INFO and above are retrieved. + """ + timestamp = float(timestamp) + sdj = journal.Reader() + sdj.log_level(journal.LOG_INFO) + sdj.add_match(_SYSTEMD_UNIT=settings.systemd_unitname) + sdj.seek_realtime(timestamp) + for entry in sdj: + yield False, _get_msg_details(entry) + + +def iter_journal_messages_follow( + timestamp: Union[int, float] +) -> Iterable[Tuple[bool, Optional[dict]]]: + """ + Yield commit and message details from the journal through polling. + + This is the polling phase (after we have read pre-existing messages + in the loading phase). + + Argument *timestamp* is a UNIX timestamp. + + Only journal entries for systemd unit settings.systemd_unitname with + loglevel INFO and above are retrieved. + + *commit* (bool) tells whether it is time to store the delivery + information obtained from the messages yielded by us. + It is set to True if settings.max_delay_before_commit has elapsed. + After this delay delivery information will be written; to be exact: + the delay may increase by up to one settings.journal_poll_interval. + """ + sdj = journal.Reader() + sdj.log_level(journal.LOG_INFO) + sdj.add_match(_SYSTEMD_UNIT=settings.systemd_unitname) + sdj.seek_realtime(timestamp) + p = select.poll() + p.register(sdj, sdj.get_events()) + last_commit = datetime.datetime.utcnow() + interval_ms = settings.journal_poll_interval * 1000 + while True: + p.poll(interval_ms) + commit = False + now = datetime.datetime.utcnow() + if last_commit + settings.max_delay_before_commit < now: + commit = True + last_commit = now + if sdj.process() == journal.APPEND: + for entry in sdj: + yield commit, _get_msg_details(entry) + elif commit: + yield commit, None + + +def iter_logfile_messages( + filepath: str, + year: int, + commit_after_lines=settings.max_messages_per_commit, +) -> Iterable[Tuple[bool, dict]]: + """ + Yield messages and a commit flag from a logfile. 
+
+    Loop through all lines of the file with given *filepath* and
+    extract the time and log message. If the log message starts
+    with 'postfix/', then extract the syslog_identifier, pid and
+    message text.
+
+    Since syslog lines do not contain the year, the *year* to which
+    the first log line belongs must be given.
+
+    Yield a commit flag and a dict with these keys:
+        't': timestamp
+        'message': message text
+        'identifier': syslog identifier (e.g., 'postfix/smtpd')
+        'pid': process id
+
+    The commit flag will be set to True for every
+    (commit_after_lines)-th filtered message and serves
+    as a signal to the caller to commit this chunk of data
+    to the database.
+    """
+    dt = None
+    with open(filepath, 'r') as fh:
+        cnt = 0
+        while True:
+            line = fh.readline()
+            if not line:
+                break
+
+            # get datetime
+            timestamp = line[:15]
+            dt_prev = dt
+            dt = _parse_logfile_timestamp(timestamp, year)
+            if dt is None:
+                continue  # discard log message with invalid timestamp
+
+            # if we transgress a year boundary, then increment the year
+            if dt_prev and dt + datetime.timedelta(days=1) < dt_prev:
+                year += 1
+                dt = _parse_logfile_timestamp(timestamp, year)
+
+            # filter postfix messages
+            msg = line[21:].strip()
+            if 'postfix/' in msg:
+                cnt += 1
+                syslog_identifier, msg_ = msg.split('[', 1)
+                pid, msg__ = msg_.split(']', 1)
+                message = msg__[2:]
+                commit = cnt % commit_after_lines == 0
+                yield commit, {
+                    't': dt,
+                    'message': message,
+                    'identifier': syslog_identifier,
+                    'pid': pid,
+                }
+
+
+def _get_msg_details(journal_entry: dict) -> dict:
+    """
+    Return information extracted from a journal entry object as a dict.
+    """
+    return {
+        't': journal_entry['__REALTIME_TIMESTAMP'],
+        'message': journal_entry['MESSAGE'],
+        'identifier': journal_entry.get('SYSLOG_IDENTIFIER'),
+        'pid': journal_entry.get('SYSLOG_PID'),
+    }
+
+
+def _parse_logfile_timestamp(
+    timestamp: Optional[str],
+    year: int
+) -> Optional[datetime.datetime]:
+    """
+    Parse a given syslog *timestamp* and return a datetime.
+
+    Since the timestamp does not contain the year, it is an
+    extra argument.
+
+    Note: Successful parsing of the month's name depends on
+    the locale under which this script runs.
+    """
+    if timestamp is None:
+        return None
+    try:
+        # collapse the double space before single-digit days ('Nov  5')
+        timestamp = timestamp.replace('  ', ' ')
+        t1 = datetime.datetime.strptime(timestamp, '%b %d %H:%M:%S')
+        t2 = t1.replace(year=year)
+        return t2
+    except Exception:
+        return None
diff --git a/journal-postfix/files/srv/storage.py b/journal-postfix/files/srv/storage.py
new file mode 100755
index 0000000..93b8346
--- /dev/null
+++ b/journal-postfix/files/srv/storage.py
@@ -0,0 +1,337 @@
+#!/usr/bin/env python3
+
+"""
+Storage to PostgreSQL.
+"""
+
+import datetime
+import json
+import re
+import time
+from collections import defaultdict
+from traceback import format_exc
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+import psycopg2
+import psycopg2.extras
+from systemd import journal
+import settings
+from storage_setup import (
+    get_create_table_stmts,
+    get_sql_prepared_statement,
+    get_sql_execute_prepared_statement,
+    table_fields,
+)
+
+
+def get_latest_timestamp(curs: psycopg2.extras.RealDictCursor) -> float:
+    """
+    Fetch the latest timestamp from the database.
+
+    Return the latest timestamp of a message transfer from the
+    database as a UNIX timestamp. If there are no records yet,
+    return 0.
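+
+    The returned value is suitable for journal.Reader.seek_realtime();
+    run.py uses it to resume reading where the last run stopped.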
+ """ + last = 0 + curs.execute( + "SELECT greatest(max(t_i), max(t_f)) AS last FROM delivery_from" + ) + last1 = curs.fetchone()['last'] + if last1: + last = max( + last, (last1 - datetime.datetime(1970, 1, 1)).total_seconds() + ) + curs.execute( + "SELECT greatest(max(t_i), max(t_f)) AS last FROM delivery_to" + ) + last2 = curs.fetchone()['last'] + if last2: + last = max( + last, (last2 - datetime.datetime(1970, 1, 1)).total_seconds() + ) + return last + + +def delete_old_deliveries(curs: psycopg2.extras.RealDictCursor) -> None: + """ + Delete deliveries older than the configured number of days. + + See config param *delete_deliveries_after_days*. + """ + max_days = settings.delete_deliveries_after_days + if max_days: + now = datetime.datetime.utcnow() + dt = datetime.timedelta(days=max_days) + t0 = now - dt + curs.execute("DELETE FROM delivery_from WHERE t_i < %s", (t0,)) + curs.execute("DELETE FROM delivery_to WHERE t_i < %s", (t0,)) + curs.execute("DELETE FROM noqueue WHERE t < %s", (t0,)) + + +def store_delivery_items( + cursor, + cache: List[dict], + debug: List[str] = [] +) -> None: + """ + Store cached delivery items into the database. + + Find queue_ids in *cache* and group delivery items by + them, but separately for delivery types 'from' and 'to'. + In addition, collect delivery items with queue_id is None. + + After grouping we merge all items withing a group into a + single item. So we can combine several SQL queries into + a single one, which improves performance significantly. + + Then store the merged items and the deliveries with + queue_id is None. + """ + if 'all' in debug or 'sql' in debug: + print(f'Storing {len(cache)} messages.') + if not cache: + return + from_items, to_items, noqueue_items = _group_delivery_items(cache) + deliveries_from = _merge_delivery_items(from_items, item_type='from') + deliveries_to = _merge_delivery_items(to_items, item_type='to') + _store_deliveries(cursor, 'delivery_from', deliveries_from, debug=debug) + _store_deliveries(cursor, 'delivery_to', deliveries_to, debug=debug) + _store_deliveries(cursor, 'noqueue', noqueue_items, debug=debug) + + +FromItems = Dict[str, List[dict]] + + +ToItems = Dict[Tuple[str, Optional[str]], List[dict]] + + +NoqueueItems = Dict[int, dict] + + +def _group_delivery_items( + cache: List[dict] +) -> Tuple[FromItems, ToItems, NoqueueItems]: + """ + Group delivery items by type and queue_id. + + Return items of type 'from', of type 'to' and items without + queue_id. + """ + delivery_from_items: FromItems = defaultdict(list) + delivery_to_items: ToItems = defaultdict(list) + noqueue_items: NoqueueItems = {} + noqueue_i = 1 + for item in cache: + if item.get('queue_id'): + queue_id = item['queue_id'] + if item.get('type') == 'from': + delivery_from_items[queue_id].append(item) + else: + recipient = item.get('recipient') + delivery_to_items[(queue_id, recipient)].append(item) + else: + noqueue_items[noqueue_i] = item + noqueue_i += 1 + return delivery_from_items, delivery_to_items, noqueue_items + + +def _merge_delivery_items( + delivery_items: Union[FromItems, ToItems], + item_type: str = 'from', +) -> Dict[Union[str, Tuple[str, Optional[str]]], dict]: + """ + Compute deliveries by combining multiple delivery items. + + Take lists of delivery items for each queue_id (in case + of item_type=='from') or for (queue_id, recipient)-pairs + (in case of item_type='to'). + Each delivery item is a dict obtained from one log message. 
+    The dicts are consecutively updated (merged), except for the
+    raw log messages (texts) which are collected into a list.
+    The fields of the resulting delivery are filtered according
+    to the target table.
+    The return value is a dict mapping queue_ids (in case
+    of item_type=='from') or (queue_id, recipient)-pairs
+    (in case of item_type='to') to deliveries.
+    """
+    deliveries = {}
+    for group, items in delivery_items.items():
+        delivery = {}
+        messages = []
+        for item in items:
+            message = item.pop('message')
+            identifier = item.pop('identifier')
+            pid = item.pop('pid')
+            messages.append(f'{identifier}[{pid}]: {message}')
+            delivery.update(item)
+        delivery['messages'] = messages
+        deliveries[group] = delivery
+    return deliveries
+
+
+def _store_deliveries(
+    cursor: psycopg2.extras.RealDictCursor,
+    table_name: str,
+    deliveries: Dict[Any, dict],
+    debug: List[str] = [],
+) -> None:
+    """
+    Store grouped and merged delivery items.
+    """
+    if not deliveries:
+        return
+    n = len(deliveries.values())
+    t0 = time.time()
+    cursor.execute('BEGIN')
+    _store_deliveries_batch(cursor, table_name, deliveries.values())
+    cursor.execute('COMMIT')
+    t1 = time.time()
+    if 'all' in debug or 'sql' in debug:
+        milliseconds = (t1 - t0) * 1000
+        print(
+            '*' * 10,
+            f'SQL transaction time {table_name}: '
+            f'{milliseconds:.2f} ms ({n} deliveries)',
+        )
+
+
+def _store_deliveries_batch(
+    cursor: psycopg2.extras.RealDictCursor,
+    table_name: str,
+    deliveries: Iterable[dict]
+) -> None:
+    """
+    Store *deliveries* (i.e., grouped and merged delivery items).
+
+    We use a prepared statement and execute_batch() from
+    psycopg2.extras to improve performance.
+    """
+    rows = []
+    for delivery in deliveries:
+        # get values for all fields of the table
+        field_values: List[Any] = []
+        t = delivery.get('t')
+        delivery['t_i'] = t
+        delivery['t_f'] = t
+        for field in table_fields[table_name]:
+            if field in delivery:
+                if field == 'messages':
+                    field_values.append(json.dumps(delivery[field]))
+                else:
+                    field_values.append(delivery[field])
+            else:
+                field_values.append(None)
+        rows.append(field_values)
+    sql = get_sql_execute_prepared_statement(table_name)
+    try:
+        psycopg2.extras.execute_batch(cursor, sql, rows)
+    except Exception as err:
+        msg = f'SQL statement failed ({err}): "{sql}" -- the values were: {rows}'
+        journal.send(msg, PRIORITY=journal.LOG_ERR)
+
+
+def init_db(config: dict) -> Optional[str]:
+    """
+    Initialize database; if ok return DSN, else None.
+
+    Try to get parameters for database access,
+    check existence of tables and possibly create them.
+    """
+    dsn = _get_dsn(config)
+    if dsn:
+        ok = _create_tables(dsn)
+        if not ok:
+            return None
+    return dsn
+
+
+def _get_dsn(config: dict) -> Optional[str]:
+    """
+    Return the DSN (data source name) from the *config*.
+    """
+    try:
+        postgresql_config = config['postgresql']
+        hostname = postgresql_config['hostname']
+        port = postgresql_config['port']
+        database = postgresql_config['database']
+        username = postgresql_config['username']
+        password = postgresql_config['password']
+    except Exception:
+        msg = f"""ERROR: invalid config in {settings.main_config_file}
+The config file must contain a section like this:
+
+postgresql:
+  hostname:
+  port:
+  database:
+  username:
+  password:
+"""
+        journal.send(msg, PRIORITY=journal.LOG_CRIT)
+        return None
+    dsn = f'host={hostname} port={port} dbname={database} '\
+          f'user={username} password={password}'
+    return dsn
+
+
+def _create_tables(dsn: str) -> bool:
+    """
+    Check existence of tables and possibly create them, returning success.
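+
+    Existence is checked via information_schema.tables; missing
+    tables are created with the DDL from storage_setup.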
+ """ + try: + with psycopg2.connect(dsn) as conn: + with conn.cursor() as curs: + for table_name, sql_stmts in get_create_table_stmts().items(): + ok = _create_table(curs, table_name, sql_stmts) + if not ok: + return False + except Exception: + journal.send( + f'ERROR: cannot connect to database, check params' + f' in {settings.main_config_file}', + PRIORITY=journal.LOG_CRIT, + ) + return False + return True + + +def _create_table( + cursor: psycopg2.extras.RealDictCursor, + table_name: str, + sql_stmts: List[str] +) -> bool: + """ + Try to create a table if it does not exist and return whether it exists. + + If creation failed, emit an error to the journal. + """ + cursor.execute("SELECT EXISTS(SELECT * FROM " + "information_schema.tables WHERE table_name=%s)", + (table_name,)) + table_exists = cursor.fetchone()[0] + if not table_exists: + for sql_stmt in sql_stmts: + try: + cursor.execute(sql_stmt) + except Exception: + journal.send( + 'ERROR: database user needs privilege to create tables.\n' + 'Alternatively, you can create the table manually like' + ' this:\n\n' + + '\n'.join([sql + ';' for sql in sql_stmts]), + PRIORITY=journal.LOG_CRIT, + ) + return False + return True + + +def init_session(cursor: psycopg2.extras.RealDictCursor) -> None: + """ + Init a database session. + + Define prepared statements. + """ + stmt = get_sql_prepared_statement('delivery_from') + cursor.execute(stmt) + stmt = get_sql_prepared_statement('delivery_to') + cursor.execute(stmt) + stmt = get_sql_prepared_statement('noqueue') + cursor.execute(stmt) diff --git a/journal-postfix/files/srv/storage_setup.py b/journal-postfix/files/srv/storage_setup.py new file mode 100755 index 0000000..e778712 --- /dev/null +++ b/journal-postfix/files/srv/storage_setup.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +""" +Database table definitions and prepared statements. 
+ +Note: (short) postfix queue IDs are not unique: +http://postfix.1071664.n5.nabble.com/Queue-ID-gets-reused-Not-unique-td25387.html +""" + +from typing import Dict, List + + +_table_def_delivery_from = [ + [ + dict(name='t_i', dtype='TIMESTAMP'), + dict(name='t_f', dtype='TIMESTAMP'), + dict(name='queue_id', dtype='VARCHAR(16)', null=False, extra='UNIQUE'), + dict(name='host', dtype='VARCHAR(200)'), + dict(name='ip', dtype='VARCHAR(50)'), + dict(name='sasl_username', dtype='VARCHAR(300)'), + dict(name='orig_queue_id', dtype='VARCHAR(16)'), + dict(name='status', dtype='VARCHAR(10)'), + dict(name='accepted', dtype='BOOL', null=False, default='TRUE'), + dict(name='done', dtype='BOOL', null=False, default='FALSE'), + dict(name='sender', dtype='VARCHAR(300)'), + dict(name='message_id', dtype='VARCHAR(1000)'), + dict(name='resent_message_id', dtype='VARCHAR(1000)'), + dict(name='subject', dtype='VARCHAR(1000)'), + dict(name='phase', dtype='VARCHAR(15)'), + dict(name='error', dtype='VARCHAR(1000)'), + dict(name='size', dtype='INT'), + dict(name='nrcpt', dtype='INT'), + dict(name='verp_id', dtype='INT'), + dict(name='messages', dtype='JSONB', null=False, default="'{}'::JSONB"), + ], + "CREATE INDEX delivery_from__queue_id ON delivery_from (queue_id)", + "CREATE INDEX delivery_from__t_i ON delivery_from (t_i)", + "CREATE INDEX delivery_from__t_f ON delivery_from (t_f)", + "CREATE INDEX delivery_from__sender ON delivery_from (sender)", + "CREATE INDEX delivery_from__message_id ON delivery_from (message_id)", +] + + +_table_def_delivery_to = [ + [ + dict(name='t_i', dtype='TIMESTAMP'), + dict(name='t_f', dtype='TIMESTAMP'), + dict(name='queue_id', dtype='VARCHAR(16)', null=False), + dict(name='recipient', dtype='VARCHAR(300)'), + dict(name='orig_recipient', dtype='VARCHAR(300)'), + dict(name='host', dtype='VARCHAR(200)'), + dict(name='ip', dtype='VARCHAR(50)'), + dict(name='port', dtype='VARCHAR(10)'), + dict(name='relay', dtype='VARCHAR(10)'), + dict(name='delay', dtype='VARCHAR(200)'), + dict(name='delays', dtype='VARCHAR(200)'), + dict(name='dsn', dtype='VARCHAR(10)'), + dict(name='status', dtype='VARCHAR(10)'), + dict(name='status_text', dtype='VARCHAR(1000)'), + dict(name='messages', dtype='JSONB', null=False, default="'{}'::JSONB"), + ], + "ALTER TABLE delivery_to ADD CONSTRAINT" + " delivery_to__queue_id_recipient UNIQUE(queue_id, recipient)", + "CREATE INDEX delivery_to__queue_id ON delivery_to (queue_id)", + "CREATE INDEX delivery_to__recipient ON delivery_to (recipient)", + "CREATE INDEX delivery_to__t_i ON delivery_to (t_i)", + "CREATE INDEX delivery_to__t_f ON delivery_to (t_f)", +] + + +_table_def_noqueue = [ + [ + dict(name='t', dtype='TIMESTAMP'), + dict(name='host', dtype='VARCHAR(200)'), + dict(name='ip', dtype='VARCHAR(50)'), + dict(name='sender', dtype='VARCHAR(300)'), + dict(name='recipient', dtype='VARCHAR(300)'), + dict(name='sasl_username', dtype='VARCHAR(300)'), + dict(name='status', dtype='VARCHAR(10)'), + dict(name='phase', dtype='VARCHAR(15)'), + dict(name='error', dtype='VARCHAR(1000)'), + dict(name='message', dtype='TEXT'), + ], + "CREATE INDEX noqueue__t ON noqueue (t)", + "CREATE INDEX noqueue__sender ON noqueue (sender)", + "CREATE INDEX noqueue__recipient ON noqueue (recipient)", +] + + +_tables: Dict[str, list] = { + 'delivery_from': _table_def_delivery_from, + 'delivery_to': _table_def_delivery_to, + 'noqueue': _table_def_noqueue, +} + + +_prepared_statements = { + 'delivery_from': + "PREPARE delivery_from_insert ({}) AS " + "INSERT INTO delivery_from ({}) VALUES 
({}) " + "ON CONFLICT (queue_id) DO UPDATE SET {}", + 'delivery_to': + "PREPARE delivery_to_insert ({}) AS " + "INSERT INTO delivery_to ({}) VALUES ({}) " + "ON CONFLICT (queue_id, recipient) DO UPDATE SET {}", + 'noqueue': + "PREPARE noqueue_insert ({}) AS " + "INSERT INTO noqueue ({}) VALUES ({}){}", +} + + +table_fields: Dict[str, List[str]] = {} +""" +Lists of field names for tables, populated by get_create_table_stmts(). +""" + + +def get_sql_prepared_statement(table_name: str) -> str: + """ + Return SQL defining a prepared statement for inserting into a table. + + Table 'noqueue' is handled differently, because it does not have + an UPDATE clause. + """ + col_names = [] + col_types = [] + col_args = [] + col_upds = [] + col_i = 0 + for field in _tables[table_name][0]: + # column type + col_type = field['dtype'] + if field['dtype'].lower().startswith('varchar'): + col_type = 'TEXT' + col_types.append(col_type) + # column args + col_i += 1 + col_arg = '$' + str(col_i) + # column name + col_name = field['name'] + col_names.append(col_name) + if 'default' in field: + default = field['default'] + col_args.append(f'COALESCE({col_arg},{default})') + else: + col_args.append(col_arg) + # column update + col_upd = f'{col_name}=COALESCE({col_arg},{table_name}.{col_name})' + if col_name != 't_i': + if col_name == 'messages': + col_upd = f'{col_name}={table_name}.{col_name}||{col_arg}' + if table_name != 'noqueue': + col_upds.append(col_upd) + stmt = _prepared_statements[table_name].format( + ','.join(col_types), + ','.join(col_names), + ','.join(col_args), + ','.join(col_upds), + ) + return stmt + + +def get_sql_execute_prepared_statement(table_name: str) -> str: + """ + Return SQL for executing the given table's prepared statement. + + The result is based on global variable _tables. + """ + fields = _tables[table_name][0] + return "EXECUTE {}_insert ({})"\ + .format(table_name, ','.join(['%s' for i in range(len(fields))])) + + +def get_create_table_stmts() -> Dict[str, List[str]]: + """ + Return a dict mapping table names to SQL statements creating the tables. + + Also populate global variable table_fields. + """ + res = {} + for table_name, table_def in _tables.items(): + stmts = table_def.copy() + stmts[0] = _get_sql_create_stmt(table_name, table_def[0]) + res[table_name] = stmts + field_names = [x['name'] for x in table_def[0]] + global table_fields + table_fields[table_name] = field_names + return res + + +def _get_sql_create_stmt(table_name: str, fields: List[dict]): + """ + Return the 'CREATE TABLE' SQL statement for a table. + + Factor in NULL, DEFAULT and extra DDL text. + """ + sql = f"CREATE TABLE {table_name} (\n id BIGSERIAL," + col_defs = [] + for field in fields: + col_def = f" {field['name']} {field['dtype']}" + if 'null' in field and field['null'] is False: + col_def += " NOT NULL" + if 'default' in field: + col_def += f" DEFAULT {field['default']}" + if 'extra' in field: + col_def += f" {field['extra']}" + col_defs.append(col_def) + sql += '\n' + ',\n'.join(col_defs) + sql += '\n)' + return sql diff --git a/journal-postfix/tasks/main.yml b/journal-postfix/tasks/main.yml new file mode 100644 index 0000000..aa8d5fd --- /dev/null +++ b/journal-postfix/tasks/main.yml @@ -0,0 +1,90 @@ +- name: user journal-postfix + user: + name: journal-postfix + group: systemd-journal + state: present + system: yes + uid: 420 + create_home: no + home: /srv/journal-postfix + password: '!' 
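+    # '!' is an invalid password hash: password authentication is
+    # disabled, and password_lock additionally locks the account.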
+ password_lock: yes + comment: created by ansible role journal-postfix + +- name: directories /srv/journal-postfix, /etc/journal-postfix + file: + path: "{{ item }}" + state: directory + owner: journal-postfix + group: systemd-journal + mode: 0755 + loop: + - /srv/journal-postfix + - /etc/journal-postfix + +- name: install dependencies + apt: + name: python3-psycopg2,python3-systemd,python3-yaml + state: present + update_cache: yes + install_recommends: no + +- name: files in /srv/journal-postfix + copy: + src: "srv/{{ item }}" + dest: "/srv/journal-postfix/{{ item }}" + owner: journal-postfix + group: systemd-journal + mode: 0644 + force: yes + loop: + - run.py + - settings.py + - sources.py + - parser.py + - storage.py + - storage_setup.py + - README.md + - setup.cfg + +- name: make some files executable + file: + path: "{{ item }}" + mode: 0755 + loop: + - /srv/journal-postfix/run.py + - /srv/journal-postfix/settings.py + +- name: determine whether to startup + set_fact: + startup: "{{ mailserver.postgresql.host is defined and mailserver.postgresql.port is defined and mailserver.postgresql.dbname is defined and mailserver.postgresql.username is defined and mailserver.postgresql.password is defined }}" + +- name: file /etc/journal-postfix/main.yml + template: + src: main.yml + dest: /etc/journal-postfix/main.yml + owner: journal-postfix + group: systemd-journal + mode: 0600 + force: no + +- name: file journal-postfix.service + copy: + src: journal-postfix.service + dest: /etc/systemd/system/journal-postfix.service + owner: root + group: root + mode: 0644 + force: yes + +- name: enable systemd unit journal-postfix.service + systemd: + enabled: yes + daemon_reload: yes + name: journal-postfix.service + +- name: restart systemd unit journal-postfix.service + systemd: + state: restarted + name: journal-postfix.service + when: startup diff --git a/journal-postfix/templates/main.yml b/journal-postfix/templates/main.yml new file mode 100644 index 0000000..49dc266 --- /dev/null +++ b/journal-postfix/templates/main.yml @@ -0,0 +1,45 @@ +# Configuration for journal-postfix, see /srv/journal-postfix + +# To enable startup of systemd unit journal-postfix set this to yes: +startup: {{ 'yes' if startup else 'no' }} + +# PostgreSQL database connection parameters +postgresql: + hostname: {{ mailserver.postgresql.host | default('127.0.0.1') }} + port: {{ mailserver.postgresql.port | default('5432') }} + database: {{ mailserver.postgresql.dbname | default('mailserver') }} + username: {{ mailserver.postgresql.username | default('mailserver') }} + password: {{ mailserver.postgresql.password | default('*************') }} + +# Postfix parameters +postfix: + # Systemd unit name of the Postfix unit. Only one unit is supported. + systemd_unitname: postfix@-.service + + # If you have configured Postfix to rewrite envelope sender + # addresses of outgoing mails so that it includes a VERP + # (Variable Envelope Return Path) of the form + # {local_part}+{verp_marker}-{id}@{domain}, where id is an + # integer, then set the verp_marker here: + verp_marker: {{ mailserver.postfix.verp_marker | default('') }} + +# Poll timeout in seconds for fetching messages from the journal. +journal_poll_interval: 10.0 + +# How much time may pass before committing a database transaction? +# (The actual maximal delay can be one journal_poll_interval in addition.) +max_delay_before_commit: 60.0 + +# How many messages to cache at most before committing a database transaction? 
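+# Larger values improve throughput, at the price of more memory use and
+# of more unwritten data if the service stops before the next commit.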
+max_messages_per_commit: 10000
+
+# Delete delivery records older than this number of days.
+# A value of 0 means that data are never deleted.
+# Note: Deliveries may be active over a substantial time interval;
+# here the age of a delivery is determined by its start time.
+delete_deliveries_after_days: 30
+
+# The time interval in seconds after which a deletion of old
+# delivery records is triggered. (Will not be smaller than
+# max_delay_before_commit + journal_poll_interval.)
+delete_interval: 3600