"""Async web crawler used to build the sFetch index."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from typing import Iterable
from urllib.parse import urljoin, urldefrag, urlsplit, urlunsplit
from urllib.robotparser import RobotFileParser
import httpx
from bs4 import BeautifulSoup
from config import (
CRAWL_DELAY_SECONDS,
DEFAULT_CRAWL_CONCURRENCY,
MAX_CRAWL_DEPTH,
MAX_PAGES_PER_DOMAIN,
USER_AGENT,
)
from content_filter import is_adult_text, is_adult_url
from indexer import index_page
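# High-level flow: start() opens a shared HTTP client and fans out one crawl
# task per seed URL; _crawl_url() filters, fetches, indexes, and recurses.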
class sFetchBot:
"""A polite async crawler that stays within configurable crawl limits and filters adult content."""
def __init__(
self,
max_depth: int = MAX_CRAWL_DEPTH,
same_domain_only: bool = True,
crawl_delay: float = CRAWL_DELAY_SECONDS,
max_pages_per_domain: int = MAX_PAGES_PER_DOMAIN,
max_concurrency: int = DEFAULT_CRAWL_CONCURRENCY,
timeout_seconds: float = 15.0,
) -> None:
self.max_depth = max_depth
self.same_domain_only = same_domain_only
self.crawl_delay = crawl_delay
self.max_pages_per_domain = max_pages_per_domain
self.max_concurrency = max(1, max_concurrency)
self.timeout_seconds = timeout_seconds
self.visited: set[str] = set()
self.domain_counts: defaultdict[str, int] = defaultdict(int)
self.robots_cache: dict[str, RobotFileParser] = {}
self.indexed_count = 0
self._state_lock = asyncio.Lock()
self._fetch_semaphore = asyncio.Semaphore(self.max_concurrency)
self._client: httpx.AsyncClient | None = None
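    # Entry point: builds a single shared httpx.AsyncClient for the whole crawl,
    # skips invalid or adult seed URLs, and crawls the remaining seeds concurrently.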
async def start(self, seed_urls: list[str]) -> None:
if not seed_urls:
return
timeout = httpx.Timeout(self.timeout_seconds)
headers = {"User-Agent": USER_AGENT}
async with httpx.AsyncClient(
timeout=timeout,
follow_redirects=True,
headers=headers,
) as client:
self._client = client
tasks = []
for seed_url in seed_urls:
normalized_seed = self._normalize_url(seed_url)
if normalized_seed is None:
print(f"sFetch: skipped {seed_url} (invalid URL)")
continue
if is_adult_url(normalized_seed):
print(f"sFetch: skipped {seed_url} (adult content filtered)")
continue
root_domain = urlsplit(normalized_seed).netloc.lower()
tasks.append(self._crawl_url(normalized_seed, root_domain, depth=0))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._client = None
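    # Per-URL pipeline: depth cap -> normalize -> adult-URL filter -> same-domain
    # check -> visited dedupe -> per-domain cap -> robots.txt -> rate-limited fetch
    # -> adult-text filter -> index -> recurse into extracted links at depth + 1.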
async def _crawl_url(self, url: str, root_domain: str, depth: int) -> None:
try:
if depth > self.max_depth:
return
normalized_url = self._normalize_url(url)
if normalized_url is None:
return
if is_adult_url(normalized_url):
print(f"sFetch: skipped {normalized_url} (adult)")
return
parsed = urlsplit(normalized_url)
current_domain = parsed.netloc.lower()
if self.same_domain_only and current_domain != root_domain:
return
if await self._already_seen(normalized_url):
return
if await self._domain_limit_reached(current_domain):
return
if not await self._is_allowed_by_robots(normalized_url):
return
client = self._require_client()
async with self._fetch_semaphore:
await asyncio.sleep(self.crawl_delay)
response = await client.get(normalized_url)
response.raise_for_status()
content_type = response.headers.get("content-type", "").lower()
if "text/html" not in content_type:
return
title, body_text, links, images, videos = self._extract_page_content(normalized_url, response.text)
if is_adult_text(body_text):
print(f"sFetch: skipped {normalized_url} (adult text)")
return
await index_page(normalized_url, title, body_text, images, videos)
await self._increment_domain_count(current_domain)
self.indexed_count += 1
print(f"sFetch: indexed {normalized_url}")
for link in links:
await self._crawl_url(link, root_domain, depth + 1)
except httpx.HTTPError as exc:
print(f"sFetch: HTTP error {url} ({exc})")
except Exception as exc:
print(f"sFetch: error {url} ({exc})")
def _require_client(self) -> httpx.AsyncClient:
if self._client is None:
raise RuntimeError("Crawler client is not initialized.")
return self._client
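    # The visited-set check and insert happen atomically under the state lock,
    # so two concurrent tasks cannot both claim the same URL.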
async def _already_seen(self, url: str) -> bool:
async with self._state_lock:
if url in self.visited:
return True
self.visited.add(url)
return False
async def _domain_limit_reached(self, domain: str) -> bool:
async with self._state_lock:
return self.domain_counts[domain] >= self.max_pages_per_domain
async def _increment_domain_count(self, domain: str) -> None:
async with self._state_lock:
self.domain_counts[domain] += 1
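    # robots.txt handling: one RobotFileParser is cached per scheme://host and
    # consulted before every page fetch; an unreachable or non-200 robots.txt is
    # treated as an empty file, which allows crawling.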
async def _is_allowed_by_robots(self, url: str) -> bool:
parsed = urlsplit(url)
robots_key = f"{parsed.scheme}://{parsed.netloc.lower()}"
parser = self.robots_cache.get(robots_key)
if parser is None:
parser = await self._fetch_robots_parser(robots_key)
self.robots_cache[robots_key] = parser
return parser.can_fetch(USER_AGENT, url)
async def _fetch_robots_parser(self, domain_base: str) -> RobotFileParser:
parser = RobotFileParser()
robots_url = f"{domain_base}/robots.txt"
parser.set_url(robots_url)
try:
client = self._require_client()
response = await client.get(robots_url)
if response.status_code == 200:
parser.parse(response.text.splitlines())
else:
parser.parse([])
except Exception:
parser.parse([])
return parser
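    # Page extraction: media are pulled out before <script>/<style>/<noscript> are
    # removed, then the remaining visible text becomes the indexed body and the
    # page's <a> links feed further crawling.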
def _extract_page_content(
self,
url: str,
html_text: str,
) -> tuple[str, str, list[str], list[dict[str, str]], list[dict[str, str]]]:
soup = BeautifulSoup(html_text, "html.parser")
images = self._extract_images(url, soup)
videos = self._extract_videos(url, soup)
for element in soup(["script", "style", "noscript"]):
element.decompose()
title = ""
if soup.title and soup.title.string:
title = soup.title.string.strip()
if not title:
title = url
body_text = soup.get_text(separator=" ", strip=True)
links = self._extract_links(url, soup)
return title, body_text, links, images, videos
def _extract_images(self, base_url: str, soup: BeautifulSoup) -> list[dict[str, str]]:
images = []
for img in soup.find_all("img", src=True):
src = str(img["src"]).strip()
if not src or src.startswith(("data:", "javascript:")):
continue
absolute_url = urljoin(base_url, src)
normalized_url = self._normalize_url(absolute_url)
if normalized_url is not None:
alt = str(img.get("alt", "")).strip()
images.append({"url": normalized_url, "alt_text": alt})
return self._dedupe_media(images)
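    # Videos are gathered from <video src>, nested <source> tags, embedded
    # <iframe>s, and plain <a> links, keeping only URLs that look like video.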
def _extract_videos(self, base_url: str, soup: BeautifulSoup) -> list[dict[str, str]]:
videos: list[dict[str, str]] = []
for video in soup.find_all("video"):
if video.get("src"):
normalized = self._normalize_url(urljoin(base_url, str(video["src"]).strip()))
if normalized:
title = str(video.get("title") or video.get("aria-label") or "").strip()
videos.append({"url": normalized, "title": title})
for source in video.find_all("source", src=True):
normalized = self._normalize_url(urljoin(base_url, str(source["src"]).strip()))
if normalized:
title = str(video.get("title") or video.get("aria-label") or "").strip()
videos.append({"url": normalized, "title": title})
for iframe in soup.find_all("iframe", src=True):
raw_src = str(iframe["src"]).strip()
normalized = self._normalize_url(urljoin(base_url, raw_src))
if normalized and self._is_video_url(normalized):
title = str(iframe.get("title") or iframe.get("aria-label") or "").strip()
videos.append({"url": normalized, "title": title})
for tag in soup.find_all("a", href=True):
raw_href = str(tag["href"]).strip()
normalized = self._normalize_url(urljoin(base_url, raw_href))
if normalized and self._is_video_url(normalized):
title = " ".join(tag.stripped_strings).strip()
videos.append({"url": normalized, "title": title})
return self._dedupe_media(videos)
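    # Heuristic match: a URL counts as video if it contains a known host pattern
    # (YouTube, Vimeo) or a common video file/stream extension.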
def _is_video_url(self, url: str) -> bool:
lowered = url.lower()
return any(
marker in lowered
for marker in (
"youtube.com/watch",
"youtube.com/embed/",
"youtu.be/",
"vimeo.com/",
".mp4",
".webm",
".mov",
".m3u8",
)
)
def _dedupe_media(self, items: list[dict[str, str]]) -> list[dict[str, str]]:
seen: set[str] = set()
unique: list[dict[str, str]] = []
for item in items:
media_url = item.get("url")
if not media_url or media_url in seen:
continue
seen.add(media_url)
unique.append(item)
return unique
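    # Link extraction skips javascript:, mailto:, and tel: hrefs, resolves the
    # rest against the page URL, and de-duplicates while preserving order.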
def _extract_links(self, base_url: str, soup: BeautifulSoup) -> list[str]:
collected_links: list[str] = []
for tag in soup.find_all("a", href=True):
href = str(tag["href"]).strip()
if not href or href.startswith(("javascript:", "mailto:", "tel:")):
continue
absolute_url = urljoin(base_url, href)
normalized_url = self._normalize_url(absolute_url)
if normalized_url is not None:
collected_links.append(normalized_url)
return self._dedupe_links(collected_links)
def _dedupe_links(self, links: Iterable[str]) -> list[str]:
seen: set[str] = set()
unique_links: list[str] = []
for link in links:
if link in seen:
continue
seen.add(link)
unique_links.append(link)
return unique_links
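    # Normalization drops URL fragments, lowercases the scheme and host, and
    # rejects anything that is not an absolute http(s) URL.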
def _normalize_url(self, url: str) -> str | None:
if not url:
return None
clean_url, _ = urldefrag(url.strip())
parsed = urlsplit(clean_url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
return None
normalized = parsed._replace(
scheme=parsed.scheme.lower(),
netloc=parsed.netloc.lower(),
)
return urlunsplit(normalized)
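# Minimal usage sketch (not part of the original module). The seed URL and limits
# below are placeholders chosen for illustration, not values from the sFetch config.
if __name__ == "__main__":
    async def _demo() -> None:
        bot = sFetchBot(max_depth=1, max_pages_per_domain=5)
        # Replace the placeholder seed with real URLs before running.
        await bot.start(["https://example.com/"])
        print(f"sFetch: demo finished, indexed {bot.indexed_count} pages")

    asyncio.run(_demo())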