"""
atextcrawler application.
"""

import asyncio
import importlib
import logging
import signal
import sys

from systemd.journal import JournalHandler

from .config import Config
from .crawl import CrawlWorker
from .db import PGPool
from .search import shutdown_engine, startup_engine
from .site import load_seeds, process_site_queue

plugin_names = ['filter_site', 'filter_site_path', 'filter_resource_path']


class Application:
    """
    atextcrawler application.

    The basic structure of the application is this:

      * one site crawler works just on the site_queue: fetching start pages
        of sites and storing updated site information in table sites
      * N other CrawlWorkers each do this in a loop:
        checkout a site that is due for crawl and crawl its resources;
        they fill the site_queue
    """

    # Class-level default; handle_shutdown_signal sets it to False on the
    # instance, and :meth:`sleep` polls it to end early.
    running = True

    def __init__(self, config=None):
        """
        Set up configuration, logging and plugins.

        If *config* is None, it is loaded via ``Config().get()``.
        """
        if config is None:
            config = Config().get()
        self.config = config
        self.instance_name = config['instance_name']
        self.instance_type = config['instance_type']
        # Unknown level names fall back to CRITICAL.
        log_level = getattr(
            logging, config['log_level'].upper(), logging.CRITICAL
        )
        self.logger = logging.getLogger('atextcrawler')
        self.logger.setLevel(log_level)
        # 'dev' instances log to stderr; all others log to the systemd
        # journal, tagged with the instance name.
        if self.instance_type == 'dev':
            self.logger.addHandler(logging.StreamHandler())
        else:
            self.logger.addHandler(
                JournalHandler(SYSLOG_IDENTIFIER=self.instance_name)
            )
        self.logger.propagate = False
        # PostgreSQL LISTEN/NOTIFY channel specific to this instance.
        self.channel = 'atextcrawler_' + self.config['instance_name']
        msg = f'Instance "{self}" initializing'
        self.logger.info(msg)
        self.plugins = self._load_plugins()

    def __str__(self):
        return self.instance_name

    def _load_plugins(self):
        """
        Return a dict mapping plugin names to modules.

        For each name in :data:`plugin_names`, first try to import a
        top-level module of that name with the configured ``plugins_dir``
        prepended to ``sys.path``. If ``plugins_dir`` is not configured, or
        that import raises :class:`ImportError`, fall back to
        ``atextcrawler.plugin_defaults.<name>``.

        ``sys.path`` is restored to its prior contents afterwards, even if
        an import raises. Errors other than ImportError inside a user
        plugin (e.g. a SyntaxError) propagate instead of being masked.
        """
        modules = {}
        try:
            plugins_dir = self.config['plugins_dir']
        except KeyError:
            plugins_dir = None
        # Copy, not alias: insert() below mutates sys.path in place, so an
        # alias would "restore" nothing and leave plugins_dir on the path.
        saved_path = sys.path[:]
        if plugins_dir is not None:
            sys.path.insert(0, plugins_dir)
        try:
            for name in plugin_names:
                module = None
                if plugins_dir is not None:
                    try:
                        module = importlib.import_module(name)
                    except ImportError as exc:
                        # Expected when only some plugins are overridden.
                        self.logger.debug(
                            f'Plugin "{name}" not importable from'
                            f' {plugins_dir}: {exc}'
                        )
                    else:
                        msg = f'Loading plugin "{name}" from {plugins_dir}'
                if module is None:
                    module = importlib.import_module(
                        'atextcrawler.plugin_defaults.' + name
                    )
                    msg = f'Loading plugin "{name}" from default location'
                self.logger.info(msg)
                modules[name] = module
        finally:
            # Slice assignment keeps the same list object that other code
            # may hold a reference to.
            sys.path[:] = saved_path
        return modules

    async def run(self):
        """
        Application lifecycle.

        Run :meth:`wait_for_shutdown` and :meth:`startup` concurrently.
        When both have finished, run :meth:`shutdown`.
        """
        await asyncio.gather(self.wait_for_shutdown(), self.startup())
        await self.shutdown()

    async def startup(self):
        """
        Asynchronous startup.

        Start the search engine and the PostgreSQL pool, load seeds and
        clear stale site locks. Then create the configured number of
        CrawlWorkers and run the site queue processing, the notification
        handling and all workers concurrently until they finish.
        """
        msg = f'Instance "{self}" starting components'
        self.logger.info(msg)
        self.search_engine = await startup_engine(self.config)
        self.pgpool = await PGPool(self.config['postgresql'])
        self.pool = self.pgpool.pool
        await load_seeds(self.config, self.pool)
        await reset_site_locks(self.pool)
        worker_count = self.config['crawl']['workers']
        self.workers = []
        for worker_number in range(worker_count):
            worker = await CrawlWorker(self, worker_number, self.pool)
            self.workers.append(worker)
        worker_coros = [worker.run() for worker in self.workers]
        await asyncio.gather(
            process_site_queue(self, self.pool),
            self.handle_notifications(),
            *worker_coros,
        )

    async def wait_for_shutdown(self):
        """
        Create a shutdown event (:class:`asyncio.Event`) and wait for it.

        The event will be set by a signal handler for SIGINT and SIGTERM
        signals (see :meth:`Application.handle_shutdown_signal`).
        """
        self.shutdown_event = asyncio.Event()
        for sig in (signal.SIGINT, signal.SIGTERM):
            asyncio.get_running_loop().add_signal_handler(
                sig, self.handle_shutdown_signal
            )
        self.logger.debug(f'{self} waiting for shutdown event')
        await self.shutdown_event.wait()
        self.logger.info(f'Instance "{self}" shutdown event')

    def handle_shutdown_signal(self):
        """
        Handle shutdown signal.

        Set the shutdown event and clear :attr:`running`. Repeated signals
        are ignored once the event is set.
        """
        if self.shutdown_event.is_set():
            return
        self.shutdown_event.set()
        self.running = False

    async def shutdown(self):
        """
        Asynchronous shutdown.

        Stop listening for notifications and return the listening
        connection to the pool. Then shut down all workers, the search
        engine and the PostgreSQL pool, in that order.
        """
        self.logger.debug(f'Instance "{self}" shutting down')
        await self.notify_conn.remove_listener(
            self.channel, self.listen_callback
        )
        await self.pool.release(self.notify_conn)
        for worker in self.workers:
            await worker.shutdown()
        await shutdown_engine(self.search_engine)
        await self.pgpool.shutdown()
        self.logger.info(f'Instance "{self}" shutdown completed')

    async def handle_notifications(self):
        """
        Handle notifications using PostgreSQL's NOTIFY/LISTEN.

        Acquire a dedicated connection from the pool and register
        :meth:`listen_callback` on :attr:`channel`. The connection is kept
        until :meth:`shutdown` releases it.
        """
        self.notify_conn = await self.pool.acquire()
        await self.notify_conn.add_listener(self.channel, self.listen_callback)

    def listen_callback(self, *args):
        """
        Handle notify event from PostgreSQL.

        ``args[2]`` is the channel and ``args[3]`` the payload. This
        presumably matches asyncpg's listener signature
        ``(connection, pid, channel, payload)``; TODO confirm.

        A payload ``site_update <site_id>`` stops every worker currently
        crawling that site by setting its ``running`` flag to False. A
        payload whose site id is not an integer is logged and ignored.
        """
        channel = args[2]
        if channel != self.channel:
            return
        message = args[3]
        if not message.startswith('site_update '):
            return
        try:
            site_id = int(message.removeprefix('site_update '))
        except ValueError:
            self.logger.warning(
                f'Ignoring malformed site_update notification: {message!r}'
            )
            return
        for worker in self.workers:
            if worker.site and site_id == worker.site.id_:
                msg = (
                    f'Cancelling worker {worker.worker_number}'
                    f' (site={site_id}) due to site_update'
                )
                self.logger.info(msg)
                worker.running = False

    async def sleep(self, duration, t_slice=3):
        """
        Sleep for *duration* seconds while self.running.

        Check self.running every *t_slice* seconds.
        """
        remaining = duration
        while remaining > 0 and self.running:
            await asyncio.sleep(min(t_slice, remaining))
            remaining -= t_slice


async def reset_site_locks(pool):
    """
    Remove locks leftover from last run: Set crawl_active=false for all sites.

    This is relevant when the application was not shutdown properly (e.g.
    when the process was killed).
    """
    async with pool.acquire() as conn:
        sql = "UPDATE site SET crawl_active = false WHERE crawl_active = true"
        await conn.execute(sql)