310 lines
11 KiB
Python
310 lines
11 KiB
Python
"""Async web crawler used to build the sFetch index."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections import defaultdict
|
|
from typing import Iterable
|
|
from urllib.parse import urljoin, urldefrag, urlsplit, urlunsplit
|
|
from urllib.robotparser import RobotFileParser
|
|
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
from config import (
|
|
CRAWL_DELAY_SECONDS,
|
|
DEFAULT_CRAWL_CONCURRENCY,
|
|
MAX_CRAWL_DEPTH,
|
|
MAX_PAGES_PER_DOMAIN,
|
|
USER_AGENT,
|
|
)
|
|
from content_filter import is_adult_text, is_adult_url
|
|
from indexer import index_page
|
|
|
|
|
|
class sFetchBot:
|
|
"""A polite async crawler that stays within configurable crawl limits and filters adult content."""
|
|
|
|
def __init__(
    self,
    max_depth: int = MAX_CRAWL_DEPTH,
    same_domain_only: bool = True,
    crawl_delay: float = CRAWL_DELAY_SECONDS,
    max_pages_per_domain: int = MAX_PAGES_PER_DOMAIN,
    max_concurrency: int = DEFAULT_CRAWL_CONCURRENCY,
    timeout_seconds: float = 15.0,
) -> None:
    """Record the crawl limits and set up empty bookkeeping state.

    Args:
        max_depth: Deepest link level that is still crawled; seeds are depth 0.
        same_domain_only: When true, URLs whose host differs from the seed's
            host are skipped.
        crawl_delay: Seconds slept before every page request.
        max_pages_per_domain: Number of indexed pages after which a domain is
            no longer crawled.
        max_concurrency: Number of page requests allowed in flight at once;
            values below 1 are raised to 1.
        timeout_seconds: Timeout handed to ``httpx.Timeout`` when the client
            is opened.
    """
    fetch_slots = max(1, max_concurrency)

    # Mutable crawl state, shared by every crawl task of this bot.
    self.visited: set[str] = set()
    self.domain_counts: defaultdict[str, int] = defaultdict(int)
    self.robots_cache: dict[str, RobotFileParser] = {}
    self.indexed_count = 0

    # Crawl policy supplied by the caller.
    self.max_depth = max_depth
    self.same_domain_only = same_domain_only
    self.max_pages_per_domain = max_pages_per_domain
    self.crawl_delay = crawl_delay
    self.timeout_seconds = timeout_seconds
    self.max_concurrency = fetch_slots

    # Coordination primitives and the HTTP client (opened later by start()).
    self._fetch_semaphore = asyncio.Semaphore(fetch_slots)
    self._state_lock = asyncio.Lock()
    self._client: httpx.AsyncClient | None = None
|
|
|
|
async def start(self, seed_urls: list[str]) -> None:
    """Crawl every usable seed URL concurrently and wait until all crawls end.

    Seeds that cannot be normalized to an http(s) URL, or that the adult-URL
    filter rejects, are reported and skipped; the remaining seeds are still
    crawled. The shared HTTP client exists only for the duration of this call.

    Args:
        seed_urls: Starting URLs. An empty list makes the call a no-op.
    """
    if not seed_urls:
        return

    async with httpx.AsyncClient(
        timeout=httpx.Timeout(self.timeout_seconds),
        follow_redirects=True,
        headers={"User-Agent": USER_AGENT},
    ) as client:
        self._client = client
        try:
            crawls = []
            for seed_url in seed_urls:
                try:
                    normalized_seed = self._normalize_url(seed_url)
                except ValueError:
                    # urllib raises on some malformed URLs (e.g. an unclosed
                    # IPv6 bracket); one bad seed must not cancel the others.
                    normalized_seed = None
                if normalized_seed is None:
                    print(f"sFetch: skipped {seed_url} (invalid URL)")
                    continue
                if is_adult_url(normalized_seed):
                    print(f"sFetch: skipped {seed_url} (adult content filtered)")
                    continue
                root_domain = urlsplit(normalized_seed).netloc.lower()
                crawls.append(self._crawl_url(normalized_seed, root_domain, depth=0))

            if crawls:
                # Each crawl reports its own errors; never let one abort the rest.
                await asyncio.gather(*crawls, return_exceptions=True)
        finally:
            # Always drop the reference, even on error or cancellation, so a
            # closed client can never be handed out by _require_client().
            self._client = None
|
|
|
|
async def _crawl_url(self, url: str, root_domain: str, depth: int) -> None:
|
|
try:
|
|
if depth > self.max_depth:
|
|
return
|
|
|
|
normalized_url = self._normalize_url(url)
|
|
if normalized_url is None:
|
|
return
|
|
|
|
if is_adult_url(normalized_url):
|
|
print(f"sFetch: skipped {normalized_url} (adult)")
|
|
return
|
|
|
|
parsed = urlsplit(normalized_url)
|
|
current_domain = parsed.netloc.lower()
|
|
if self.same_domain_only and current_domain != root_domain:
|
|
return
|
|
|
|
if await self._already_seen(normalized_url):
|
|
return
|
|
|
|
if await self._domain_limit_reached(current_domain):
|
|
return
|
|
|
|
if not await self._is_allowed_by_robots(normalized_url):
|
|
return
|
|
|
|
client = self._require_client()
|
|
async with self._fetch_semaphore:
|
|
await asyncio.sleep(self.crawl_delay)
|
|
response = await client.get(normalized_url)
|
|
response.raise_for_status()
|
|
|
|
content_type = response.headers.get("content-type", "").lower()
|
|
if "text/html" not in content_type:
|
|
return
|
|
|
|
title, body_text, links, images, videos = self._extract_page_content(normalized_url, response.text)
|
|
|
|
if is_adult_text(body_text):
|
|
print(f"sFetch: skipped {normalized_url} (adult text)")
|
|
return
|
|
|
|
await index_page(normalized_url, title, body_text, images, videos)
|
|
await self._increment_domain_count(current_domain)
|
|
self.indexed_count += 1
|
|
print(f"sFetch: indexed {normalized_url}")
|
|
|
|
for link in links:
|
|
await self._crawl_url(link, root_domain, depth + 1)
|
|
except httpx.HTTPError as exc:
|
|
print(f"sFetch: HTTP error {url} ({exc})")
|
|
except Exception as exc:
|
|
print(f"sFetch: error {url} ({exc})")
|
|
|
|
def _require_client(self) -> httpx.AsyncClient:
|
|
if self._client is None:
|
|
raise RuntimeError("Crawler client is not initialized.")
|
|
return self._client
|
|
|
|
async def _already_seen(self, url: str) -> bool:
|
|
async with self._state_lock:
|
|
if url in self.visited:
|
|
return True
|
|
self.visited.add(url)
|
|
return False
|
|
|
|
async def _domain_limit_reached(self, domain: str) -> bool:
|
|
async with self._state_lock:
|
|
return self.domain_counts[domain] >= self.max_pages_per_domain
|
|
|
|
async def _increment_domain_count(self, domain: str) -> None:
|
|
async with self._state_lock:
|
|
self.domain_counts[domain] += 1
|
|
|
|
async def _is_allowed_by_robots(self, url: str) -> bool:
    """Check *url* against the robots.txt rules of its origin.

    Rules are fetched once per ``scheme://host`` and kept in ``robots_cache``
    for later calls.
    """
    parts = urlsplit(url)
    origin = f"{parts.scheme}://{parts.netloc.lower()}"

    rules = self.robots_cache.get(origin)
    if rules is None:
        # First URL seen for this origin: download its rules and remember them.
        rules = self.robots_cache[origin] = await self._fetch_robots_parser(origin)

    return rules.can_fetch(USER_AGENT, url)
|
|
|
|
async def _fetch_robots_parser(self, domain_base: str) -> RobotFileParser:
|
|
parser = RobotFileParser()
|
|
robots_url = f"{domain_base}/robots.txt"
|
|
parser.set_url(robots_url)
|
|
|
|
try:
|
|
client = self._require_client()
|
|
response = await client.get(robots_url)
|
|
if response.status_code == 200:
|
|
parser.parse(response.text.splitlines())
|
|
else:
|
|
parser.parse([])
|
|
except Exception:
|
|
parser.parse([])
|
|
return parser
|
|
|
|
def _extract_page_content(
    self,
    url: str,
    html_text: str,
) -> tuple[str, str, list[str], list[dict[str, str]], list[dict[str, str]]]:
    """Parse one HTML document into the pieces the crawler indexes and follows.

    Args:
        url: Address of the page; used to resolve relative references and as
            the title when the document has none.
        html_text: Raw HTML of the page.

    Returns:
        ``(title, body_text, links, images, videos)``.
    """
    soup = BeautifulSoup(html_text, "html.parser")

    # Media is collected from the full tree, before anything is pruned.
    images = self._extract_images(url, soup)
    videos = self._extract_videos(url, soup)

    # Script, style and noscript content is not page text; remove it so it
    # does not leak into the body text or the link list.
    for node in soup.find_all(["script", "style", "noscript"]):
        node.decompose()

    title_text = soup.title.string if soup.title else None
    title = (title_text.strip() if title_text else "") or url

    links = self._extract_links(url, soup)
    body_text = soup.get_text(separator=" ", strip=True)
    return title, body_text, links, images, videos
|
|
|
|
def _extract_images(self, base_url: str, soup: BeautifulSoup) -> list[dict[str, str]]:
|
|
images = []
|
|
for img in soup.find_all("img", src=True):
|
|
src = str(img["src"]).strip()
|
|
if not src or src.startswith(("data:", "javascript:")):
|
|
continue
|
|
absolute_url = urljoin(base_url, src)
|
|
normalized_url = self._normalize_url(absolute_url)
|
|
if normalized_url is not None:
|
|
alt = str(img.get("alt", "")).strip()
|
|
images.append({"url": normalized_url, "alt_text": alt})
|
|
return self._dedupe_media(images)
|
|
|
|
def _extract_videos(self, base_url: str, soup: BeautifulSoup) -> list[dict[str, str]]:
|
|
videos: list[dict[str, str]] = []
|
|
|
|
for video in soup.find_all("video"):
|
|
if video.get("src"):
|
|
normalized = self._normalize_url(urljoin(base_url, str(video["src"]).strip()))
|
|
if normalized:
|
|
title = str(video.get("title") or video.get("aria-label") or "").strip()
|
|
videos.append({"url": normalized, "title": title})
|
|
for source in video.find_all("source", src=True):
|
|
normalized = self._normalize_url(urljoin(base_url, str(source["src"]).strip()))
|
|
if normalized:
|
|
title = str(video.get("title") or video.get("aria-label") or "").strip()
|
|
videos.append({"url": normalized, "title": title})
|
|
|
|
for iframe in soup.find_all("iframe", src=True):
|
|
raw_src = str(iframe["src"]).strip()
|
|
normalized = self._normalize_url(urljoin(base_url, raw_src))
|
|
if normalized and self._is_video_url(normalized):
|
|
title = str(iframe.get("title") or iframe.get("aria-label") or "").strip()
|
|
videos.append({"url": normalized, "title": title})
|
|
|
|
for tag in soup.find_all("a", href=True):
|
|
raw_href = str(tag["href"]).strip()
|
|
normalized = self._normalize_url(urljoin(base_url, raw_href))
|
|
if normalized and self._is_video_url(normalized):
|
|
title = " ".join(tag.stripped_strings).strip()
|
|
videos.append({"url": normalized, "title": title})
|
|
|
|
return self._dedupe_media(videos)
|
|
|
|
def _is_video_url(self, url: str) -> bool:
|
|
lowered = url.lower()
|
|
return any(
|
|
marker in lowered
|
|
for marker in (
|
|
"youtube.com/watch",
|
|
"youtube.com/embed/",
|
|
"youtu.be/",
|
|
"vimeo.com/",
|
|
".mp4",
|
|
".webm",
|
|
".mov",
|
|
".m3u8",
|
|
)
|
|
)
|
|
|
|
def _dedupe_media(self, items: list[dict[str, str]]) -> list[dict[str, str]]:
|
|
seen: set[str] = set()
|
|
unique: list[dict[str, str]] = []
|
|
for item in items:
|
|
media_url = item.get("url")
|
|
if not media_url or media_url in seen:
|
|
continue
|
|
seen.add(media_url)
|
|
unique.append(item)
|
|
return unique
|
|
|
|
def _extract_links(self, base_url: str, soup: BeautifulSoup) -> list[str]:
|
|
collected_links: list[str] = []
|
|
for tag in soup.find_all("a", href=True):
|
|
href = str(tag["href"]).strip()
|
|
if not href or href.startswith(("javascript:", "mailto:", "tel:")):
|
|
continue
|
|
absolute_url = urljoin(base_url, href)
|
|
normalized_url = self._normalize_url(absolute_url)
|
|
if normalized_url is not None:
|
|
collected_links.append(normalized_url)
|
|
return self._dedupe_links(collected_links)
|
|
|
|
def _dedupe_links(self, links: Iterable[str]) -> list[str]:
|
|
seen: set[str] = set()
|
|
unique_links: list[str] = []
|
|
for link in links:
|
|
if link in seen:
|
|
continue
|
|
seen.add(link)
|
|
unique_links.append(link)
|
|
return unique_links
|
|
|
|
def _normalize_url(self, url: str) -> str | None:
|
|
if not url:
|
|
return None
|
|
|
|
clean_url, _ = urldefrag(url.strip())
|
|
parsed = urlsplit(clean_url)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
return None
|
|
|
|
normalized = parsed._replace(
|
|
scheme=parsed.scheme.lower(),
|
|
netloc=parsed.netloc.lower(),
|
|
)
|
|
return urlunsplit(normalized)
|