atextcrawler/src/atextcrawler/search/engine.py

"""
Search engine, for now elasticsearch.
We have one index per supported language and a default one.
"""

import logging
import warnings
from difflib import SequenceMatcher
from typing import Union

from elasticsearch import AsyncElasticsearch
from elasticsearch.exceptions import NotFoundError

from ..utils.annotation import pack_annotations
from ..utils.section import concat_section_texts

logger = logging.getLogger(__name__)

warnings.filterwarnings(
    'ignore',
    'The client is unable to verify that the'
    ' server is Elasticsearch due security privileges on the server side',
)

MIN_INDEXING_TIMEOUT_SECONDS = 5

language_analyzers = {
    'en': 'english',
    'de': 'german',
    #'fr': 'french',
    #'el': 'greek',
    #'es': 'spanish',
    'default': 'standard',
}

properties = {
    'resource_id': {'type': 'long'},
    'site_id': {'type': 'long'},
    'url': {'type': 'text'},
    'base_url': {'type': 'text'},
    'pub_date': {'type': 'date', 'format': 'yyyy-MM-dd||yyyy-MM||yyyy'},
    'lang': {'type': 'keyword'},
    'title': {'type': 'text'},
    'authors': {'type': 'text'},
    'summary': {'type': 'text'},
    'keywords': {'type': 'text'},
    'collections': {'type': 'keyword'},
    'time_horizon': {'type': 'keyword'},
    'orig_source': {'type': 'text'},
    'topics': {'type': 'text'},
    'length': {'type': 'integer'},
    'annotations': {'type': 'text', 'index': False},
    'sections': {
        'type': 'nested',
        'properties': {
            'start_ids': {'type': 'integer'},
            'end_ids': {'type': 'integer'},
            'text': {'type': 'text', 'index_options': 'offsets'},
            'embedding': {'type': 'dense_vector', 'dims': 512},
        },
    },
}
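
# Note: the 'sections.embedding' dims value (512) must match the length of
# the vectors returned by the embedding backend (`tf.embed()` in
# index_resource below); adjust both together if the model changes.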


async def startup_engine(config):
    """
    Open the search engine for access.
    """
    engine = AsyncElasticsearch(
        host=config['elasticsearch']['host'],
        api_key=(
            config['elasticsearch']['id'],
            config['elasticsearch']['api_key'],
        ),
        use_ssl=False,
        timeout=20,
    )
    engine.index_base_name = config['elasticsearch']['index_base_name']
    await create_indices(engine)
    await open_indices(engine)
    return engine


async def create_indices(engine):
    """
    Create indices for all configured languages.
    """
    for lang, analyzer in language_analyzers.items():
        index_name = engine.index_base_name + '_text_' + lang
        if not await engine.indices.exists(index=index_name):
            await engine.indices.create(index=index_name)
            # Analysis settings are static and can only be changed while
            # the index is closed; it is reopened in open_indices().
            await engine.indices.close(index=index_name)
            await engine.indices.put_settings(
                index=index_name,
                body={
                    'analysis': {'analyzer': {'default': {'type': analyzer}}},
                    'refresh_interval': '60s',
                },
            )
            await engine.indices.put_mapping(
                index=index_name,
                body={'properties': properties},
            )


async def open_indices(engine):
    """
    Open indices for all configured languages.
    """
    for lang in language_analyzers.keys():
        index_name = engine.index_base_name + '_text_' + lang
        await engine.indices.open(index=index_name)


async def shutdown_engine(engine):
    """
    Close the connection to the search engine.
    """
    # await close_indices(engine)
    await engine.close()


async def close_indices(engine):
    """
    Close indices. UNUSED.
    """
    for lang in language_analyzers.keys():
        index_name = engine.index_base_name + '_text_' + lang
        await engine.indices.close(index=index_name)


async def index_resource(
    engine,
    tf,
    site_path,
    resource,
    base_url,
    url,
):
    """
    Index a resource.
    """
    lang = resource.lang
    index_lang = lang if lang in language_analyzers.keys() else 'default'
    index_name = engine.index_base_name + '_text_' + index_lang
    pub_date = resource.search_fields.get('pub_date')
    if pub_date:
        pub_date = str(pub_date.date())
    text = resource.search_fields.get('text')
    annotations = resource.search_fields.get('annotations')
    semantic_breaks = annotations['semantic_breaks']
    # Split the text at semantic breaks and embed each section separately,
    # keeping the first and last segment id of each concatenated section.
    sections = []
    for section_ids, txt in concat_section_texts(text, semantic_breaks):
        embedding = await tf.embed(txt)
        sections.append(
            {
                'start_ids': section_ids[0],
                'end_ids': section_ids[-1],
                'text': txt,
                'embedding': embedding,
            }
        )
    doc = {
        'resource_id': resource.id_,
        'site_id': site_path.site_id,
        'url': url,
        'base_url': base_url,
        'pub_date': pub_date,
        'lang': resource.lang,
        'title': resource.search_fields.get('title'),
        'authors': resource.search_fields.get('authors'),
        'summary': resource.search_fields.get('summary'),
        'keywords': resource.search_fields.get('keywords'),
        'collections': resource.search_fields.get('collections'),
        'time_horizon': resource.search_fields.get('time_horizon'),
        'orig_source': resource.search_fields.get('orig_source'),
        'topics': resource.search_fields.get('topics'),
        'length': len(text),
        'annotations': pack_annotations(annotations),
        'sections': sections,
    }
    # Allow roughly one second of indexing time per 1000 characters of text,
    # but never less than MIN_INDEXING_TIMEOUT_SECONDS.
    timeout_seconds = max(MIN_INDEXING_TIMEOUT_SECONDS, int(len(text) / 1000))
    await engine.index(
        id=resource.id_,
        index=index_name,
        body=doc,
        timeout=f'{timeout_seconds}s',
    )


async def delete_resource(engine, lang, resource_id):
    """
    Delete a resource.
    """
    index_name = engine.index_base_name + '_text_' + (lang or 'default')
    try:
        await engine.delete(index=index_name, id=resource_id)
    except NotFoundError:
        msg = f'Cannot delete resource from index, not found: {resource_id}'
        logger.warning(msg)


async def find_duplicate(engine, site_id, resource) -> Union[bool, None, int]:
    """
    UNUSED.

    Try to find a duplicate resource with matching site.

    If the search backend query fails, return False.
    If no matching resource was found, return None.
    If a matching resource was found, return its id.
    """
    # get sample texts
    text = resource.search_fields['text']
    if not text or len(text) < 100:
        return None
    # annotations = resource.search_fields['annotations']
    # semantic_breaks = annotations['semantic_breaks']
    # texts = []
    # for _, txt in concat_section_texts(text, semantic_breaks):
    #     texts.append(txt)
    # texts = extract_samples(texts)
    # # search for sample texts
    # text_count = len(texts)
    # should_min = max(1, int(0.6 * text_count))
    # should = []
    # for text in texts:
    #     should.append({'match': {'sections.text': text}})
    # Match the full text against indexed section texts of the same site.
    query = {
        'bool': {
            'must': {
                'nested': {
                    'path': 'sections',
                    'query': {'match': {'sections.text': text}},
                },
            },
            'filter': {
                'term': {
                    'site_id': site_id,
                },
            },
        }
    }
    fields = [
        'url',
        'sections.text',
        'site_id',
    ]
    response = await engine.search(
        index=engine.index_base_name + '_text_*',
        body={
            'query': query,
            'fields': fields,
            'from': 0,
            'size': 3,
            '_source': False,
        },
    )
    if response['timed_out']:
        return False
    # Treat a hit as a duplicate if its concatenated section texts are
    # nearly identical to the resource's text.
    for hit in response.get('hits', {}).get('hits', []):
        txt = ' '.join(hit['fields']['sections.text'])
        similarity = SequenceMatcher(None, text, txt).ratio()
        if similarity > 0.99:
            return hit['_id']
    return None