#!/usr/bin/env python3
"""
Store parsed log data in PostgreSQL.
"""
import datetime
import json
import time
from collections import defaultdict
from traceback import format_exc
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import psycopg2
import psycopg2.extras
from systemd import journal

import settings
from storage_setup import (
    get_create_table_stmts,
    get_sql_prepared_statement,
    get_sql_execute_prepared_statement,
    table_fields,
)


def get_latest_timestamp(curs: psycopg2.extras.RealDictCursor) -> float:
    """
    Return the latest timestamp of a message transfer from the database.

    If there are no records yet, return 0.
    """
    last = 0
    curs.execute(
        "SELECT greatest(max(t_i), max(t_f)) AS last FROM delivery_from"
    )
    last1 = curs.fetchone()['last']
    if last1:
        last = max(
            last,
            (last1 - datetime.datetime(1970, 1, 1)).total_seconds()
        )
    curs.execute(
        "SELECT greatest(max(t_i), max(t_f)) AS last FROM delivery_to"
    )
    last2 = curs.fetchone()['last']
    if last2:
        last = max(
            last,
            (last2 - datetime.datetime(1970, 1, 1)).total_seconds()
        )
    return last


def delete_old_deliveries(curs: psycopg2.extras.RealDictCursor) -> None:
    """
    Delete deliveries older than the configured number of days.

    See config param *delete_deliveries_after_days*.
    """
    max_days = settings.delete_deliveries_after_days
    if max_days:
        now = datetime.datetime.utcnow()
        dt = datetime.timedelta(days=max_days)
        t0 = now - dt
        curs.execute("DELETE FROM delivery_from WHERE t_i < %s", (t0,))
        curs.execute("DELETE FROM delivery_to WHERE t_i < %s", (t0,))
        curs.execute("DELETE FROM noqueue WHERE t < %s", (t0,))


def store_delivery_items(
    cursor,
    cache: List[dict],
    debug: List[str] = []
) -> None:
    """
    Store cached delivery items in the database.

    Find queue_ids in *cache* and group delivery items by them, separately
    for delivery types 'from' and 'to'. In addition, collect delivery items
    whose queue_id is None.

    After grouping, merge all items within a group into a single item.
    This way several SQL queries can be combined into a single one, which
    improves performance significantly.

    Then store the merged items and the deliveries without a queue_id.
    """
    if 'all' in debug or 'sql' in debug:
        print(f'Storing {len(cache)} messages.')
    if not cache:
        return
    from_items, to_items, noqueue_items = _group_delivery_items(cache)
    deliveries_from = _merge_delivery_items(from_items, item_type='from')
    deliveries_to = _merge_delivery_items(to_items, item_type='to')
    _store_deliveries(cursor, 'delivery_from', deliveries_from, debug=debug)
    _store_deliveries(cursor, 'delivery_to', deliveries_to, debug=debug)
    _store_deliveries(cursor, 'noqueue', noqueue_items, debug=debug)


FromItems = Dict[str, List[dict]]
ToItems = Dict[Tuple[str, Optional[str]], List[dict]]
NoqueueItems = Dict[int, dict]


def _group_delivery_items(
    cache: List[dict]
) -> Tuple[FromItems, ToItems, NoqueueItems]:
    """
    Group delivery items by type and queue_id.

    Return items of type 'from', items of type 'to', and items without
    a queue_id.
    """
    delivery_from_items: FromItems = defaultdict(list)
    delivery_to_items: ToItems = defaultdict(list)
    noqueue_items: NoqueueItems = {}
    noqueue_i = 1
    for item in cache:
        if item.get('queue_id'):
            queue_id = item['queue_id']
            if item.get('type') == 'from':
                delivery_from_items[queue_id].append(item)
            else:
                recipient = item.get('recipient')
                delivery_to_items[(queue_id, recipient)].append(item)
        else:
            noqueue_items[noqueue_i] = item
            noqueue_i += 1
    return delivery_from_items, delivery_to_items, noqueue_items
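
# A minimal sketch of how _group_delivery_items() partitions a cache; the
# item dicts below are hypothetical examples, not real parsed log records:
#
#     cache = [
#         {'queue_id': 'ABC123', 'type': 'from', 'identifier': 'smtpd',
#          'pid': 17, 'message': 'client=mail.example.org[10.0.0.1]'},
#         {'queue_id': 'ABC123', 'type': 'to', 'recipient': 'b@example.org',
#          'identifier': 'smtp', 'pid': 18, 'message': 'status=sent'},
#         {'identifier': 'smtpd', 'pid': 17, 'message': 'NOQUEUE: reject'},
#     ]
#     from_items, to_items, noqueue_items = _group_delivery_items(cache)
#     # from_items    == {'ABC123': [<first item>]}
#     # to_items      == {('ABC123', 'b@example.org'): [<second item>]}
#     # noqueue_items == {1: <third item>}
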
""" delivery_from_items: FromItems = defaultdict(list) delivery_to_items: ToItems = defaultdict(list) noqueue_items: NoqueueItems = {} noqueue_i = 1 for item in cache: if item.get('queue_id'): queue_id = item['queue_id'] if item.get('type') == 'from': delivery_from_items[queue_id].append(item) else: recipient = item.get('recipient') delivery_to_items[(queue_id, recipient)].append(item) else: noqueue_items[noqueue_i] = item noqueue_i += 1 return delivery_from_items, delivery_to_items, noqueue_items def _merge_delivery_items( delivery_items: Union[FromItems, ToItems], item_type: str = 'from', ) -> Dict[Union[str, Tuple[str, Optional[str]]], dict]: """ Compute deliveries by combining multiple delivery items. Take lists of delivery items for each queue_id (in case of item_type=='from') or for (queue_id, recipient)-pairs (in case of item_type='to'). Each delivery item is a dict obtained from one log message. The dicts are consecutively updated (merged), except for the raw log messages (texts) which are collected into a list. The fields of the resulting delivery are filtered according to the target table. Returned is a dict mapping queue_ids (in case of item_type=='from') or (queue_id, recipient)-pairs (in case of item_type='to') to deliveries. """ deliveries = {} for group, items in delivery_items.items(): delivery = {} messages = [] for item in items: message = item.pop('message') identifier = item.pop('identifier') pid = item.pop('pid') messages.append(f'{identifier}[{pid}]: {message}') delivery.update(item) delivery['messages'] = messages deliveries[group] = delivery return deliveries def _store_deliveries( cursor: psycopg2.extras.RealDictCursor, table_name: str, deliveries: Dict[Any, dict], debug: List[str] = [], ) -> None: """ Store grouped and merged delivery items. """ if not deliveries: return n = len(deliveries.values()) t0 = time.time() cursor.execute('BEGIN') _store_deliveries_batch(cursor, table_name, deliveries.values()) cursor.execute('COMMIT') t1 = time.time() if 'all' in debug or 'sql' in debug: milliseconds = (t1 - t0) * 1000 print( '*' * 10, f'SQL transaction time {table_name}: ' f'{milliseconds:.2f} ms ({n} deliveries)', ) def _store_deliveries_batch( cursor: psycopg2.extras.RealDictCursor, table_name: str, deliveries: Iterable[dict] ) -> None: """ Store *deliveries* (i.e., grouped and merged delivery items). We use a prepared statement and execute_batch() from psycopg2.extras to improve performance. """ rows = [] for delivery in deliveries: # get values for all fields of the table field_values: List[Any] = [] t = delivery.get('t') delivery['t_i'] = t delivery['t_f'] = t for field in table_fields[table_name]: if field in delivery: if field == 'messages': field_values.append(json.dumps(delivery[field])) else: field_values.append(delivery[field]) else: field_values.append(None) rows.append(field_values) sql = get_sql_execute_prepared_statement(table_name) try: psycopg2.extras.execute_batch(cursor, sql, rows) except Exception as err: msg = f'SQL statement failed: "{sql}" -- the values were: {rows}' journal.send(msg, PRIORITY=journal.LOG_ERR) def init_db(config: dict) -> Optional[str]: """ Initialize database; if ok return DSN, else None. Try to get parameters for database access, check existence of tables and possibly create them. """ dsn = _get_dsn(config) if dsn: ok = _create_tables(dsn) if not ok: return None return dsn def _get_dsn(config: dict) -> Optional[str]: """ Return the DSN (data source name) from the *config*. 
""" try: postgresql_config = config['postgresql'] hostname = postgresql_config['hostname'] port = postgresql_config['port'] database = postgresql_config['database'] username = postgresql_config['username'] password = postgresql_config['password'] except Exception: msg = f"""ERROR: invalid config in {settings.main_config_file} The config file must contain a section like this: postgresql: hostname: port: database: username: password: """ journal.send(msg, PRIORITY=journal.LOG_CRIT) return None dsn = f'host={hostname} port={port} dbname={database} '\ f'user={username} password={password}' return dsn def _create_tables(dsn: str) -> bool: """ Check existence of tables and possibly create them, returning success. """ try: with psycopg2.connect(dsn) as conn: with conn.cursor() as curs: for table_name, sql_stmts in get_create_table_stmts().items(): ok = _create_table(curs, table_name, sql_stmts) if not ok: return False except Exception: journal.send( f'ERROR: cannot connect to database, check params' f' in {settings.main_config_file}', PRIORITY=journal.LOG_CRIT, ) return False return True def _create_table( cursor: psycopg2.extras.RealDictCursor, table_name: str, sql_stmts: List[str] ) -> bool: """ Try to create a table if it does not exist and return whether it exists. If creation failed, emit an error to the journal. """ cursor.execute("SELECT EXISTS(SELECT * FROM " "information_schema.tables WHERE table_name=%s)", (table_name,)) table_exists = cursor.fetchone()[0] if not table_exists: for sql_stmt in sql_stmts: try: cursor.execute(sql_stmt) except Exception: journal.send( 'ERROR: database user needs privilege to create tables.\n' 'Alternatively, you can create the table manually like' ' this:\n\n' + '\n'.join([sql + ';' for sql in sql_stmts]), PRIORITY=journal.LOG_CRIT, ) return False return True def init_session(cursor: psycopg2.extras.RealDictCursor) -> None: """ Init a database session. Define prepared statements. """ stmt = get_sql_prepared_statement('delivery_from') cursor.execute(stmt) stmt = get_sql_prepared_statement('delivery_to') cursor.execute(stmt) stmt = get_sql_prepared_statement('noqueue') cursor.execute(stmt)