atextcrawler/src/atextcrawler/utils/annotation.py


"""
Convert html to plain text with annotations over character ranges.
"""
import re
from collections import defaultdict
from html.parser import HTMLParser
from .json import json_dumps, json_loads
from .link import nofollow_link_rels
from .tag import keep_tags, self_closing_tags
MAX_HREF_LENGTH = 200
"""
Maximum length of an href; longer links are discarded.
"""
text_blacklist = [
'previous',
'next',
'back', # common pagination navigation
'↩︎', # amusewiki footnote separator (after conversion from muse to html)
]
"""
Texts to ignore.
"""
class AnnotatingParser(HTMLParser):
"""
Parse tagged text resulting in pure text and annotations.
The text is available in self.text and the annotations
in self.annotations, which is a dict with these keys:
* tags: contains a mapping of offset ranges (i, f) to
the tags opening at i and closing at f
    * semantic_breaks: a mapping of offset positions where
      a new section begins to the nesting level of that
      section; a section begins wherever an (opening or
      closing) separating tag is placed in the raw html;
      for the separating flag of tags see tag.py
    * links: a mapping of hrefs to link texts obtained from
      anchor (a) tags; we skip hyperlinks with nofollow rels
* section_ids: map an offset position to the first
id attribute (of any tag) at the beginning of a
semantic section; this can later be used in a URL
fragment for linking directly into this section
Internally, we put opening tags on self.stack and pop them
when the first matching closing tag is encountered. We assume
balanced tags (tidy html).
    NB: all tags with semantic breaks have sep=True, i.e.,
    they will have spaces around them so that the semantic breaks
    always sit on a space; the semantic break position p is the end
    of the last section and the next section begins at p + 1.
    The text always begins with a ' ' (added if not in the original),
    which is assigned a semantic break with default level 80
    (if there is no semantic break tag at the beginning).
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.text = ' ' # concatenated text data (without tags)
self.pos = 1 # equal to len(self.text)
self.stack = []
self.tags = defaultdict(dict)
self.semantic_breaks = {0: 80}
self.tag_id = None
self.section_ids = defaultdict(list)
self.links = {}
self.add_space = False
def close(self):
"""
Finish by collecting results in dict `self.annotations`.
"""
super().close()
self.annotations = {}
self.annotations['links'] = self.links
self.annotations['semantic_breaks'] = {
pos: lvl for pos, lvl in sorted(self.semantic_breaks.items())
}
self.annotations['tags'] = self.tags
self.annotations['section_ids'] = self.section_ids
def handle_starttag(self, tag, attrs):
"""
Called for each opening tag.
"""
sep, lvl, sem = keep_tags[tag]
attrs = dict(attrs)
if sep:
self.add_space = True
if tag == 'section' and 'endnotes' in attrs.get('role', ''):
lvl = 25
# ARIA roles
if role := attrs.get('role'):
if role == 'article':
lvl = 15
elif role == 'heading':
if aria_level := attrs.get('aria-level'):
                    # attribute values arrive as strings, so compare with strings
                    if aria_level in ('1', '2', '3', '4', '5', '6'):
sep, lvl, sem = keep_tags[f'h{aria_level}']
elif role == 'region':
lvl = 24
i = self.pos
if tag in self_closing_tags:
# self-closing tags will not be added to the result tags,
# they only appear in semantic_breaks
# the two self-closing tags br and hr both have lvl and sep
if i == 1: # replace the default semantic break at pos 0
i = 0
self.add_semantic_break(i, lvl)
i += 1
if tag_id := attrs.get('id'):
self.tag_id = i, tag_id
self.add_tag_id(i) # br or hr may have an id, too
self.add_space = True
else:
self.stack.append((i, tag, sep, lvl, sem, attrs))
# forget outdated tag id at new semantic break
if lvl:
self.forget_tag_id()
# memorize tag id
if not self.tag_id and (tag_id := attrs.get('id')):
self.tag_id = self.pos, tag_id
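    # Illustrative example of the ARIA handling above (attribute values
    # arrive as strings from html.parser):
    # '<div role="heading" aria-level="2">...</div>' is treated like an
    # <h2>, i.e. it takes sep, lvl and sem from keep_tags['h2'].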
def handle_endtag(self, tag):
"""
Called for each closing tag.
"""
        if not self.stack or self.stack[-1][1] != tag:
            return  # nothing to do for an already closed self-closing tag
i, tag_, sep, lvl, sem, attrs = self.stack.pop()
f = self.pos
# omit tag without content
if i == f:
return
# for a closing div tag revise lvl to minimum level of contained
# semantic breaks (if any)
if tag == 'div':
min_lvl = 101
for pos_, lvl_ in reversed(self.semantic_breaks.items()):
if pos_ <= i:
break
min_lvl = min(min_lvl, lvl_)
if min_lvl < 101:
lvl = min_lvl
# add semantic break and an optional section_id
if lvl:
if i == 1: # replace the default semantic break at pos 0
i = 0
if tag in ('ul', 'ol', 'li'):
seen_tags = [x[1] for x in self.stack]
if 'p' not in seen_tags:
                    # level grows with list nesting: count open ancestor list items
                    lvl = 52 + seen_tags.count('li')
if tag == 'li':
lvl += 1
self.add_semantic_break(i, lvl)
self.add_tag_id(i)
# do not include surrounding spaces in tag span
if self.text[i] == ' ':
i += 1
# add tag
self.tags[(i, f)][tag] = sem
# add space (when handling next data)
if sep:
self.add_space = True
# collect links
if tag == 'a':
self.extract_link(i, attrs)
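    # Example of the div revision above: if a div contains a heading that
    # produced a semantic break of level 30 (say), the break added for the
    # closing div is lowered to 30 as well, so the div never introduces a
    # weaker break than the strongest break it contains.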
def handle_data(self, text):
"""
Called for each non-tag content between tags.
"""
# handle empty or blacklisted text
if text == '':
return
if text == ' ':
self.add_space = True
return
if text.strip().lower() in text_blacklist:
if ' ' in text:
self.add_space = True
return
# add a space (at self.pos) if the text begins with one
# or if we shall add one
startswith_space = text.startswith(' ')
text = text.lstrip()
if startswith_space or self.add_space:
if self.text[-1] != ' ':
self.text += ' '
self.pos += 1
self.add_space = False
# strip a space at the end of text and handle it in end tag
if text.endswith(' '):
text = text[:-1]
self.add_space = True
# add text to self.text
self.text += text
self.pos += len(text)
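    # Example of the space normalization above (assuming keep_tags['p']
    # has sep=True): feeding '<p> foo </p><p>bar</p>' yields
    # self.text == ' foo bar', i.e. leading/trailing spaces in the data
    # are collapsed into single separating spaces via self.add_space.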
def add_semantic_break(self, pos, lvl):
"""
Add a semantic break of level *lvl* at position *pos*.
"""
if pos in self.semantic_breaks:
self.semantic_breaks[pos] = min(self.semantic_breaks[pos], lvl)
else:
self.semantic_breaks[pos] = lvl
def forget_tag_id(self):
"""
Reset a tag id if it is too far behind in the text stream.
"""
if self.tag_id:
pos_, tag_id = self.tag_id
if pos_ + 200 < self.pos:
self.tag_id = None
def add_tag_id(self, pos):
"""
Add and clear an id if the just closing section has none yet.
*pos* is the start position of the current section, and the
position where the id will be added.
Add an id only if we are not too far in the section's text already.
"""
if self.tag_id:
pos_, tag_id = self.tag_id
if pos_ < pos + 100 and pos not in self.section_ids:
self.section_ids[pos].append(tag_id.lower())
self.tag_id = None
def extract_link(self, i, attrs):
"""
Add a link covering character range (i, self.pos).
From html *attrs* extract href and rel.
"""
        if (href := attrs.get('href')) and attrs.get('rel') != 'nofollow':
            if href.startswith('#'):
                return
            if len(href) > MAX_HREF_LENGTH:
                return
            if rel := attrs.get('rel'):
                # the rel attribute holds space-separated tokens
                if set(rel.split()) & nofollow_link_rels:
                    return
            self.links[href] = i, self.pos, rel
def annotate(html):
"""
Split html text into plain text with annotations (from AnnotatingParser).
"""
parser = AnnotatingParser()
parser.reset()
parser.feed(html)
parser.close()
return parser.text, parser.annotations
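# Example usage (illustrative; the exact break levels depend on the
# entries in keep_tags from tag.py):
#
#     text, anns = annotate('<h1 id="top">Title</h1><p>Some text.</p>')
#     # text -> ' Title Some text.'
#     # anns['tags'] covers (1, 6) with 'h1' and (7, 17) with 'p'
#     # anns['semantic_breaks'] has breaks at positions 0 (h1) and 6 (p)
#     # anns['section_ids'] -> {0: ['top']}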
re_footnote = re.compile(r'^\s*\[\d+\]\s+')
def headline_probability(text, tags, lvl) -> float:
"""
Estimate the probability that the text with tags is a headline.
    The context is not considered: the question is not whether the text
    is a headline for the following text, but whether the text itself
    looks like a headline.
"""
text = text.strip()
res = 0.0
if not text:
return res
if lvl < 60:
return 1.0
# if 'h1' in tags or 'h2' in tags or 'h3' in tags or\
# 'h4' in tags or 'h5' in tags or 'h6' in tags or 'center' in tags:
# return 1.0
if len(text) < 80:
res = 0.7
else:
res = 0.7 - 0.7 * (len(text) - 80) / 200
if 'p' in tags:
res -= 0.4
if 'em' in tags:
res += 0.3
if 'a' in tags:
res -= 0.1
if text[-1] in '.:':
res -= 0.3
res -= 0.1 * text.count(', ')
if re_footnote.match(text):
res -= 0.4
return max(res, 0.0)
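# Worked example: for text='A short headline', tags={'p': 0}, lvl=60:
#   len(text) < 80             -> res = 0.7
#   'p' in tags                -> res -= 0.4  => 0.3
#   no 'em'/'a', no trailing '.' or ':', no ', ', no footnote match
#   => returns 0.3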
def get_tag_counts(tag_names, i, f, tags, text) -> tuple[int, float, float]:
"""
    Return info on the share of characters covered by one of the *tag_names*.
    Only consider the characters between positions i and f of string *text*.
    Return the number of tags that overlap the specified region,
    the tag density in the region (the fraction of characters covered
    by any of the tags), and the average number of covered characters per tag.
    NB: If more than one tag name is given, the fractional share
    may exceed 1.
"""
if i == f:
return 0, 0.0, 0.0
tag_count = 0
covered_chars = 0
for (s_i, s_f), anns in tags.items():
if overlap := range_overlap(i, f - 1, s_i, s_f - 1):
for ann in anns:
if ann in tag_names:
tag_count += 1
covered_chars += overlap[1] - overlap[0]
all_chars = f - i
tag_density = covered_chars * 1.0 / all_chars
avg_text_len = covered_chars * 1.0 / tag_count if tag_count else 0
return tag_count, tag_density, avg_text_len
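# Illustrative call: with tags={(1, 6): {'em': 0}} and text=' Hello world'
# (so i=1, f=12), get_tag_counts(('em',), 1, 12, tags, ' Hello world')
# returns (1, 4/11, 4.0): one overlapping 'em' tag covering 4 characters
# (per range_overlap over inclusive endpoints) out of 11 in the region.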
def range_overlap(i1, f1, i2, f2):
"""
Return the overlap of both ranges (None if there is none).
"""
return None if f1 <= i2 or f2 <= i1 else (max(i1, i2), min(f1, f2))
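# Examples: range_overlap(0, 10, 5, 15) -> (5, 10);
# range_overlap(0, 5, 5, 10) -> None (merely touching ranges do not overlap).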
def annotations_remove_section(annotations, i, f):
"""
Remove section (i, f) from annotations and return result.
"""
new_annotations = {}
d = f - i
if not d:
return annotations
# relocate tags
new_tags = {}
for (t_i, t_f), anns in annotations['tags'].items():
n_i, n_f = cut_range(i, f, d, t_i, t_f)
if n_i is not None:
new_tags[(n_i, n_f)] = anns
new_annotations['tags'] = new_tags
# relocate links
new_links = {}
for href, (l_i, l_f, rel) in annotations['links'].items():
n_i, n_f = cut_range(i, f, d, l_i, l_f)
if n_i is not None:
new_links[href] = n_i, n_f, rel
# relocate semantic breaks and section_ids
semantic_breaks = annotations['semantic_breaks']
section_ids = annotations['section_ids']
new_semantic_breaks = {}
new_section_ids = {}
for pos in sorted(semantic_breaks.keys()):
level = semantic_breaks[pos]
        if i <= pos < f:
continue # discard
elif f <= pos:
new_semantic_breaks[pos - d] = level
if pos in section_ids:
new_section_ids[pos - d] = section_ids[pos]
else:
new_semantic_breaks[pos] = level
if pos in section_ids:
new_section_ids[pos] = section_ids[pos]
# collect and return results
new_annotations['semantic_breaks'] = new_semantic_breaks
new_annotations['section_ids'] = new_section_ids
new_annotations['links'] = new_links
return new_annotations
def cut_range(i, f, d, t_i, t_f):
"""
Return the new coordinates of a text range (t_i,t_f) after cutting (i,f).
If (t_i,t_f) is fully within (i,f), return None, None.
"""
if t_f < i:
return t_i, t_f
elif t_i < i <= t_f <= f:
return t_i, i
elif t_i < i and f <= t_f:
return t_i, t_f - d
elif i <= t_i and t_f <= f:
return None, None
elif i <= t_i <= f < t_f:
return i, t_f - d
else: # f < t_i
return t_i - d, t_f - d
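# Worked examples for cutting (i, f) = (10, 20), so d = 10:
#   cut_range(10, 20, 10, 2, 8)   -> (2, 8)       (entirely before the cut)
#   cut_range(10, 20, 10, 5, 15)  -> (5, 10)      (right part truncated)
#   cut_range(10, 20, 10, 12, 18) -> (None, None) (fully inside the cut)
#   cut_range(10, 20, 10, 5, 25)  -> (5, 15)      (cut out of the middle)
#   cut_range(10, 20, 10, 25, 30) -> (15, 20)     (entirely after, shifted)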
def clean_annotations(annotations: dict) -> None:
"""
Remove void stuff from annotations.
"""
cleaned_tags = {}
for (i, f), anns in annotations['tags'].items():
if f > i and anns:
cleaned_tags[(i, f)] = anns
annotations['tags'] = cleaned_tags
def pack_annotations(annotations):
"""
Pack annotations to a special JSON string, reducing their volume a little.
"""
return json_dumps(
{
'tags': _pack_tags(annotations['tags']),
'semantic_breaks': ','.join(
[
f'{pos}:{level}'
for pos, level in annotations['semantic_breaks'].items()
]
),
'section_ids': annotations['section_ids'],
'links': annotations['links'],
}
)
def _pack_tags(tags: dict) -> str:
"""
Utility function for packing tag information into a string.
"""
res = ''
for (i, f), anns in tags.items():
if anns:
anns_ = ','.join([f'{tag}={sem}' for tag, sem in anns.items()])
res += f'{i}-{f}:{anns_}\n'
return res
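# Example of the packed format (sem values are taken verbatim from the
# entries in keep_tags): tags {(1, 6): {'h1': 1}, (7, 17): {'p': 0}}
# become the string '1-6:h1=1\n7-17:p=0\n'.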
def unpack_annotations(json_text: str) -> dict:
"""
    Unpack annotations from a JSON string (inverse of pack_annotations).
"""
annotations = json_loads(json_text)
tags = {}
for line in annotations['tags'].split('\n'):
if line:
range_, anns_ = line.split(':')
i, f = range_.split('-')
i = int(i)
f = int(f)
anns = {}
if anns_:
for ann_ in anns_.split(','):
tag_, sem_ = ann_.split('=')
anns[tag_] = sem_
tags[(i, f)] = anns
semantic_breaks = {}
for sb_ in annotations['semantic_breaks'].split(','):
pos_, lvl_ = sb_.split(':')
semantic_breaks[int(pos_)] = int(lvl_)
return {
'tags': tags,
'semantic_breaks': semantic_breaks,
'section_ids': annotations['section_ids'],
'links': annotations['links'],
}
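# NB: the round trip pack_annotations -> unpack_annotations is not exact:
# sem values come back as strings (they are re-parsed from 'tag=sem' text),
# JSON turns the integer keys of section_ids into string keys, and link
# ranges come back as lists rather than tuples. The semantic_breaks,
# however, round-trip exactly (both pos and level are re-parsed as int).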