# atextcrawler/src/atextcrawler/crawl/__init__.py
"""
Crawl a site.
"""
import logging
from datetime import datetime

import aiohttp

from ..models import Crawl
from ..resource import ResourceFetcher, get_site_path, process_site_path
from ..site import (
    RobotsInfo,
    checkin_site,
    checkout_site,
    fetch_feeds,
    process_site,
    update_site,
)
from ..tensorflow import TensorFlow

logger = logging.getLogger(__name__)


class CrawlWorker:
    """
    Worker fetching sites, crawling their resources and storing statistics.
    """

    def __init__(self, app, worker_number, pool):
        self.app = app
        self.worker_number = worker_number
        self.pool = pool
        self.tf_config = self.app.config['tensorflow']
        self.site_delay = self.app.config['crawl']['site_delay']
        self.resource_delay = self.app.config['crawl']['resource_delay']
        self.site = None
        self.crawl = None
        self.running = True  # whether this worker should keep crawling

    def __await__(self):
        return self.__ainit__().__await__()

    async def __ainit__(self):
        await self.startup()
        return self
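
    # The two methods above implement the awaitable-constructor pattern:
    # awaiting a CrawlWorker instance runs its asynchronous startup and
    # returns the ready worker. A minimal usage sketch (the surrounding
    # `app`, `pool` and worker number are assumed to be provided by the
    # application and are only illustrative here):
    #
    #     worker = await CrawlWorker(app, worker_number=0, pool=pool)
    #     try:
    #         await worker.run()
    #     finally:
    #         await worker.shutdown()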

    async def startup(self):
        """
        Asynchronous startup.
        """
        logger.info(f'Starting worker {self.worker_number}')
        self.conn = await self.pool.acquire()
        self.session = aiohttp.ClientSession()
        self.fetcher = ResourceFetcher(self.session)
        self.tf = TensorFlow(self.tf_config, self.session)

    async def shutdown(self):
        """
        Asynchronous shutdown.
        """
        logger.info(f'Shutting down worker {self.worker_number}')
        await self.session.close()
        await self.pool.release(self.conn)

    async def run(self):
        """
        Worker loop: fetch a site, crawl its resources and store statistics.

        If no site needs to be crawled, sleep for self.site_delay seconds
        (configured in crawl.site_delay).
        """
        await self.app.sleep(2)
        while self.app.running and self.running:
            self.site, is_full, more = await checkout_site(
                self.app.config, self.conn
            )
            if not self.site:
                msg = f'Worker {self.worker_number}: sites exhausted'
                logger.debug(msg)
                if not more:
                    await self.app.sleep(self.site_delay)
                continue
            self.crawl = await get_or_create_crawl(
                self.conn, self.site.id_, is_full
            )
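            # On a full crawl, refresh the site itself first (update_site,
            # process_site); on a feed crawl, only refresh its feeds. In both
            # cases the site's resources are then crawled if crawling is
            # enabled for the site.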
            try:
                if is_full:
                    site_upd, _ = await update_site(
                        self.app,
                        self.fetcher,
                        self.conn,
                        self.site.base_url,
                        site=self.site,
                    )
                    if site_upd and site_upd.crawl_enabled:
                        self.site = site_upd
                        await process_site(
                            self.fetcher,
                            self.conn,
                            self.site,
                        )
                elif self.site.crawl_enabled:
                    await fetch_feeds(self.fetcher, self.conn, self.site)
                if self.site.crawl_enabled:
                    await self.crawl_resources()
            except:
                msg = (
                    f'Worker {self.worker_number} failed crawl'
                    f' {self.crawl.id_} of site {self.site.id_}'
                    f' ({self.site.base_url})'
                )
                logger.exception(msg)
            await self.crawl.finish(
                self.conn, self.app.running and self.running
            )
            await checkin_site(self.app, self.conn, self.site, self.crawl)
            msg = (
                f'Worker {self.worker_number} finished crawl'
                f' {self.crawl.id_}'
            )
            logger.debug(msg)
            self.site = None
            # if we were cancelled, but the app is still running, run again
            if self.app.running:
                self.running = True
        msg = f'Closing crawler {self.worker_number}'
        logger.debug(msg)

    async def crawl_resources(self):
        """
        Loop over resources of the site and process them. Collect statistics.

        All workers operate on distinct sites, so no need for locking here.
        """
        crawl_type = 'full' if self.crawl.is_full else 'feed'
        msg = (
            f'Worker {self.worker_number} beginning'
            f' {crawl_type} crawl {self.crawl.id_}'
            f' of site {self.site.id_} ({self.site.base_url})'
        )
        logger.info(msg)
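        # Let a crawl delay reported by RobotsInfo (from the site's
        # robots.txt) override the configured crawl.resource_delay.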
        resource_delay = self.resource_delay
        robots = await RobotsInfo(self.site.base_url)
        if robots.delay:
            resource_delay = robots.delay
        while self.app.running and self.running:
            site_path = await get_site_path(
                self.conn,
                self.site,
                self.crawl.t_begin,
                only_new=not self.crawl.is_full,
            )
            if not site_path:
                msg = (
                    f'Worker {self.worker_number} ending crawl'
                    f' {self.crawl.id_}: paths exhausted'
                )
                logger.info(msg)
                return
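            # The filter_site_path plugin decides whether this path should be
            # fetched at all (it sees the site, the path and the robots info);
            # rejected paths are only marked as visited and filtered below.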
            try:
                sp_filter = self.app.plugins['filter_site_path'].sp_filter
                if sp_filter(self.site, site_path.path, robots):
                    is_new_resource = await process_site_path(
                        self.app,
                        self.worker_number,
                        self.conn,
                        self.fetcher,
                        self.tf,
                        self.site,
                        site_path,
                    )
                    if is_new_resource:
                        self.crawl.n_resources_new += 1
                    if is_new_resource is not None:
                        self.crawl.n_resources += 1
                        await self.app.sleep(resource_delay)
                else:
                    sql = (
                        "UPDATE site_path SET"
                        " last_visit=now() at time zone 'UTC',"
                        " filtered=true"
                        " WHERE id=$1"
                    )
                    await self.conn.execute(sql, site_path.id_)
            except:
                msg = (
                    f'Worker {self.worker_number} processing path failed'
                    f' in crawl {self.crawl.id_}: {site_path}'
                )
                logger.exception(msg)
                site_path.ok_count -= 1
                await site_path.save(self.conn)
        msg = f'Worker {self.worker_number}: stopped crawl {self.crawl.id_}'
        logger.info(msg)


async def get_or_create_crawl(conn, site_id, is_full=True) -> Crawl:
    """
    Return an existing unfinished crawl for the site, or create a new one.

    If an existing unfinished crawl is found, it is returned regardless of
    whether it is a full crawl or not.
    """
    sql = "SELECT * FROM crawl WHERE site_id=$1 AND t_end is null LIMIT 1"
    if row := await conn.fetchrow(sql, site_id):
        return await Crawl().load_from_row(row)
    else:
        # create a new crawl
        crawl = Crawl(
            site_id=site_id,
            is_full=is_full,
            t_begin=datetime.utcnow(),
        )
        await crawl.save(conn)
        return crawl