# journal-postfix - A log parser for Postfix

Experiences from applying Python to the domain of bad old email.

## Email ✉

* old technology (starting in the 1970s)
* [store-and-forward](https://en.wikipedia.org/wiki/Store_and_forward): sent != delivered to recipient
* non-delivery reasons:
  * recipient over quota
  * nonexistent destination
  * malware
  * spam
  * server problem
  * ...
* permanent / non-permanent failure ([DSN ~ 5.X.Y / 4.X.Y](https://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml))
* non-delivery modes:
  * immediate reject on SMTP level
  * delayed [bounce messages](https://en.wikipedia.org/wiki/Bounce_message) by the [reporting MTA](https://upload.wikimedia.org/wikipedia/commons/a/a2/Bounce-DSN-MTA-names.png) - queueing (e.g., ~5d) before the delivery failure notification
  * discarding
* read receipts
* [Wikipedia: email tracking](https://en.wikipedia.org/wiki/Email_tracking)

## [SMTP](https://en.wikipedia.org/wiki/SMTP)

[SMTP session example](https://en.wikipedia.org/wiki/Simple_Mail_Transfer_Protocol#SMTP_transport_example): the envelope sender and envelope recipient may differ from the From: and To: headers.

Lists of error codes:

* [SMTP and ESMTP](https://www.inmotionhosting.com/support/email/email-troubleshooting/smtp-and-esmtp-error-code-list)
* [SMTP](https://serversmtp.com/smtp-error/)
* [SMTP](https://info.webtoolhub.com/kb-a15-smtp-status-codes-smtp-error-codes-smtp-reply-codes.aspx)

Example of an error within a bounced email (Subject: Mail delivery failed: returning message to sender):

    SMTP error from remote server for TEXT command, host: smtpin.rzone.de (81.169.145.97)
    reason: 550 5.7.1 Refused by local policy. No SPAM please!

* email users keep asking about the fate of their emails (or of emails from their correspondents which should have arrived)

## [Postfix](http://www.postfix.org)

* popular [MTA](https://en.wikipedia.org/wiki/Message_transfer_agent)
* written in C
* logging to files / journald
* example log messages for a (non-)delivery + stats

```
Nov 27 16:19:22 mail postfix/smtpd[18995]: connect from unknown[80.82.79.244]
Nov 27 16:19:22 mail postfix/smtpd[18995]: NOQUEUE: reject: RCPT from unknown[80.82.79.244]: 454 4.7.1 : Relay access denied; from= to= proto=ESMTP helo=
Nov 27 16:19:22 mail postfix/smtpd[18995]: disconnect from unknown[80.82.79.244] ehlo=1 mail=1 rcpt=0/1 rset=1 quit=1 commands=4/5
Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection rate 1/60s for (smtp:80.82.79.244) at Nov 27 16:19:22
Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max connection count 1 for (smtp:80.82.79.244) at Nov 27 16:19:22
Nov 27 16:22:43 mail postfix/anvil[18997]: statistics: max cache size 1 at Nov 27 16:19:22
Nov 27 16:22:48 mail postfix/smtpd[18999]: connect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
Nov 27 16:22:49 mail postfix/smtpd[18999]: 47NQzY13DbzNWNQG: client=mail.cosmopool.net[2a01:4f8:160:20c1::10:107]
Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: info: header Subject: Re: test from mail.cosmopool.net[2a01:4f8:160:20c1::10:107]; from= to= proto=ESMTP helo=
Nov 27 16:22:49 mail postfix/cleanup[19003]: 47NQzY13DbzNWNQG: message-id=
Nov 27 16:22:49 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: from=, size=1365, nrcpt=2 (queue active)
Nov 27 16:22:49 mail postfix/smtpd[18999]: disconnect from mail.cosmopool.net[2a01:4f8:160:20c1::10:107] ehlo=1 mail=1 rcpt=2 data=1 quit=1 commands=6
Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 nV9iJ9mi3l0+SgAAZU03Dg Saved)
Nov 27 16:22:50 mail postfix/lmtp[19005]: 47NQzY13DbzNWNQG: to=, relay=mail.multiname.org[private/dovecot-lmtp], delay=1.2, delays=0.56/0.01/0.01/0.63, dsn=2.0.0, status=sent (250 2.0.0 nV9iJ9mi3l0+SgAAZU03Dg:2 Saved)
Nov 27 16:22:50 mail postfix/qmgr[29349]: 47NQzY13DbzNWNQG: removed
```

* [involved Postfix components](http://www.postfix.org/OVERVIEW.html):
  * smtpd (port 25: smtp, port 587: submission)
  * cleanup
  * smtp/lmtp
* missing: a log parser

## Idea

* follow the log stream and write summarized delivery information to a database
* goal: spot delivery problems, collect delivery stats
* a GUI could then display the current delivery status to users

## Why Python?

* simple and fun language, clear and concise
* well suited for text processing
* libs available for systemd, PostgreSQL (see the sketch below)
* huge standard library (used here: datetime, re, yaml, argparse, select)
* speed sufficient?
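As a taste of the systemd bindings, here is a minimal sketch of reading and polling Postfix messages from the journal with python-systemd; the unit name and the one-hour lookback are assumptions for illustration, not the project's configuration:

```python
# Sketch: load existing journal entries, then poll for new ones.
from datetime import datetime, timedelta

from systemd import journal

reader = journal.Reader()
reader.log_level(journal.LOG_INFO)                   # loglevel INFO and above
reader.add_match(_SYSTEMD_UNIT='postfix@-.service')  # unit name is an assumption
reader.seek_realtime(datetime.now() - timedelta(hours=1))

# Loading phase: entries that already existed when we started.
for entry in reader:
    print(entry['__REALTIME_TIMESTAMP'], entry.get('MESSAGE'))

# Polling phase: wait for new entries with a timeout (in seconds).
while True:
    if reader.wait(1.0) == journal.APPEND:
        for entry in reader:
            print(entry['__REALTIME_TIMESTAMP'], entry.get('MESSAGE'))
```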
## Development iterations

* hmm, easy task, might take a few days
* PoC: reading and polling from the journal works as expected
* used Postfix logfiles in syslog format and iteratively wrote regexps matching them
* separated parsing messages from extracting delivery information
* created a delivery table
* hmm, this is very slow: processing the log messages of just a few days takes hours (from a server with not much traffic)
* introduced a polling timeout and SQL transactions handling several messages at once (see the sketch after this list)
* ... much faster
* looks fine, but wait... did I catch all syntax variants of Postfix log messages?
* looked into the Postfix sources and almost got lost
* weeks of hard work identifying relevant log output directives
* completely rewrote the parser to deal with the rich log message syntax, e.g.:
  `def _strip_pattern(msg, res, pattern_name, pos='l', target_names=None)`
* oh, there are even more Postfix components... limited support to certain Postfix configurations, in particular virtual mailboxes (not local ones)
* mails may have multiple recipients... split the delivery table into delivery_from and delivery_to
* decide which delivery information is relevant
* cleanup and polish (config mgmt, logging)
* write an Ansible role
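The speedup mentioned above came from committing a whole chunk of messages per transaction instead of one at a time. A minimal sketch of that idea with psycopg2; the table and column names here are illustrative, not the project's actual schema:

```python
# Sketch: one INSERT and one commit per chunk of cached items.
import psycopg2
from psycopg2.extras import execute_values

def flush(conn, cache):
    """Write all cached delivery items in a single transaction."""
    with conn.cursor() as curs:
        execute_values(
            curs,
            "INSERT INTO delivery_to (queue_id, recipient, status) VALUES %s",
            cache,
        )
    conn.commit()  # one commit per chunk instead of one per message

conn = psycopg2.connect('dbname=postfix_log')  # DSN is illustrative
flush(conn, [('47NQzY13DbzNWNQG', 'someone@example.org', 'sent')])
```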
## Structure

```blockdiag
blockdiag {
  default_fontsize = 20;
  node_height = 80;
  journal_since -> run_loop;
  journal_follow -> run_loop;
  logfile -> run_loop;
  run_loop -> parse -> extract_delivery -> store;
  store -> delivery_from;
  store -> delivery_to;
  store -> noqueue;
  group {
    label = "input iterables";
    journal_since; journal_follow; logfile;
  };
  group {
    label = "output tables";
    delivery_from; delivery_to; noqueue;
  };
}
```

## Iterables

```python
from typing import Union

def iter_journal_messages_since(timestamp: Union[int, float]):
    """
    Yield False and message details from the journal since *timestamp*.

    This is the loading phase (loading messages that already existed
    when we start).

    Argument *timestamp* is a UNIX timestamp.

    Only journal entries for systemd unit UNITNAME with loglevel
    INFO and above are retrieved.
    """
    ...

def iter_journal_messages_follow(timestamp: Union[int, float]):
    """
    Yield commit and message details from the journal through polling.

    This is the polling phase (after we have read pre-existing
    messages in the loading phase).

    Argument *timestamp* is a UNIX timestamp.

    Only journal entries for systemd unit UNITNAME with loglevel
    INFO and above are retrieved.

    *commit* (bool) tells whether it is time to store the delivery
    information obtained from the messages yielded by us. It is set
    to True if max_delay_before_commit has elapsed. After this delay
    delivery information will be written; to be exact: the delay may
    increase by up to one journal_poll_interval.
    """
    ...

def iter_logfile_messages(filepath: str, year: int,
                          commit_after_lines=max_messages_per_commit):
    """
    Yield messages and a commit flag from a logfile.

    Loop through all lines of the file with given *filepath* and
    extract the time and log message. If the log message starts with
    'postfix/', then extract the syslog_identifier, pid and message
    text.

    Since syslog lines do not contain the year, the *year* to which
    the first log line belongs must be given.

    Return a commit flag and a dict with these keys:
        't': timestamp
        'message': message text
        'identifier': syslog identifier (e.g., 'postfix/smtpd')
        'pid': process id

    The commit flag will be set to True for every
    (commit_after_lines)-th filtered message and serves as a signal
    to the caller to commit this chunk of data to the database.
    """
    ...
```

## Running loops

```python
import psycopg2
import psycopg2.extras

def run(dsn, verp_marker=False, filepath=None, year=None, debug=[]):
    """
    Determine loop(s) and run them within a database context.
    """
    init(verp_marker=verp_marker)
    with psycopg2.connect(dsn) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs:
            if filepath:
                run_loop(iter_logfile_messages(filepath, year), curs,
                         debug=debug)
            else:
                begin_timestamp = get_latest_timestamp(curs)
                run_loop(iter_journal_messages_since(begin_timestamp),
                         curs, debug=debug)
                begin_timestamp = get_latest_timestamp(curs)
                run_loop(iter_journal_messages_follow(begin_timestamp),
                         curs, debug=debug)

def run_loop(iterable, curs, debug=[]):
    """
    Loop over log messages obtained from *iterable*.

    Parse the message, extract delivery information from it and
    store that delivery information.

    For performance reasons delivery items are collected in a cache
    before writing them (i.e., committing a database transaction).
    """
    cache = []
    msg_count = max_messages_per_commit
    for commit, msg_details in iterable:
        ...
```
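The loop body is elided above. The following is a hypothetical completion, assuming `parse`, `extract_delivery` and `store_deliveries` behave as described in the next sections; it is a sketch of the idea, not the actual source:

```python
# Hypothetical completion of run_loop's body (simplified).
for commit, msg_details in iterable:
    parsed = parse(msg_details)
    errors, delivery = extract_delivery(msg_details, parsed)
    if not errors and delivery:
        cache.append(delivery)
        msg_count -= 1
    # Flush the cache when the iterable signals a commit
    # or enough messages have accumulated.
    if commit or msg_count <= 0:
        store_deliveries(curs, cache)
        cache = []
        msg_count = max_messages_per_commit
```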
## Parsing

Parse what you can. (But only msg_info in Postfix, and only relevant components.)

```python
def parse(msg_details, debug=False):
    """
    Parse a log message returning a dict.

    *msg_details* is assumed to be a dict with these keys:

      * 'identifier' (syslog identifier),
      * 'pid' (process id),
      * 'message' (message text)

    The syslog identifier and process id are copied to the
    resulting dict.
    """
    ...

def _parse_branch(comp, msg, res):
    """
    Parse a log message string *msg*, adding results to dict *res*.

    Depending on the component *comp* we branch to functions named
    _parse_{comp}. Add parsing results to dict *res*. Always add
    key 'action'. Try to parse every syntactical element.

    Note: We parse what we can. Assessment of parsing results
    relevant for delivery is done in :func:`extract_delivery`.
    """
    ...
```

## Extracting

Extract what is relevant.

```python
def extract_delivery(msg_details, parsed):
    """
    Compute delivery information from parsing results.

    Basically this means that we map the parsed fields to a type
    ('from' or 'to') and to the database fields for table
    'delivery_from' or 'delivery_to'.

    We branch to functions _extract_{comp} where comp is the name
    of a Postfix component.

    Return a list of error strings and a dict with the extracted
    information. Keys with None values are removed from the dict.
    """
    ...
```

## Regular expressions

* see sources
* [Stackoverflow: How to validate an email address](https://stackoverflow.com/questions/201323/how-to-validate-an-email-address-using-a-regular-expression) ([FSM](https://i.stack.imgur.com/YI6KR.png))

### BTW: [email.utils.parseaddr](https://docs.python.org/3/library/email.utils.html#email.utils.parseaddr)

```python
>>> from email.utils import parseaddr
>>> parseaddr('Ghost <"hello@nowhere"@pyug.at>')
('Ghost', '"hello@nowhere"@pyug.at')
>>> print(parseaddr('"more\"fun\"\\"hello\\"@nowhere"@pyug.at')[1])
"more"fun"\"hello\"@nowhere"@pyug.at
>>> print(parseaddr('""@pyug.at')[1])
""@pyug.at
```

## Storing

```python
def store_deliveries(cursor, cache, debug=[]):
    """
    Store cached delivery information into the database.

    Find queue_ids in *cache* and group delivery items by them,
    but separately for delivery types 'from' and 'to'. In addition,
    collect delivery items with queue_id is None.

    After grouping we merge all items within a group into a single
    item. So we can combine several SQL queries into a single one,
    which improves performance significantly.

    Then store the merged items and the deliveries with queue_id
    is None.
    """
    ...
```

Database schema: 3 tables:

* delivery_from: smtpd, milters, qmgr
* delivery_to: smtp, virtual, bounce, error
* noqueue: rejected by smtpd before even getting a queue_id

Table noqueue contains all the spam; here we only use plain SQL INSERT (no ON CONFLICT ... UPDATE), because it is faster.

## Demo

...

## Questions / Suggestions

* Could speed be enhanced by using prepared statements?
* Will old data be deleted (as required by the GDPR)?

Both were implemented after the talk; a sketch of the prepared-statement idea follows.
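A minimal sketch of server-side prepared statements with psycopg2, using PostgreSQL's PREPARE/EXECUTE; the statement, table and column names are illustrative, not the project's actual code:

```python
# Sketch: prepare an INSERT once, then execute it cheaply per item.
import psycopg2

conn = psycopg2.connect('dbname=postfix_log')  # DSN is illustrative
with conn.cursor() as curs:
    # Prepare the statement once per session ...
    curs.execute(
        "PREPARE insert_noqueue (timestamptz, text, text) AS "
        "INSERT INTO noqueue (t, client, reason) VALUES ($1, $2, $3)"
    )
    # ... then execute it for every rejected connection.
    curs.execute(
        "EXECUTE insert_noqueue (%s, %s, %s)",
        ('2019-11-27 16:19:22+01', 'unknown[80.82.79.244]',
         '454 4.7.1 Relay access denied'),
    )
conn.commit()
```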