""" Data Models. """ import logging from dataclasses import InitVar, asdict, dataclass, field, fields from datetime import date, datetime from itertools import chain from typing import Any, ClassVar, Optional import tldextract from asyncpg import Connection from .search import delete_resource from .utils.durl import Durl, get_url_variants from .utils.link import extract_domain from .utils.similarity import get_simhash, simhash_to_bigint logger = logging.getLogger(__name__) class ModelBase: """ Abstract base class for models. Execute SQL to load, save, delete instances using asyncpg. """ table: ClassVar id_: Optional[int] = 0 async def load(self, conn: Connection, id_: int) -> Optional[Any]: """ If loading fails, return None. """ sql = f"SELECT * FROM {self.table} WHERE id=$1" row = await conn.fetchrow(sql, id_) if not row: return None return await self.load_from_row(row) async def load_from_row(self, row): """ If row is None, return None. """ if not row: return None data = dict(row) self.id_ = data.pop('id') self.__init__(**data) return self async def save(self, conn: Connection) -> None: """ Save the instance (update if self.id_ is set, else insert). """ data = asdict(self) # logger.debug(f'Save {self}: id_={self.id_}') if self.id_: # update cols = ', '.join(data.keys()) upds = ', '.join( [f'{col}=${i + 1}' for i, col in enumerate(data.keys())] ) val_id = f'${len(data) + 1}' sql = f"UPDATE {self.table} SET {upds} WHERE id={val_id}" await conn.execute(sql, *data.values(), self.id_) else: # insert cols = ', '.join(data.keys()) vals = ', '.join([f'${i + 1}' for i in range(len(data))]) sql = ( f"INSERT INTO {self.table} ({cols}) VALUES ({vals})" f" RETURNING id" ) self.id_ = await conn.fetchval(sql, *data.values()) def asdict(self): """ Return instance data as dictionary. """ return asdict(self) async def delete(self, conn: Connection) -> None: """ Delete the object if it has an id_. """ if self.id_: sql = f"DELETE FROM {self.table} WHERE id=$1" await conn.execute(sql, self.id_) class ResourceError: """ Error encountered while trying to fetch a resource. ResourceError is used for cases when fetching a resource fails. """ def __init__(self, msg, status=None, headers=None): self.msg = msg self.status = status self.headers = headers def __repr__(self): return f'ResourceError: {self.msg}' class ResourceRedirect: """ A resource containing a redirect. """ def __init__(self, urls): self.urls = urls @dataclass class TextResource(ModelBase): """ TextResource (without path). TextResource models web resources with relevant text content. They are instantiated in modules page, document, ...; their metadata are stored in table `resource` and the text content is stored with the search engine. Do not confuse with SitePath: Several SitePath instances may point to a TextResource. The TextResource holds the actual content. If we are not dealing with the startpage of a new site, the init_fields dict usually will contain the site to which the resource belongs. 
""" table: ClassVar = 'resource' init_fields: InitVar[dict] = None # additional fields after fetching search_fields: InitVar[dict] = None # additional fields for indexing # database fields simhash: Optional[int] = None content_type: Optional[str] = None last_change: Optional[datetime] = None text_len: int = 0 lang: Optional[str] = None title: Optional[str] = None summary: Optional[str] = None def __post_init__(self, init_fields, search_fields): if init_fields is None: init_fields = {} self.init_fields = init_fields if search_fields is None: search_fields = {} self.search_fields = search_fields self.site = self.init_fields.get('site') self.site_id = self.site.id_ if self.site else None self._update_simhash() def __str__(self): return ( f'TextResource(id={self.id_},' f' site_id={self.site_id},' f' type={self.content_type})' ) def _update_simhash(self): """ Update the simhash of the resource from its text content. """ if self.simhash is None: text = self.search_fields.get('text', '') self.simhash = simhash_to_bigint(get_simhash(text)) async def save(self, conn: Connection): """ Save the instance, extending the parent's method. """ self.content_type = ( self.content_type[:50] if self.content_type else None ) self.title = self.title[:200] if self.title else None self.summary = self.summary[:400] if self.summary else None self._update_simhash() if self.last_change is None: self.last_change = datetime.utcnow() await super().save(conn) async def update_from_resource(self, upd: 'TextResource'): """ Update self with values from another resource. """ names = [field.name for field in fields(self)] for name in names: cur_val = getattr(self, name) upd_val = getattr(upd, name) if not cur_val and upd_val is not None: setattr(self, name, upd_val) init_names = [ 'headers', 'redirects', 'links_int', 'links_ext', 'shortlinks', 'canonical', #'head', ] self.init_fields = upd.init_fields self.search_fields = upd.search_fields # for init_name in init_names: # cur_val = self.init_fields.get(init_name) # upd_val = upd.init_fields.get(init_name) # if not cur_val and upd_val is not None: # self.init_fields[init_name] = upd_val @dataclass class MetaResource(ModelBase): """ Parent class for Feed, Sitemap, SitemapIndex. MetaResource is a parent class for Feed, Sitemap, SitemapIndex. Their instances are not stored. Note: class Feed contains feed meta data and is stored in the database. """ @dataclass class SitemapIndex(MetaResource): """ A SitemapIndex meta resource. Just a list of the siteap URLs, nothing more. """ sitemaps: list = field(default_factory=list) @dataclass class Sitemap(MetaResource): """ A Sitemap meta resource. Just a list of the resulting links, nothing more. """ urls: list = field(default_factory=list) @dataclass class Feed(MetaResource): """ A site's feed (RSS, Atom , ...). """ table: ClassVar = 'site_feed' entries: InitVar[list] = None site_id: Optional[int] = None url: Optional[str] = None etag: Optional[str] = None modified: Optional[str] = None t_visit: Optional[datetime] = None t_content: Optional[datetime] = None version: Optional[str] = None title: Optional[str] = None description: Optional[str] = None fail_count: int = 0 def __post_init__(self, entries): self.entries = entries def __str__(self): return f'Feed(id={self.id_}, site_id={self.site_id}, url={self.url})' async def save(self, conn: Connection): """ Save, trying to merge with existing entry matching on site_id and url. 
""" if not self.site_id or not self.url: msg = f'Saving feed failed: missing site_id of url' logger.error(msg) return sql = "SELECT id FROM site_feed WHERE site_id=$1 AND url=$2" self.id_ = await conn.fetchval(sql, self.site_id, self.url) await super().save(conn) def debug(self) -> str: """ Return the instance data asa string for debug print output. """ return ( f'Feed:\n' f'- id: {self.id_}\n' f'- site_id: {self.site_id}\n' f'- url: {self.url}\n' f'- etag: {self.etag}\n' f'- modified: {self.modified}\n' f'- t_visit: {self.t_visit}\n' f'- t_content: {self.t_content}\n' f'- version: {self.version}\n' f'- title: {self.title}\n' f'- description: {self.description}\n' f'- fail_count: {self.fail_count}\n' f'- entries: {self.entries}' ) @dataclass class Site(ModelBase): """ Website. """ table: ClassVar = 'site' base_durl: InitVar[Durl] = None feeds: InitVar[dict] = None links_ext: InitVar[dict] = None links_int: InitVar[dict] = None startpage_text: InitVar[str] = None canonical_url: Optional[str] = None base_url: Optional[str] = None base_urls: list[str] = field(default_factory=list) domains: list[str] = field(default_factory=list) ips: Optional[list[str]] = None crawl_enabled: bool = False crawl_active: bool = False next_full_crawl: Optional[datetime] = None next_feed_crawl: Optional[datetime] = None last_update: Optional[datetime] = None last_pub: Optional[datetime] = None pub_dates: Optional[dict[str, str]] = None langs: list[str] = field(default_factory=list) alt_langs: dict[str, str] = field(default_factory=dict) title: Optional[str] = None description: Optional[str] = None keywords: list[str] = field(default_factory=list) linkbacks: dict[str, str] = field(default_factory=dict) meta_info: dict = field(default_factory=dict) boilerplate_texts: list[str] = field(default_factory=list) def __post_init__( self, base_durl: Durl, feeds=None, links_ext=None, links_int=None, startpage_text=None, ): self.feeds = feeds self.links_ext = links_ext self.links_int = links_int self.startpage_text = startpage_text self.keywords = self.keywords[:20] if not self.last_update: self.last_update = datetime.utcnow() pub_date: Optional[str] if self.last_pub: pub_date = date.isoformat(self.last_pub.date()) self.pub_dates = {date.isoformat(self.last_update): pub_date} else: pub_date = None self.pub_dates = {} if base_durl: self.base_urls = [base_durl.url()[:200]] self.domains = [extract_domain(base_durl.hostname)[:100]] def __str__(self): return ( f'Site(id={self.id_}, url={self.base_url},' f' crawl_enabled={self.crawl_enabled})' ) async def update_base_url(self) -> None: """ Update the base_url, choosing the most relevant URL. If canonical_url is not None, use this. Otherwise set self.base_url to the shortest from self.base_urls, but requiring a https-url if there is at least one. """ if self.canonical_url and self.canonical_url not in self.base_urls: if canonical_durl := await Durl(self.canonical_url): self.base_urls.append(self.canonical_url) domain = extract_domain(canonical_durl.hostname) if domain not in self.domains: self.domains.append(domain) if self.canonical_url: self.base_url = self.canonical_url return if not self.base_url: url_candidates = self.base_urls if https_urls := [ url for url in self.base_urls if url.startswith('https://') ]: url_candidates = https_urls self.base_url = min(url_candidates, key=len) async def save( # type: ignore self, conn, merge=True ) -> tuple[Optional[int], bool]: """ Store the site, optionally trying to merge it with an existing site. 
Return the id of the saved instance and whether a new instance was created. If self.id_ is not 0, replace the data of the existing site with this id. Else if not merge, store as new row, and if merge, try to merge with an existing matching site. """ await self.update_base_url() if not merge: created = not bool(self.id_) await super().save(conn) return self.id_, created if self.id_: sql = "SELECT base_urls, pub_dates FROM site WHERE id=$1" row = await conn.fetchrow(sql, self.id_) self.base_urls = list( set(row['base_urls']).union(set(self.base_urls)) ) if previous_pub_dates := row['pub_dates']: if not self.pub_dates: self.pub_dates = {} self.pub_dates.update(previous_pub_dates) await super().save(conn) return self.id_, False same_site_id = await search_same_site(self, conn) if same_site_id: same_site = await Site().load(conn, same_site_id) if same_site_id and same_site: same_site.base_urls = set(same_site.base_urls).union( set(self.base_urls) ) same_site.domains = set(same_site.domains).union(set(self.domains)) if self.canonical_url and not same_site.canonical_url: same_site.canonical_url = self.canonical_url await same_site.save(conn, merge=False) # call ourselves self.id_ = same_site.id_ return self.id_, False else: await super().save(conn) return self.id_, True @dataclass class SitePath(ModelBase): """ Path of a website. May point to a Resource. """ table: ClassVar = 'site_path' site: InitVar[str] = None site_id: Optional[int] = None path: Optional[str] = None filtered: bool = False last_visit: Optional[datetime] = None ok_count: int = 0 canonical: Optional[bool] = None resource_id: Optional[int] = None def __str__(self): return ( f'SitePath(id={self.id_}, site_id={self.site_id},' f' path={self.path})' ) async def save(self, conn: Connection): """ Save the instance, extending the parent's method. """ self.path = self.path[:400] if self.path else '' await super().save(conn) async def unlink_resource(self, conn, engine, index_base_name): """ Unlink the resource and also delete it, if it has no more links. """ if self.id_: if self.resource_id: sql = "SELECT COUNT(*) FROM site_path WHERE resource_id=$1" ref_count = await conn.fetchval(sql, self.resource_id) if ref_count == 0: sql = ( "DELETE FROM resource WHERE id=$1" " RETURNING (true, lang)" ) found = await conn.fetchval(sql, self.resource_id) if found: await delete_resource( engine, found[1], self.resource_id ) self.resource_id = None def url(self, site): """ Return the full URL (combine the site's base_url with our path). """ return site.base_url + self.path @dataclass class Crawl(ModelBase): """ The crawl process of a website (begin, end, statistics, ...). """ table: ClassVar = 'crawl' site_id: Optional[int] = None is_full: bool = False t_begin: datetime = datetime.utcnow() t_end: Optional[datetime] = None n_resources: int = 0 n_resources_new: int = 0 async def finish(self, conn, set_t_end): """ Save the crawl. Set t_end only if indicated. """ if set_t_end: self.t_end = datetime.utcnow() await self.save(conn) async def search_same_site( site: Site, conn: Connection, ) -> Optional[int]: """ Try to find a matching site for the given *site* and return its id. TODO: if the path is non-trivial, require it also for the matching site Two sites match when they return the same content for identical paths. The base_url (scheme and/or netloc) may differ. 

    We do not have the content for all paths of both websites, so we need
    to estimate: we only take into account meta information from the start
    pages of both sites, in particular the title, the description and
    information obtained from the base_urls. We use a combination of these
    conditions:

    1. one of the sites has a canonical URL which matches the URL of
       the other site
    2. the content fields (title, description) have sufficient information
    3. the content fields match exactly
    4. the domain matches
    5. the domain matches, except for the TLD
    6. the base_urls differ in their schemes (http vs. https)
    7. the hostnames in the base_urls are identical
    8. the hostnames in the base_urls differ by a prepended 'www.'
    9. the IPs have at least one common address

    The algorithm is this (first answer is final, yes means match):

    * if (1): yes
    * if (2), (3), (4): yes
    * if (2), (3), (5), (9): yes
    * if (6), ((7) or (8)): yes
    * no
    """
    # rule (1)
    if site.canonical_url:
        sql = "SELECT id FROM site WHERE $1=ANY(base_urls) LIMIT 1"
        id_ = await conn.fetchval(sql, site.canonical_url)
        if id_:
            return id_
    else:
        sql = "SELECT id FROM site WHERE canonical_url=ANY($1) LIMIT 1"
        id_ = await conn.fetchval(sql, site.base_urls)
        if id_:
            return id_

    # rule (6), ((7) or (8))
    url_variants = set(
        chain.from_iterable(
            get_url_variants(base_url) for base_url in site.base_urls
        )
    )
    sql = "SELECT id FROM site WHERE base_urls && $1 LIMIT 1"
    if id_ := await conn.fetchval(sql, url_variants):
        return id_

    # condition (2)
    if len(site.title or '') > 15 or len(site.description or '') > 15:
        sql = (
            "SELECT * FROM site WHERE"
            " COALESCE(title, '')=$1 AND COALESCE(description, '')=$2"
        )
        rows = await conn.fetch(sql, site.title or '', site.description or '')
        # condition (3)
        if rows:
            # condition (4)
            for row in rows:
                domains = set(row.get('domains', []))
                if domains & set(site.domains):
                    return row['id']
            # condition (9)
            for row in rows:
                ips = set(row.get('ips', []))
                if site.ips and ips & set(site.ips):
                    # condition (5)
                    domains_ = row.get('domains', [])
                    d1 = set([tldextract.extract(d).domain for d in domains_])
                    domains_ = site.domains or []
                    d2 = set([tldextract.extract(d).domain for d in domains_])
                    if d1 & d2:
                        return row['id']
    return None
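

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the crawler itself): how a Site
# might be created, saved and re-loaded with these models. The DSN, the
# example URL and the wrapper coroutine `_example` are assumptions made
# for this sketch only.
#
# import asyncio
# import asyncpg
#
# async def _example() -> None:
#     conn = await asyncpg.connect(dsn='postgresql://localhost/crawler')
#     durl = await Durl('https://example.org/')
#     site = Site(
#         base_durl=durl,
#         title='Example site',
#         description='Just an illustration of the model API',
#     )
#     site_id, created = await site.save(conn)  # insert or merge
#     reloaded = await Site().load(conn, site_id)
#     await conn.close()
#
# asyncio.run(_example())
# ---------------------------------------------------------------------------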