Compare commits

...

3 Commits

7 changed files with 207 additions and 68 deletions

View File

@ -7,9 +7,9 @@ from datetime import datetime
import aiohttp
from .models import Crawl
from .resource import ResourceFetcher, get_site_path, process_site_path
from .site import (
from ..models import Crawl
from ..resource import ResourceFetcher, get_site_path, process_site_path
from ..site import (
RobotsInfo,
checkin_site,
checkout_site,
@ -17,7 +17,7 @@ from .site import (
process_site,
update_site,
)
from .tensorflow import TensorFlow
from ..tensorflow import TensorFlow
logger = logging.getLogger(__name__)
@ -31,6 +31,7 @@ class CrawlWorker:
self.app = app
self.worker_number = worker_number
self.pool = pool
self.tf_config = self.app.config['tensorflow']
self.site_delay = self.app.config['crawl']['site_delay']
self.resource_delay = self.app.config['crawl']['resource_delay']
self.site = None
@ -52,7 +53,7 @@ class CrawlWorker:
self.conn = await self.pool.acquire()
self.session = aiohttp.ClientSession()
self.fetcher = ResourceFetcher(self.session)
self.tf = TensorFlow(self.app, self.session)
self.tf = TensorFlow(self.tf_config, self.session)
async def shutdown(self):
"""
@ -71,7 +72,8 @@ class CrawlWorker:
"""
await self.app.sleep(2)
while self.app.running and self.running:
self.site, is_full, more = await checkout_site(self.app, self.conn)
self.site, is_full, more = await checkout_site(
self.app.config, self.conn)
if not self.site:
msg = f'Worker {self.worker_number}: sites exhausted'
logger.debug(msg)
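Taken together, the changes in this file decouple the worker's collaborators from the app object: the worker now keeps self.tf_config = self.app.config['tensorflow'] and hands TensorFlow only that section, and checkout_site receives self.app.config instead of the app. A minimal sketch of what that enables outside the worker; the import paths, config and pool objects are assumptions, only the call signatures come from the diff:

# Sketch only: driving the crawl helpers without an app object.
# The import paths below are placeholders for the real package layout.
import aiohttp

from mycrawler.site import checkout_site      # placeholder path
from mycrawler.tensorflow import TensorFlow   # placeholder path

async def standalone_checkout(config, pool):
    async with pool.acquire() as conn:
        # checkout_site now needs only the config mapping, not the app
        site, is_full, more = await checkout_site(config, conn)
    session = aiohttp.ClientSession()
    # TensorFlow now receives just its own config section
    tf = TensorFlow(config['tensorflow'], session)
    return site, is_full, more, tf, session  # caller closes the session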

View File

@ -0,0 +1,138 @@
"""
Run a crawl for a specific base_url. Use only on a dev instance!
"""
import asyncio
import logging
import sys
import aiohttp
from ..config import Config
from ..db import PGPool
from ..models import Site, SitePath
from ..resource import ResourceFetcher, get_site_path, process_site_path
from ..search import shutdown_engine, startup_engine
from ..tensorflow import TensorFlow
from ..utils.similarity import get_simhash_index
from . import get_or_create_crawl
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
#logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
async def run():
"""
Run a full/feed crawl of a website with the given base_url, or just a single path.
The 3rd argument (path) is optional.
"""
config = Config().get()
pgpool = await PGPool(config['postgresql'])
pool = pgpool.pool
try:
crawl_type = sys.argv[1]
if crawl_type not in ('full', 'feed'):
logger.error('First argument must be "full" or "feed".')
sys.exit(2)
base_url = sys.argv[2]
except IndexError:
msg = (
'Please give two arguments:'
' 1) crawl type ("full" or "feed"),'
' 2) the base URL of the site to crawl'
)
logger.error(msg)
sys.exit(2)
if len(sys.argv) > 3:
path = sys.argv[3]
else:
path = None
# find site
async with pool.acquire() as conn:
sql = 'select id from site where base_url=$1'
site_id = await conn.fetchval(sql, base_url)
if site_id:
site = await Site().load(conn, site_id)
logger.warning(f'site_id: {site.id_}')
logger.warning(f'crawl_enabled: {site.crawl_enabled}')
site.simhash_index = await get_simhash_index(conn, site_id)
else:
logger.warning('Site not found')
if site_id:
if site.crawl_enabled:
await run_crawl(config, pool, site, crawl_type, path)
else:
logger.warning('Site has crawl_enabled=false')
# shutdown
await pgpool.shutdown()
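# AppMock: a minimal stand-in for the application object, providing only the
# attributes used downstream (config, search_engine and a stub for the
# 'filter_resource_path' plugin).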
class AppMock:
def __init__(self, config, search_engine):
self.config = config
self.search_engine = search_engine
class DummyModule:
def rp_filter(self, site, durl):
return durl.pwa()
self.plugins = {'filter_resource_path': DummyModule()}
async def run_crawl(config, pool, site, crawl_type, path):
session = aiohttp.ClientSession()
fetcher = ResourceFetcher(session)
tf = TensorFlow(config['tensorflow'], session)
search_engine = await startup_engine(config)
app = AppMock(config, search_engine)
async with pool.acquire() as conn:
is_full = crawl_type == 'full'
crawl = await get_or_create_crawl(conn, site.id_, is_full=is_full)
logger.warning(crawl)
if path:
sql = "SELECT * FROM site_path WHERE site_id=$1 AND path=$2"
row = await conn.fetchrow(sql, site.id_, path)
if row:
site_path = await SitePath().load_from_row(row)
await process_site_path(
app,
999,
conn,
fetcher,
tf,
site,
site_path,
)
else:
logger.error('Path does not exist in table site_path')
else:
while True:
site_path = await get_site_path(
conn,
site,
crawl.t_begin,
only_new=not crawl.is_full,
)
if not site_path:
logger.warning('Paths exhausted.')
break
logger.warning(site_path)
is_new_resource = await process_site_path(
app,
999,
conn,
fetcher,
tf,
site,
site_path,
)
logger.warning(f'Is new: {is_new_resource}')
await shutdown_engine(search_engine)
if __name__ == '__main__':
asyncio.run(run())
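For reference, a possible way to run this helper from a dev shell; the dotted module path below is a placeholder, since the diff does not show where the new file lives in the package:

# Hypothetical invocation (module path is a placeholder):
#   python -m mycrawler.crawl.run_site full https://example.org
#   python -m mycrawler.crawl.run_site feed https://example.org /blog/
# Arguments: 1) crawl type ('full' or 'feed'), 2) base URL, 3) optional path.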

View File

@ -231,6 +231,8 @@ COMMENT ON COLUMN site_path.path IS 'Path'
----
COMMENT ON COLUMN site_path.last_visit IS 'Time of last retrieval of the resource; null before first retrieval'
----
COMMENT ON COLUMN site_path.filtered IS 'Whether the path was filtered'
----
COMMENT ON COLUMN site_path.ok_count IS 'Increased by 1 for every successful retrieval of the resource and decreased by 1 for every failed one'
----
COMMENT ON COLUMN site_path.canonical IS 'Whether the path is the canonical one for the resource; null before first retrieval'
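To make the ok_count comment concrete: the bookkeeping it documents amounts to adding 1 on success and subtracting 1 on failure. An illustrative sketch, not code from this repository; only the table and column names come from the comments above:

# Illustration only: +1 for a successful retrieval, -1 for a failed one.
async def record_retrieval(conn, site_path_id: int, ok: bool) -> None:
    delta = 1 if ok else -1
    await conn.execute(
        'UPDATE site_path SET ok_count = ok_count + $1 WHERE id = $2',
        delta,
        site_path_id,
    )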

View File

@ -76,8 +76,8 @@ async def run():
if isinstance(resource, TextResource):
logger.warning(repr(resource))
logger.warning(f'Language: {resource.lang}')
logger.warning(pformat(resource.search_fields))
logger.warning(pformat(resource.init_fields))
logger.warning(pformat(resource.search_fields, width=180))
logger.warning(pformat(resource.init_fields, width=180))
# annotations = resource.search_fields.get('annotations')
# text = resource.search_fields['text']

View File

@ -99,7 +99,7 @@ async def get_site_path(
Return the next path of a given site that needs to be processed.
If none needs to be processed, return None.
I particular, for sites having crawl_enabled=false return None.
In particular, for sites having crawl_enabled=false return None.
Only return paths that have last been visited before *before*
or not been processed at all. Paths with an ok_count of -3 or lower
@ -200,34 +200,37 @@ async def process_site_path(
site.base_url, res_sitemap.urls
)
await add_site_paths(conn, site.id_, paths)
return False
# handle TextResource
relevant, is_new_resource = await _handle_text_resource(
app, conn, tf, site, site_path, resource, url
)
if not relevant:
return False
site_path.resource_id = resource.id_
site_path.canonical = resource.init_fields.get('canonical')
resource_id = None
is_new_resource = False
else: # handle TextResource
resource_id, is_new_resource = await _handle_text_resource(
app, conn, tf, site, site_path, resource, url
)
site_path.canonical = resource.init_fields.get('canonical')
if shortlink_url := resource.init_fields.get('shortlink'):
await _save_shortlink(
conn,
site,
url,
resource_id,
shortlink_url,
site_path.last_visit,
)
site_path.resource_id = resource_id
site_path.ok_count += 1
await site_path.save(conn)
if shortlink_url := resource.init_fields.get('shortlink'):
await _save_shortlink(
conn, site, url, resource, shortlink_url, site_path.last_visit
)
return is_new_resource
async def _handle_text_resource(
app, conn, tf, site, site_path, resource, url
) -> tuple[bool, bool]:
) -> tuple[Optional[int], bool]:
"""
Ingest a text resource.
Ingest a text resource returning the id of the possibly merged resource.
Return whether the resource is relevant and whether it is new.
Return the id of the merged resource (or None if the incoming resource's
text is too short to be worth storing) and whether the resource is new
(False if the returned resource_id is None).
"""
# save the resource's internal links
paths = []
@ -250,22 +253,18 @@ async def _handle_text_resource(
app.config['elasticsearch']['index_base_name'],
)
await site_path.save(conn)
return False, False
return None, False
simhash = simhash_from_bigint(resource.simhash)
index = site.simhash_index
similar_ids = search_simhash(index, simhash)
print(similar_ids, site_path.resource_id)
# determine the destination resource and resources to be merged into it
old_id = site_path.resource_id
if (
old_id
and old_id in similar_ids
and ( # similar to old text
dest_resource := await TextResource().load(conn, old_id)
)
):
merge_ids = list(filter(lambda elem: elem != old_id, similar_ids))
if old_id and old_id in similar_ids:
merge_ids = similar_ids
dest_resource = await TextResource().load(conn, old_id)
else: # no old text, or old text not similar any more
if old_id:
await site_path.unlink_resource(
@ -302,36 +301,34 @@ async def _handle_text_resource(
create_simhash(index, resource.id_, simhash)
# add resource to search index
if resource.content_type in ('html', 'plain'):
await index_resource(
await index_resource(
app.search_engine,
tf,
site_path,
resource,
site.base_url,
url,
)
# replace references to any merge resource with links to the dest resource
sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=ANY($2)"
await conn.execute(sql, resource.id_, merge_ids)
# remove orphaned resources after merging
sql = "DELETE FROM resource WHERE id=ANY($1) RETURNING (id, lang)"
rows = await conn.fetch(sql, set(merge_ids) - set([resource.id_]))
for row in rows:
await delete_resource(
app.search_engine,
tf,
site_path,
resource,
site.base_url,
url,
row['row'][1],
row['row'][0],
)
# merge resources: merge_ids -> resource
for merge_id in merge_ids:
# replace links to the merge resource with links to the dest resource
sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=$2"
await conn.execute(sql, resource.id_ or None, merge_id)
# remove orphaned merge resource
sql = "DELETE FROM resource WHERE id=$1 RETURNING (true, lang)"
found = await conn.fetchval(sql, merge_id)
if found:
await delete_resource(
app.search_engine,
found[1],
merge_id,
)
return True, is_new_resource
return resource.id_, is_new_resource
async def _save_shortlink(
conn, site, url, resource, shortlink_url, last_visit
conn, site, url, resource_id, shortlink_url, last_visit
):
"""
Save a shortlink.
@ -349,11 +346,11 @@ async def _save_shortlink(
last_visit=last_visit,
ok_count=1,
canonical=False,
resource_id=resource.id_,
resource_id=resource_id,
)
else:
shortlink.last_visit = last_visit
shortlink.ok_count += 1
shortlink.canonical = False
shortlink.resource_id = resource.id_
shortlink.resource_id = resource_id
await shortlink.save(conn)
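The heart of the rewritten merge step above is the per-resource loop: each resource in merge_ids has its site_path rows repointed to the destination resource and is then removed from the database and, via delete_resource, from the search index. Isolated as a sketch; names and SQL are taken from the diff, the surrounding transaction and error handling are omitted:

# Sketch of the per-resource merge loop shown above (simplified).
async def merge_resources(conn, search_engine, dest_id, merge_ids):
    for merge_id in merge_ids:
        # repoint paths that still reference the merged resource
        await conn.execute(
            'UPDATE site_path SET resource_id=$1 WHERE resource_id=$2',
            dest_id,
            merge_id,
        )
        # delete the orphaned resource; RETURNING tells us whether it existed
        found = await conn.fetchval(
            'DELETE FROM resource WHERE id=$1 RETURNING (true, lang)',
            merge_id,
        )
        if found:
            # found[1] is the resource's language
            await delete_resource(search_engine, found[1], merge_id)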

View File

@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
async def checkout_site(
app, conn: Connection
config, conn: Connection
) -> tuple[Optional[int], bool, bool]:
"""
Get the id of a site to be crawled and mark it with crawl_active=true.
@ -57,8 +57,8 @@ async def checkout_site(
return site, is_full, True
else:
# site not available; schedule next crawl
int_full = app.config['crawl']['full_crawl_interval']
int_feed = app.config['crawl']['feed_crawl_interval']
int_full = config['crawl']['full_crawl_interval']
int_feed = config['crawl']['feed_crawl_interval']
now = datetime.utcnow()
t_full = now + timedelta(seconds=int_full)
t_feed = now + timedelta(seconds=int_full + int_feed)
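The rescheduling arithmetic at the end of this hunk reads: when no site is available, the next full crawl is one full interval from now, and the next feed crawl follows one feed interval after that full crawl. A small worked example with assumed interval values:

# Worked example of the scheduling above; the interval values are assumptions.
from datetime import datetime, timedelta

int_full = 10 * 24 * 3600  # assumed full_crawl_interval: 10 days
int_feed = 24 * 3600       # assumed feed_crawl_interval: 1 day

now = datetime.utcnow()
t_full = now + timedelta(seconds=int_full)             # full crawl in 10 days
t_feed = now + timedelta(seconds=int_full + int_feed)  # feed crawl 1 day after it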

View File

@ -17,12 +17,12 @@ class TensorFlow:
def __init__(
self,
app,
tf_config,
session: aiohttp.ClientSession,
timeout_sock_connect: Union[int, float] = 0.5,
timeout_sock_read: Union[int, float] = 10,
):
self.config = app.config['tensorflow']
self.config = tf_config
self.session = session
self.timeout = aiohttp.ClientTimeout(
sock_connect=timeout_sock_connect, sock_read=timeout_sock_read
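With this last change the TensorFlow client is constructed from just the 'tensorflow' config section, as the call sites in the other files already show; the remaining constructor arguments control the HTTP timeouts. A small construction sketch, with the timeout overrides chosen arbitrarily:

# Construction sketch for the new signature; the timeout values are arbitrary.
import aiohttp

async def make_tf(config):
    session = aiohttp.ClientSession()
    tf = TensorFlow(
        config['tensorflow'],
        session,
        timeout_sock_connect=1,  # default is 0.5 seconds
        timeout_sock_read=20,    # default is 10 seconds
    )
    return tf, session  # caller is responsible for closing the session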