diff --git a/src/atextcrawler/crawl.py b/src/atextcrawler/crawl/__init__.py
similarity index 94%
rename from src/atextcrawler/crawl.py
rename to src/atextcrawler/crawl/__init__.py
index 323e454..6d12fca 100644
--- a/src/atextcrawler/crawl.py
+++ b/src/atextcrawler/crawl/__init__.py
@@ -7,9 +7,9 @@ from datetime import datetime
 
 import aiohttp
 
-from .models import Crawl
-from .resource import ResourceFetcher, get_site_path, process_site_path
-from .site import (
+from ..models import Crawl
+from ..resource import ResourceFetcher, get_site_path, process_site_path
+from ..site import (
     RobotsInfo,
     checkin_site,
     checkout_site,
@@ -17,7 +17,7 @@ from .site import (
     process_site,
     update_site,
 )
-from .tensorflow import TensorFlow
+from ..tensorflow import TensorFlow
 
 logger = logging.getLogger(__name__)
 
@@ -31,6 +31,7 @@ class CrawlWorker:
         self.app = app
         self.worker_number = worker_number
         self.pool = pool
+        self.tf_config = self.app.config['tensorflow']
         self.site_delay = self.app.config['crawl']['site_delay']
         self.resource_delay = self.app.config['crawl']['resource_delay']
         self.site = None
@@ -52,7 +53,7 @@
         self.conn = await self.pool.acquire()
         self.session = aiohttp.ClientSession()
         self.fetcher = ResourceFetcher(self.session)
-        self.tf = TensorFlow(self.app, self.session)
+        self.tf = TensorFlow(self.tf_config, self.session)
 
     async def shutdown(self):
         """
@@ -71,7 +72,8 @@
         """
         await self.app.sleep(2)
         while self.app.running and self.running:
-            self.site, is_full, more = await checkout_site(self.app, self.conn)
+            self.site, is_full, more = await checkout_site(
+                self.app.config, self.conn)
             if not self.site:
                 msg = f'Worker {self.worker_number}: sites exhausted'
                 logger.debug(msg)
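
The change above decouples TensorFlow from the application object: the worker reads the
`tensorflow` config section once and passes it to the constructor, so the class now depends
only on a config mapping and an aiohttp session. A minimal standalone sketch under the new
signature (the asyncio boilerplate and the placeholder body are illustrative assumptions,
not part of this diff):

    import asyncio

    import aiohttp

    from atextcrawler.config import Config
    from atextcrawler.tensorflow import TensorFlow

    async def main():
        # Load the app config the same way crawl/__main__.py does.
        config = Config().get()
        async with aiohttp.ClientSession() as session:
            # Only the 'tensorflow' section is needed now, not the whole app.
            tf = TensorFlow(config['tensorflow'], session)
            ...  # use tf to talk to the model server

    asyncio.run(main())
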
+ """ + config = Config().get() + pgpool = await PGPool(config['postgresql']) + pool=pgpool.pool + + try: + crawl_type = sys.argv[1] + if crawl_type not in ('full', 'feed'): + logger.error('First argument must be "full" or "feed".') + base_url = sys.argv[2] + except: + msg = ( + 'Please give two arguments:' + ' 1) crawl type ("full" or "feed"),' + ' 2) the base URL of the site to crawl' + ) + logger.error(msg) + sys.exit(2) + if len(sys.argv) > 3: + path = sys.argv[3] + else: + path = None + + # find site + async with pool.acquire() as conn: + sql = 'select id from site where base_url=$1' + site_id = await conn.fetchval(sql, base_url) + if site_id: + site = await Site().load(conn, site_id) + logger.warning(f'site_id: {site.id_}') + logger.warning(f'crawl_enabled: {site.crawl_enabled}') + site.simhash_index = await get_simhash_index(conn, site_id) + else: + logger.warning('Site not found') + + if site_id: + if site.crawl_enabled: + await run_crawl(config, pool, site, crawl_type, path) + else: + logger.warning('Site has crawl_enabled=false') + + # shutdown + await pgpool.shutdown() + + +class AppMock: + def __init__(self, config, search_engine): + self.config = config + self.search_engine = search_engine + class DummyModule: + def rp_filter(self, site, durl): + return durl.pwa() + self.plugins = {'filter_resource_path': DummyModule()} + + +async def run_crawl(config, pool, site, crawl_type, path): + session = aiohttp.ClientSession() + fetcher = ResourceFetcher(session) + tf = TensorFlow(config['tensorflow'], session) + search_engine = await startup_engine(config) + app = AppMock(config, search_engine) + async with pool.acquire() as conn: + is_full = crawl_type == 'full' + crawl = await get_or_create_crawl(conn, site.id_, is_full=is_full) + logger.warning(crawl) + if path: + sql = "SELECT * FROM site_path WHERE site_id=$1 AND path=$2" + row = await conn.fetchrow(sql, site.id_, path) + if row: + site_path = await SitePath().load_from_row(row) + await process_site_path( + app, + 999, + conn, + fetcher, + tf, + site, + site_path, + ) + else: + logger.error('Path does not exist in table site_path') + else: + while True: + site_path = await get_site_path( + conn, + site, + crawl.t_begin, + only_new=not crawl.is_full, + ) + if not site_path: + logger.warning('Paths exhausted.') + break + logger.warning(site_path) + is_new_resource = await process_site_path( + app, + 999, + conn, + fetcher, + tf, + site, + site_path, + ) + logger.warning(f'Is new: {is_new_resource}') + await shutdown_engine(search_engine) + + +if __name__ == '__main__': + asyncio.run(run()) diff --git a/src/atextcrawler/resource/__main__.py b/src/atextcrawler/resource/__main__.py index 1542dfd..a97c7fe 100644 --- a/src/atextcrawler/resource/__main__.py +++ b/src/atextcrawler/resource/__main__.py @@ -76,8 +76,8 @@ async def run(): if isinstance(resource, TextResource): logger.warning(repr(resource)) logger.warning(f'Language: {resource.lang}') - logger.warning(pformat(resource.search_fields)) - logger.warning(pformat(resource.init_fields)) + logger.warning(pformat(resource.search_fields, width=180)) + logger.warning(pformat(resource.init_fields, width=180)) # annotations = resource.search_fields.get('annotations') # text = resource.search_fields['text'] diff --git a/src/atextcrawler/site/operations.py b/src/atextcrawler/site/operations.py index 36689c5..e791eb0 100644 --- a/src/atextcrawler/site/operations.py +++ b/src/atextcrawler/site/operations.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) async def checkout_site( - 
diff --git a/src/atextcrawler/resource/__main__.py b/src/atextcrawler/resource/__main__.py
index 1542dfd..a97c7fe 100644
--- a/src/atextcrawler/resource/__main__.py
+++ b/src/atextcrawler/resource/__main__.py
@@ -76,8 +76,8 @@ async def run():
     if isinstance(resource, TextResource):
         logger.warning(repr(resource))
         logger.warning(f'Language: {resource.lang}')
-        logger.warning(pformat(resource.search_fields))
-        logger.warning(pformat(resource.init_fields))
+        logger.warning(pformat(resource.search_fields, width=180))
+        logger.warning(pformat(resource.init_fields, width=180))
 
 #        annotations = resource.search_fields.get('annotations')
 #        text = resource.search_fields['text']
diff --git a/src/atextcrawler/site/operations.py b/src/atextcrawler/site/operations.py
index 36689c5..e791eb0 100644
--- a/src/atextcrawler/site/operations.py
+++ b/src/atextcrawler/site/operations.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 
 async def checkout_site(
-    app, conn: Connection
+    config, conn: Connection
 ) -> tuple[Optional[int], bool, bool]:
     """
     Get the id of a site to be crawled and mark it with crawl_active=true.
@@ -57,8 +57,8 @@ async def checkout_site(
         return site, is_full, True
     else:
         # site not available; schedule next crawl
-        int_full = app.config['crawl']['full_crawl_interval']
-        int_feed = app.config['crawl']['feed_crawl_interval']
+        int_full = config['crawl']['full_crawl_interval']
+        int_feed = config['crawl']['feed_crawl_interval']
         now = datetime.utcnow()
         t_full = now + timedelta(seconds=int_full)
         t_feed = now + timedelta(seconds=int_full + int_feed)
diff --git a/src/atextcrawler/tensorflow.py b/src/atextcrawler/tensorflow.py
index 197572c..42471fb 100644
--- a/src/atextcrawler/tensorflow.py
+++ b/src/atextcrawler/tensorflow.py
@@ -17,12 +17,12 @@ class TensorFlow:
 
     def __init__(
         self,
-        app,
+        tf_config,
         session: aiohttp.ClientSession,
         timeout_sock_connect: Union[int, float] = 0.5,
         timeout_sock_read: Union[int, float] = 10,
     ):
-        self.config = app.config['tensorflow']
+        self.config = tf_config
         self.session = session
         self.timeout = aiohttp.ClientTimeout(
             sock_connect=timeout_sock_connect, sock_read=timeout_sock_read
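
Changing `checkout_site` (and `TensorFlow.__init__`) to accept a plain config mapping
instead of the app object means both can be driven from scripts or tests without
constructing an application. A minimal sketch, assuming the asyncpg-style pool used
elsewhere in this diff:

    import asyncio

    from atextcrawler.config import Config
    from atextcrawler.db import PGPool
    from atextcrawler.site import checkout_site

    async def main():
        config = Config().get()
        pgpool = await PGPool(config['postgresql'])
        async with pgpool.pool.acquire() as conn:
            # No app instance required anymore, just the config mapping.
            site, is_full, more = await checkout_site(config, conn)
            print(site, is_full, more)
            # A real caller must hand the site back via checkin_site afterwards,
            # since checkout_site marks it with crawl_active=true.
        await pgpool.shutdown()

    asyncio.run(main())
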