Compare commits


No commits in common. "main" and "a6af5b12d23650bb8942a562560f3ab8b535271e" have entirely different histories.

15 changed files with 88 additions and 284 deletions

View File

@@ -69,9 +69,3 @@ source_suffix = {
'.rst': 'restructuredtext',
'.md': 'markdown',
}
html_logo = 'logo.svg'
html_theme_options = {
'style_nav_header_background': '#bd4',
}

View File

@@ -38,7 +38,7 @@ crawl:
# Number of concurrent workers
# Default value: 10
# Allowed values: integer >=0 and <=1000
workers: 10
#workers: 3
# Delay in seconds between attempts to fetch items
# from site_queue if the last attempt gave no item
@@ -70,22 +70,16 @@ crawl:
# Allowed values: positive number
#feed_crawl_interval: 86400
# Minimum length of the text (in characters) extracted from
# a resource; resources with shorter texts are not stored.
# Default value: 300
# Allowed values: positive number
#min_text_length: 300
# Parameters for access to the ElasticSearch service
# No default values; must be set.
elasticsearch:
# host on which ES is running
host: localhost
# API key for accessing ES
api_key: "____________________"
api_key: "**********************"
# API user id
id: "____________________"
# Index base name (full index names will have '_text_{language}' appended)
id: "**********************"
# Index base name (actual index names will have '_text' etc. appended)
index_base_name: atext
# Tensorflow access
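For orientation, a minimal sketch of how the elasticsearch settings above might be turned into a client, assuming the official `elasticsearch` Python package; the port, the plain-HTTP scheme and the helper name are illustrative assumptions, not part of this diff:
```
from elasticsearch import AsyncElasticsearch

def es_client_from_config(es_cfg: dict) -> AsyncElasticsearch:
    # Port and scheme are assumptions (TLS is not configured here);
    # api_key accepts an (id, api_key) tuple.
    return AsyncElasticsearch(
        f"http://{es_cfg['host']}:9200",
        api_key=(es_cfg['id'], es_cfg['api_key']),
    )
```
Index names would then be derived from `index_base_name` as described in the comments above.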

View File

@@ -12,11 +12,6 @@ pipenv install -d
## Configure the instance
See [installation](installation.md).
Finally also do
```
pre-commit install
```
## Run
```
python -m atextcrawler

View File

@@ -5,11 +5,9 @@ The instructions below are for this system.
## System packages
```
apt install pandoc tidy python3-systemd openjdk-17-jre-headless
apt install protobuf-compiler libprotobuf-dev build-essential libpython3-dev
apt install pandoc tidy python3-systemd protobuf-compiler libprotobuf-dev
```
Java is needed for tika.
The second line is required for python package gcld3 (see below).
The protobuf packages are required for python package gcld3 (see below).
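For reference, once gcld3 builds against those protobuf packages, language detection typically looks like this (a minimal sketch; the byte limits are illustrative):
```
import gcld3

# The identifier inspects between min_num_bytes and max_num_bytes of the text.
detector = gcld3.NNetLanguageIdentifier(min_num_bytes=0, max_num_bytes=1000)
result = detector.FindLanguage(text="Dies ist ein kurzer Beispielsatz.")
print(result.language, result.is_reliable, result.probability)
```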
## PostgreSQL database
We need access to a PostgreSQL database. Install PostgreSQL or provide connectivity to a PostgreSQL database over TCP/IP. Create a new database:
@@ -24,11 +22,6 @@ Note: TLS is not yet supported, so install this service locally.
See [elasticsearch howto](elasticsearch.md).
Create an API key (using the password for user elastic):
```
http --auth elastic:******************* -j POST http://127.0.0.1:9200/_security/api_key name=atext role_descriptors:='{"atext": {"cluster": [], "index": [{"names": ["atext_*"], "privileges": ["all"]}]}}'
```
## Tensorflow model server
We need access to a tensorflow model server (over TCP/IP).
It should serve `universal_sentence_encoder_multilingual`
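A quick way to verify that the model server is reachable might look like the following sketch; it assumes TensorFlow Serving's REST API on the default port 8501, which may differ in your setup:
```
import requests

# TensorFlow Serving reports model status under /v1/models/<model_name>.
url = 'http://localhost:8501/v1/models/universal_sentence_encoder_multilingual'
print(requests.get(url, timeout=5).json())
# Expect a model_version_status entry with state 'AVAILABLE'.
```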
@@ -47,19 +40,18 @@ cat >>.bashrc <<EOF
export PYTHONPATH=\$HOME/repo/src
EOF
pip3 install --user pipenv
mkdir repo
cat >>.profile <<EOF
PYTHONPATH=\$HOME/repo/src
PATH=\$HOME/.local/bin:$PATH
cd repo
\$HOME/.local/bin/pipenv shell
EOF
exit
su - atextcrawler
rm Pipfile
git clone https://gitea.multiname.org/a-text/atextcrawler.git $HOME/repo
virtualenv --system-site-packages `pipenv --venv` # for systemd
git clone https://gitea.multiname.org/a-text/atextcrawler.git repo
cd repo
pipenv sync
pipenv install --site-packages # for systemd
pre-commit install
```
Note: One of the dependencies, Python package `tldextract`,
@@ -71,7 +63,7 @@ $HOME/.cache/python-tldextract/
## Configure atextcrawler
As user `atextcrawler` execute
```
mkdir -p $HOME/.config
mkdir $HOME/.config
cp -r $HOME/repo/doc/source/config_template $HOME/.config/atextcrawler
```
@@ -80,7 +72,7 @@ Edit `$HOME/.config/atextcrawler/main.yaml`.
If you want to override a plugin, copy it to the plugins directory
and edit it, e.g.
```
cp $HOME/repo/doc/source/config_template/plugins/filter_site.py $HOME/.config/atextcrawler/plugins
cp /srv/atextcrawler/repo/src/atextcrawler/plugin_defaults/filter_site.py $HOME/.config/plugins
```
Optionally edit `$HOME/.config/atextcrawler/initial_data/seed_urls.list`.
@@ -95,12 +87,7 @@ To see if it works, run `atextcrawler` from the command line:
```
python -m atextcrawler
```
You can follow the log with:
```
journalctl -ef SYSLOG_IDENTIFIER=atextcrawler
```
You can stop with `Ctrl-C`; stopping may take a few seconds or even minutes.
You can stop it with `Ctrl-C`; stopping may take a few seconds or even minutes.
## Install systemd service
To make the service persistent, create a systemd unit file
@@ -109,8 +96,8 @@ To make the service persistent, create a systemd unit file
[Unit]
Description=atextcrawler web crawler
Documentation=https://gitea.multiname.org/a-text/atextcrawler
Requires=network.target elasticsearch.service tensorflow.service
After=network-online.target elasticsearch.service tensorflow.service
Requires=network.target
After=network-online.target
[Service]
Type=simple
@@ -121,7 +108,7 @@ Environment=PYTHONPATH=/srv/atextcrawler/repo/src
ExecStart=/srv/atextcrawler/.local/bin/pipenv run python -m atextcrawler
TimeoutStartSec=30
ExecStop=/bin/kill -INT $MAINPID
TimeoutStopSec=300
TimeoutStopSec=180
Restart=on-failure
[Install]
@@ -133,7 +120,3 @@ systemctl daemon-reload
systemctl enable atextcrawler
systemctl start atextcrawler
```
Then follow the log with:
```
journalctl -efu atextcrawler
```

View File

@@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg width="80" height="80" viewBox="0 0 1000 800" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<line x1="500" y1="50" x2="360" y2="500" stroke="black" stroke-width="50" />
<line x1="500" y1="50" x2="640" y2="500" stroke="black" stroke-width="50" />
<line x1="400" y1="330" x2="600" y2="330" stroke="black" stroke-width="50" />
<circle cx="500" cy="300" r="250" stroke="grey" fill="none" stroke-width="50" />
<line x1="50" y1="750" x2="330" y2="470" stroke="grey" stroke-width="80" />
</svg>


View File

@@ -278,7 +278,6 @@ schema_crawl = Schema(
Required('resource_delay', default=5): positive_number,
Required('full_crawl_interval', default=864000): positive_number,
Required('feed_crawl_interval', default=86400): positive_number,
Required('min_text_length', default=300): positive_number,
}
)
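For context: with a voluptuous-style schema like the one above, defaults are filled in when the config dict is validated, so removing the `min_text_length` entry also removes its default. A standalone sketch (the `positive_number` validator shown here is an assumption modelled on its use above):
```
from voluptuous import All, Any, Range, Required, Schema

# Illustrative stand-in for the project's positive_number validator.
positive_number = All(Any(int, float), Range(min=0, min_included=False))

schema_crawl = Schema(
    {
        Required('resource_delay', default=5): positive_number,
        Required('full_crawl_interval', default=864000): positive_number,
        Required('feed_crawl_interval', default=86400): positive_number,
    }
)

print(schema_crawl({}))  # missing keys are filled with their defaults
```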

View File

@@ -7,9 +7,9 @@ from datetime import datetime
import aiohttp
from ..models import Crawl
from ..resource import ResourceFetcher, get_site_path, process_site_path
from ..site import (
from .models import Crawl
from .resource import ResourceFetcher, get_site_path, process_site_path
from .site import (
RobotsInfo,
checkin_site,
checkout_site,
@@ -17,7 +17,7 @@ from ..site import (
process_site,
update_site,
)
from ..tensorflow import TensorFlow
from .tensorflow import TensorFlow
logger = logging.getLogger(__name__)
@@ -31,7 +31,6 @@ class CrawlWorker:
self.app = app
self.worker_number = worker_number
self.pool = pool
self.tf_config = self.app.config['tensorflow']
self.site_delay = self.app.config['crawl']['site_delay']
self.resource_delay = self.app.config['crawl']['resource_delay']
self.site = None
@@ -53,7 +52,7 @@ class CrawlWorker:
self.conn = await self.pool.acquire()
self.session = aiohttp.ClientSession()
self.fetcher = ResourceFetcher(self.session)
self.tf = TensorFlow(self.tf_config, self.session)
self.tf = TensorFlow(self.app, self.session)
async def shutdown(self):
"""
@@ -72,8 +71,7 @@
"""
await self.app.sleep(2)
while self.app.running and self.running:
self.site, is_full, more = await checkout_site(
self.app.config, self.conn)
self.site, is_full, more = await checkout_site(self.app, self.conn)
if not self.site:
msg = f'Worker {self.worker_number}: sites exhausted'
logger.debug(msg)

View File

@@ -1,138 +0,0 @@
"""
Run a crawl for a specific base_url. Use only on a dev instance!
"""
import asyncio
import logging
import sys
import aiohttp
from ..config import Config
from ..db import PGPool
from ..models import Site, SitePath
from ..resource import ResourceFetcher, get_site_path, process_site_path
from ..search import shutdown_engine, startup_engine
from ..tensorflow import TensorFlow
from ..utils.similarity import get_simhash_index
from . import get_or_create_crawl
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
#logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
async def run():
"""
Run a full/feed crawl of a website with a given base_url, or just a single path.
The 3rd argument (path) is optional.
"""
config = Config().get()
pgpool = await PGPool(config['postgresql'])
pool=pgpool.pool
try:
crawl_type = sys.argv[1]
if crawl_type not in ('full', 'feed'):
logger.error('First argument must be "full" or "feed".')
base_url = sys.argv[2]
except:
msg = (
'Please give two arguments:'
' 1) crawl type ("full" or "feed"),'
' 2) the base URL of the site to crawl'
)
logger.error(msg)
sys.exit(2)
if len(sys.argv) > 3:
path = sys.argv[3]
else:
path = None
# find site
async with pool.acquire() as conn:
sql = 'select id from site where base_url=$1'
site_id = await conn.fetchval(sql, base_url)
if site_id:
site = await Site().load(conn, site_id)
logger.warning(f'site_id: {site.id_}')
logger.warning(f'crawl_enabled: {site.crawl_enabled}')
site.simhash_index = await get_simhash_index(conn, site_id)
else:
logger.warning('Site not found')
if site_id:
if site.crawl_enabled:
await run_crawl(config, pool, site, crawl_type, path)
else:
logger.warning('Site has crawl_enabled=false')
# shutdown
await pgpool.shutdown()
class AppMock:
def __init__(self, config, search_engine):
self.config = config
self.search_engine = search_engine
class DummyModule:
def rp_filter(self, site, durl):
return durl.pwa()
self.plugins = {'filter_resource_path': DummyModule()}
async def run_crawl(config, pool, site, crawl_type, path):
session = aiohttp.ClientSession()
fetcher = ResourceFetcher(session)
tf = TensorFlow(config['tensorflow'], session)
search_engine = await startup_engine(config)
app = AppMock(config, search_engine)
async with pool.acquire() as conn:
is_full = crawl_type == 'full'
crawl = await get_or_create_crawl(conn, site.id_, is_full=is_full)
logger.warning(crawl)
if path:
sql = "SELECT * FROM site_path WHERE site_id=$1 AND path=$2"
row = await conn.fetchrow(sql, site.id_, path)
if row:
site_path = await SitePath().load_from_row(row)
await process_site_path(
app,
999,
conn,
fetcher,
tf,
site,
site_path,
)
else:
logger.error('Path does not exist in table site_path')
else:
while True:
site_path = await get_site_path(
conn,
site,
crawl.t_begin,
only_new=not crawl.is_full,
)
if not site_path:
logger.warning('Paths exhausted.')
break
logger.warning(site_path)
is_new_resource = await process_site_path(
app,
999,
conn,
fetcher,
tf,
site,
site_path,
)
logger.warning(f'Is new: {is_new_resource}')
await shutdown_engine(search_engine)
if __name__ == '__main__':
asyncio.run(run())

View File

@@ -231,8 +231,6 @@ COMMENT ON COLUMN site_path.path IS 'Path'
----
COMMENT ON COLUMN site_path.last_visit IS 'Time of last retrieval of the resource; null before first retrieval'
----
COMMENT ON COLUMN site_path.filtered IS 'Whether the path was filtered'
----
COMMENT ON COLUMN site_path.ok_count IS 'Increased by 1 for every successful retrieval of the resource and decreased by 1 for every failed one'
----
COMMENT ON COLUMN site_path.canonical IS 'Whether the path is the canonical one for the resource; null before first retrieval'

View File

@@ -76,8 +76,8 @@ async def run():
if isinstance(resource, TextResource):
logger.warning(repr(resource))
logger.warning(f'Language: {resource.lang}')
logger.warning(pformat(resource.search_fields, width=180))
logger.warning(pformat(resource.init_fields, width=180))
logger.warning(pformat(resource.search_fields))
logger.warning(pformat(resource.init_fields))
# annotations = resource.search_fields.get('annotations')
# text = resource.search_fields['text']

View File

@@ -99,19 +99,14 @@ async def get_site_path(
Return the next path of a given site that needs to be processed.
If none needs to be processed, return None.
In particular, for sites having crawl_enabled=false return None.
Only return paths that have last been visited before *before*
or not been processed at all. Paths with an ok_count of -3 or lower
or not been processed at all. Paths with a ok_count of -3 or lower
are dropped.
If *only_new*, limit to paths that have not been processed at all,
irrespective of the value of *before*.
"""
sql = "SELECT crawl_enabled FROM site WHERE id=$1"
crawl_enabled = await conn.fetchval(sql, site.id_)
if not crawl_enabled:
return None
if only_new:
sql = (
"SELECT * FROM site_path"
@@ -143,7 +138,7 @@ async def process_site_path(
"""
Fetch a path, deduplicate and if canonical, update and index the resource.
Return whether a new resource was handled that should contribute to
Return whether a new resource was handled that should contribute be
statistics.
"""
msg = (
@@ -200,37 +195,34 @@ async def process_site_path(
site.base_url, res_sitemap.urls
)
await add_site_paths(conn, site.id_, paths)
resource_id = None
is_new_resource = False
else: # handle TextResource
resource_id, is_new_resource = await _handle_text_resource(
app, conn, tf, site, site_path, resource, url
)
site_path.canonical = resource.init_fields.get('canonical')
if shortlink_url := resource.init_fields.get('shortlink'):
await _save_shortlink(
conn,
site,
url,
resource_id,
shortlink_url,
site_path.last_visit,
)
site_path.resource_id = resource_id
return False
# handle TextResource
relevant, is_new_resource = await _handle_text_resource(
app, conn, tf, site, site_path, resource, url
)
if not relevant:
return False
site_path.resource_id = resource.id_
site_path.canonical = resource.init_fields.get('canonical')
site_path.ok_count += 1
await site_path.save(conn)
if shortlink_url := resource.init_fields.get('shortlink'):
await _save_shortlink(
conn, site, url, resource, shortlink_url, site_path.last_visit
)
return is_new_resource
async def _handle_text_resource(
app, conn, tf, site, site_path, resource, url
) -> tuple[Optional[int], bool]:
) -> tuple[bool, bool]:
"""
Ingest a text resource returning the id of the possibly merged resource.
Ingest a text resource.
Return the id of the merged resource (or None if the incoming resource
has a too short text and is not worth storing a resource) and
whether the resource is new (False if the returned resource_id is None).
Return whether the resource is relevant and whether it is new.
"""
# save the resource's internal links
paths = []
@@ -244,26 +236,24 @@ async def _handle_text_resource(
# find resources similar to the current text
text = resource.search_fields['text']
# discard resources with too short texts
if len(text) < app.config['crawl']['min_text_length']:
await site_path.unlink_resource(
conn,
app.search_engine,
app.config['elasticsearch']['index_base_name'],
)
if len(text) < 300: # discard resources with too short texts
site_path.resource_id = None
await site_path.save(conn)
return None, False
return False, False
simhash = simhash_from_bigint(resource.simhash)
index = site.simhash_index
similar_ids = search_simhash(index, simhash)
# determine the destination resource and resources to be merged into it
old_id = site_path.resource_id
if old_id and old_id in similar_ids:
merge_ids = similar_ids
dest_resource = await TextResource().load(conn, old_id)
if (
old_id
and old_id in similar_ids
and ( # similar to old text
dest_resource := await TextResource().load(conn, old_id)
)
):
merge_ids = list(filter(lambda elem: elem != old_id, similar_ids))
else: # no old text, or old text not similar any more
if old_id:
await site_path.unlink_resource(
@@ -300,34 +290,36 @@ async def _handle_text_resource(
create_simhash(index, resource.id_, simhash)
# add resource to search index
await index_resource(
app.search_engine,
tf,
site_path,
resource,
site.base_url,
url,
)
# replace references to any merge resource with links to the dest resource
sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=ANY($2)"
await conn.execute(sql, resource.id_, merge_ids)
# remove orphaned resources after merging
sql = "DELETE FROM resource WHERE id=ANY($1) RETURNING (id, lang)"
rows = await conn.fetch(sql, set(merge_ids) - set([resource.id_]))
for row in rows:
await delete_resource(
if resource.content_type in ('html', 'plain'):
await index_resource(
app.search_engine,
row['row'][1],
row['row'][0],
tf,
site_path,
resource,
site.base_url,
url,
)
return resource.id_, is_new_resource
# merge resources: merge_ids -> resource
for merge_id in merge_ids:
# replace links to the merge resource with links to the dest resource
sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=$2"
await conn.execute(sql, resource.id_ or None, merge_id)
# remove orphaned merge resource
sql = "DELETE FROM resource WHERE id=$1 RETURNING (true, lang)"
found = await conn.fetchval(sql, merge_id)
if found:
await delete_resource(
app.search_engine,
found[1],
merge_id,
)
return True, is_new_resource
async def _save_shortlink(
conn, site, url, resource_id, shortlink_url, last_visit
conn, site, url, resource, shortlink_url, last_visit
):
"""
Save a shortlink.
@@ -345,11 +337,11 @@ async def _save_shortlink(
last_visit=last_visit,
ok_count=1,
canonical=False,
resource_id=resource_id,
resource_id=resource.id_,
)
else:
shortlink.last_visit = last_visit
shortlink.ok_count += 1
shortlink.canonical = False
shortlink.resource_id = resource_id
shortlink.resource_id = resource.id_
await shortlink.save(conn)
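For readers unfamiliar with the merging logic above: resources are fingerprinted with simhash, and texts whose fingerprints differ in only a few bits are treated as near-duplicates and merged onto one destination resource. A self-contained sketch of that idea (independent of the project's simhash utilities, which are not shown in this diff):
```
import hashlib

def simhash64(text: str, ngram: int = 3) -> int:
    """Compute a 64-bit simhash over character n-grams of *text*."""
    counts = [0] * 64
    for i in range(max(len(text) - ngram + 1, 1)):
        chunk = text[i:i + ngram].encode()
        h = int.from_bytes(hashlib.blake2b(chunk, digest_size=8).digest(), 'big')
        for bit in range(64):
            counts[bit] += 1 if (h >> bit) & 1 else -1
    return sum(1 << bit for bit in range(64) if counts[bit] > 0)

def is_near_duplicate(a: str, b: str, max_hamming: int = 3) -> bool:
    """Texts are near-duplicates if their fingerprints differ in few bits."""
    return bin(simhash64(a) ^ simhash64(b)).count('1') <= max_hamming
```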

View File

@@ -53,7 +53,6 @@ properties = {
'time_horizon': {'type': 'keyword'},
'orig_source': {'type': 'text'},
'topics': {'type': 'text'},
'length': {'type': 'integer'},
'annotations': {'type': 'text', 'index': False},
'sections': {
'type': 'nested',
@@ -180,7 +179,6 @@ async def index_resource(
'time_horizon': resource.search_fields.get('time_horizon'),
'orig_source': resource.search_fields.get('orig_source'),
'topics': resource.search_fields.get('topics'),
'length': len(text),
'annotations': pack_annotations(annotations),
'sections': sections,
}
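For context, `properties` above is the ElasticSearch field mapping and the second hunk builds the document body per resource, so dropping `length` removes it from both. A sketch of how such a mapping might be applied when the index is created (elasticsearch-py 8.x style; the index name is an illustrative assumption):
```
from elasticsearch import AsyncElasticsearch

async def create_text_index(es: AsyncElasticsearch, properties: dict) -> None:
    # 'atext_text' stands in for a name derived from index_base_name.
    await es.indices.create(index='atext_text', mappings={'properties': properties})
```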

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
async def checkout_site(
config, conn: Connection
app, conn: Connection
) -> tuple[Optional[int], bool, bool]:
"""
Get the id of a site to be crawled and mark it with crawl_active=true.
@@ -57,8 +57,8 @@ async def checkout_site(
return site, is_full, True
else:
# site not available; schedule next crawl
int_full = config['crawl']['full_crawl_interval']
int_feed = config['crawl']['feed_crawl_interval']
int_full = app.config['crawl']['full_crawl_interval']
int_feed = app.config['crawl']['feed_crawl_interval']
now = datetime.utcnow()
t_full = now + timedelta(seconds=int_full)
t_feed = now + timedelta(seconds=int_full + int_feed)

View File

@@ -48,7 +48,7 @@ async def parse_startpage(
# feeds
feeds = meta_links['feeds']
if 'wordpress' in (meta.get('generator') or '').lower():
if 'wordpress' in meta.get('generator', '').lower():
url = durl.site() + 'feed/'
feeds[url] = 'application/rss+xml'
# TODO later: maybe also probe other possible feed paths 'rss', 'rss/'
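The two `generator` lines above differ only in how an explicitly stored `None` is handled: `dict.get`'s default applies only when the key is missing, so the `(... or '')` form is the safer guard. A small illustration:
```
meta = {'generator': None}  # key present, but its value is None

print((meta.get('generator') or '').lower())  # '' -- safe for missing key or None

try:
    meta.get('generator', '').lower()  # the default is ignored: the key exists
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'lower'
```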

View File

@@ -17,12 +17,12 @@ class TensorFlow:
def __init__(
self,
tf_config,
app,
session: aiohttp.ClientSession,
timeout_sock_connect: Union[int, float] = 0.5,
timeout_sock_read: Union[int, float] = 10,
):
self.config = tf_config
self.config = app.config['tensorflow']
self.session = session
self.timeout = aiohttp.ClientTimeout(
sock_connect=timeout_sock_connect, sock_read=timeout_sock_read
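The class above wraps calls to the TensorFlow model server, so the config and these socket timeouts end up in an HTTP request against it. A hedged sketch of what such a request might look like, assuming TensorFlow Serving's REST predict endpoint; the config key and URL shape are assumptions, since the class's request methods are not part of this diff:
```
import aiohttp

async def embed(session: aiohttp.ClientSession, config: dict,
                timeout: aiohttp.ClientTimeout, texts: list[str]):
    # TensorFlow Serving's REST API: POST /v1/models/<model_name>:predict
    url = config['model_server_endpoint'] + ':predict'  # key name is an assumption
    async with session.post(url, json={'instances': texts}, timeout=timeout) as resp:
        data = await resp.json()
    return data['predictions']  # one embedding vector per input text
```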