# atextcrawler/src/atextcrawler/resource/operations.py
"""
Operations on resources.
"""
import logging
from datetime import datetime
from typing import Optional, Sequence

from asyncpg import Connection

from ..models import (
    Feed,
    MetaResource,
    ResourceError,
    Site,
    Sitemap,
    SitemapIndex,
    SitePath,
    TextResource,
)
from ..search import delete_resource, index_resource
from ..tensorflow import TensorFlow
from ..utils.durl import Durl
from ..utils.similarity import (
    create_simhash,
    search_simhash,
    simhash_from_bigint,
    simhash_to_bigint,
)
from .feed import convert_feed_entries
from .fetch import ResourceFetcher
from .sitemap import extract_sitemap_paths

logger = logging.getLogger(__name__)


async def add_site_paths(
    conn: Connection,
    site_id: int,
    paths: Sequence[tuple[str, Optional[bool]]],
) -> None:
    """
    Add site paths.

    The paths must be given as relative paths, each together with a boolean
    telling whether the link is a canonical link (or None if unknown).
    """
    sql = (
        "INSERT INTO site_path (site_id, path, canonical)"
        " VALUES ($1, $2, $3) ON CONFLICT (site_id, path) DO NOTHING"
    )
    # cap the number of paths per call and skip overlong paths
    values = (
        (site_id, path, canonical)
        for path, canonical in paths[:100000]
        if len(path) <= 400
    )
    await conn.executemany(sql, values)
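
# Illustrative call of add_site_paths() (hypothetical values):
#
#     await add_site_paths(conn, site_id, [
#         ('/blog/first-post', True),   # linked via rel=canonical
#         ('/blog/?p=1', None),         # canonicity unknown
#     ])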


async def update_resource_meta(
    conn: Connection,
    site_id: int,
    resource_meta: dict,
) -> None:
    """
    Update meta information of existing resources, using their paths
    to find them.
    """
    sql = (
        "UPDATE resource SET last_change=coalesce($1, last_change),"
        " title=coalesce($2, title), summary=coalesce($3, summary) FROM ("
        " SELECT resource_id FROM site_path WHERE site_id=$4 AND path=$5"
        ") sp WHERE resource.id=sp.resource_id"
    )
    values = ((*meta, site_id, path) for path, meta in resource_meta.items())
    await conn.executemany(sql, values)
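
# Shape of *resource_meta*, as implied by the tuple unpacking above
# (illustrative values, not a stored schema):
#
#     resource_meta = {
#         '/blog/first-post': (last_change, title, summary),
#     }
#
# None values leave the corresponding columns unchanged via coalesce().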


async def store_feed_entries(
    conn: Connection,
    site: Site,
    entries: list[dict],
) -> None:
    """
    Add missing resources of a site from given feed entries.
    """
    if site.id_:
        paths, resource_meta = convert_feed_entries(site.base_url, entries)
        await add_site_paths(conn, site.id_, paths)
        await update_resource_meta(conn, site.id_, resource_meta)


async def get_site_path(
    conn: Connection,
    site: Site,
    before: datetime,
    only_new=False,
) -> Optional[SitePath]:
    """
    Return the next path of a given site that needs to be processed.

    If none needs to be processed, return None.
    In particular, return None for sites having crawl_enabled=false.
    Only return paths that have last been visited before *before*
    or have not been processed at all. Paths with an ok_count of -3
    or lower are skipped.
    If *only_new*, limit to paths that have not been processed at all,
    irrespective of the value of *before*.
    """
    sql = "SELECT crawl_enabled FROM site WHERE id=$1"
    crawl_enabled = await conn.fetchval(sql, site.id_)
    if not crawl_enabled:
        return None
    if only_new:
        sql = (
            "SELECT * FROM site_path"
            " WHERE site_id=$1 AND last_visit is null LIMIT 1"
        )  # implicitly canonical=null
        row = await conn.fetchrow(sql, site.id_)
    else:
        sql = (
            "SELECT * FROM site_path"
            " WHERE site_id=$1 AND canonical IS NOT false AND"
            " (last_visit is null OR last_visit<$2) AND"
            " ok_count > -3 LIMIT 1"
        )  # canonical can be true or null
        row = await conn.fetchrow(sql, site.id_, before)
    if row:
        return await SitePath().load_from_row(row)
    return None
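
# Minimal polling sketch for get_site_path() (an assumed caller pattern,
# not part of this module); processing must update last_visit, otherwise
# the same path is returned again:
#
#     before = datetime.utcnow() - timedelta(days=1)
#     while site_path := await get_site_path(conn, site, before):
#         ...  # process site_path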


async def process_site_path(
    app,
    worker_number: int,
    conn: Connection,
    fetcher: ResourceFetcher,
    tf: TensorFlow,
    site: Site,
    site_path: SitePath,
) -> bool:
    """
    Fetch a path, deduplicate it and, if canonical, update and index the
    resource.

    Return whether a new resource was handled and should contribute to
    statistics.
    """
    msg = (
        f'Worker {worker_number} processing site {site.id_}'
        f' site_path {site_path.id_} {site.base_url}{site_path.path}'
    )
    logger.debug(msg)
    if not site.id_:  # only to satisfy typing
        return False
    # fetch the url
    site_path.last_visit = datetime.utcnow()
    url = site_path.url(site)
    resource = await fetcher.fetch(url, site=site)
    # handle failure (possibly deleting old information)
    if not isinstance(resource, (TextResource, MetaResource)):
        if not resource:  # irrelevant content-type
            site_path.ok_count = -10
        elif isinstance(resource, ResourceError):
            site_path.ok_count -= 1
        if site_path.ok_count <= -3 and site_path.resource_id:
            await site_path.unlink_resource(
                conn,
                app.search_engine,
                app.config['elasticsearch']['index_base_name'],
            )
        await site_path.save(conn)
        if resource:  # relevant content-type
            msg = (
                f'Worker {worker_number} failed to process site_path'
                f' {site_path.id_} (site {site.id_},'
                f' {site.base_url}{site_path.path})'
            )
            logger.info(msg)
        return False
    # handle MetaResources
    if isinstance(resource, MetaResource):
        if isinstance(resource, Feed):
            resource.site_id = site.id_
            await resource.save(conn)
            if resource.entries:
                await store_feed_entries(conn, site, resource.entries)
        elif isinstance(resource, Sitemap):
            paths, _ = extract_sitemap_paths(site.base_url, resource.urls)
            await add_site_paths(conn, site.id_, paths)
        elif isinstance(resource, SitemapIndex):
            for sitemap_dict in resource.sitemaps:
                url = sitemap_dict['loc']
                res_sitemap = await fetcher.fetch(url, site=site)
                if isinstance(res_sitemap, Sitemap):
                    paths, _ = extract_sitemap_paths(
                        site.base_url, res_sitemap.urls
                    )
                    await add_site_paths(conn, site.id_, paths)
        resource_id = None
        is_new_resource = False
    else:  # handle TextResources
        resource_id, is_new_resource = await _handle_text_resource(
            app, conn, tf, site, site_path, resource, url
        )
        site_path.canonical = resource.init_fields.get('canonical')
        if shortlink_url := resource.init_fields.get('shortlink'):
            await _save_shortlink(
                conn,
                site,
                url,
                resource_id,
                shortlink_url,
                site_path.last_visit,
            )
    site_path.resource_id = resource_id
    site_path.ok_count += 1
    await site_path.save(conn)
    return is_new_resource
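
# A crawl worker might combine get_site_path() and process_site_path()
# like this (a sketch under assumed scheduling, not the crawler's actual
# loop; crawl_stats is a hypothetical counter):
#
#     while site_path := await get_site_path(conn, site, before):
#         if await process_site_path(
#             app, worker_number, conn, fetcher, tf, site, site_path
#         ):
#             crawl_stats['new_resources'] += 1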


async def _handle_text_resource(
    app, conn, tf, site, site_path, resource, url
) -> tuple[Optional[int], bool]:
    """
    Ingest a text resource, merging it with similar existing resources.

    Return the id of the (possibly merged) resource (or None if the
    incoming resource's text is too short to be worth storing) and
    whether the resource is new (False if the returned resource_id
    is None).
    """
    # save the resource's internal links
    paths = []
    if links_int := resource.init_fields['links_int']:
        for durl, (rel, _) in links_int.items():
            rp_filter = app.plugins['filter_resource_path'].rp_filter
            if path := rp_filter(site, durl):
                canon = (rel and rel.lower() == 'canonical') or None
                paths.append((path, canon))
    await add_site_paths(conn, site.id_, paths)
    # discard resources with too short texts
    text = resource.search_fields['text']
    if len(text) < app.config['crawl']['min_text_length']:
        await site_path.unlink_resource(
            conn,
            app.search_engine,
            app.config['elasticsearch']['index_base_name'],
        )
        await site_path.save(conn)
        return None, False
    # find resources similar to the current text
    simhash = simhash_from_bigint(resource.simhash)
    index = site.simhash_index
    similar_ids = search_simhash(index, simhash)
    logger.debug(f'Similar ids: {similar_ids}, old: {site_path.resource_id}')
    # determine the destination resource and the resources to merge into it
    old_id = site_path.resource_id
    if old_id and old_id in similar_ids:
        merge_ids = similar_ids
        dest_resource = await TextResource().load(conn, old_id)
    else:  # no old text, or old text not similar any more
        if old_id:
            await site_path.unlink_resource(
                conn,
                app.search_engine,
                app.config['elasticsearch']['index_base_name'],
            )
        # find the first existing similar resource
        for similar_id in similar_ids:
            dest_resource = await TextResource().load(conn, similar_id)
            if dest_resource:
                # also require similar length
                l1 = len(resource.search_fields['text'])
                l2 = dest_resource.text_len
                if 0.95 * l2 <= l1 <= 1.05 * l2:
                    merge_ids = list(
                        filter(lambda elem: elem != similar_id, similar_ids)
                    )
                    break
        else:
            dest_resource = None
            merge_ids = []
    # update or create the destination resource
    if dest_resource:
        is_new_resource = False
        resource.simhash = create_simhash(index, dest_resource.id_, simhash)
        await dest_resource.update_from_resource(resource)
        resource = dest_resource
    else:
        is_new_resource = True
        resource.simhash = simhash_to_bigint(simhash)
        await resource.save(conn)
        create_simhash(index, resource.id_, simhash)
    # add the resource to the search index
    await index_resource(
        app.search_engine,
        tf,
        site_path,
        resource,
        site.base_url,
        url,
    )
    # point references to any merged resource at the destination resource
    sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=ANY($2)"
    await conn.execute(sql, resource.id_, merge_ids)
    # remove orphaned resources after merging
    sql = "DELETE FROM resource WHERE id=ANY($1) RETURNING id, lang"
    rows = await conn.fetch(sql, list(set(merge_ids) - {resource.id_}))
    for row in rows:
        await delete_resource(
            app.search_engine,
            row['lang'],
            row['id'],
        )
    return resource.id_, is_new_resource
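
# The deduplication above relies on simhashes: near-duplicate texts map to
# hashes that differ in only a few bits, so search_simhash() can return
# candidate resource ids cheaply. A self-contained sketch using the
# `simhash` package (assumed here to be the backend wrapped by
# utils.similarity):
#
#     from simhash import Simhash, SimhashIndex
#     index = SimhashIndex([], k=3)  # k = max number of differing bits
#     index.add('1', Simhash('a reasonably long example text'))
#     dups = index.get_near_dups(Simhash('a reasonably long example text!'))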


async def _save_shortlink(
    conn, site, url, resource_id, shortlink_url, last_visit
):
    """
    Save a shortlink.
    """
    shortlink_durl = await Durl(shortlink_url, base=site.base_url)
    if shortlink_durl and shortlink_url != url:
        sql = "SELECT * FROM site_path WHERE site_id=$1 AND path=$2"
        sl_path = shortlink_durl.pwa()
        row = await conn.fetchrow(sql, site.id_, sl_path)
        shortlink = await SitePath().load_from_row(row)
        if not shortlink:
            shortlink = SitePath(
                site_id=site.id_,
                path=sl_path,
                last_visit=last_visit,
                ok_count=1,
                canonical=False,
                resource_id=resource_id,
            )
        else:
            shortlink.last_visit = last_visit
            shortlink.ok_count += 1
            shortlink.canonical = False
            shortlink.resource_id = resource_id
        await shortlink.save(conn)