111 lines
3.4 KiB
Python
111 lines
3.4 KiB
Python
"""Load and sanitize the top-site seed list for first-launch indexing."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import zipfile
|
|
from collections.abc import Iterable
|
|
from urllib.parse import urlsplit, urlunsplit
|
|
|
|
import httpx
|
|
|
|
from config import (
|
|
TOP_SITE_DOWNLOAD_TIMEOUT_SECONDS,
|
|
TOP_SITE_SEED_LIMIT,
|
|
TOP_SITE_SOURCE_URL,
|
|
TOP_SITES,
|
|
USER_AGENT,
|
|
)
|
|
from content_filter import is_adult_url
|
|
|
|
|
|
def _normalize_site_url(value: str) -> str | None:
|
|
raw_value = value.strip()
|
|
if not raw_value:
|
|
return None
|
|
|
|
candidate = raw_value if "://" in raw_value else f"https://{raw_value}"
|
|
parsed = urlsplit(candidate)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
return None
|
|
|
|
normalized = parsed._replace(
|
|
scheme=parsed.scheme.lower(),
|
|
netloc=parsed.netloc.lower(),
|
|
path=parsed.path.rstrip("/") if parsed.path not in {"", "/"} else "",
|
|
query="",
|
|
fragment="",
|
|
)
|
|
return urlunsplit(normalized)
|
|
|
|
|
|
def _host_key(url: str) -> str:
|
|
return urlsplit(url).netloc.lower().removeprefix("www.")
|
|
|
|
|
|
def _safe_top_urls(candidates: Iterable[str], limit: int = TOP_SITE_SEED_LIMIT) -> list[str]:
    """Return up to *limit* normalized, de-duplicated, non-adult URLs.

    Candidates are consumed in order. Entries that fail normalization, repeat
    an already-seen host (ignoring a leading ``www.``), or are flagged by
    ``is_adult_url`` are skipped. Iteration stops as soon as *limit* URLs have
    been collected, so a lazy iterable is not exhausted unnecessarily.

    A non-positive *limit* yields an empty list.
    """
    safe_urls: list[str] = []
    # Guard up front: the cap is otherwise only checked after an append, which
    # would let one URL through even when the caller asked for none.
    if limit <= 0:
        return safe_urls

    seen_hosts: set[str] = set()
    for candidate in candidates:
        normalized = _normalize_site_url(candidate)
        if normalized is None:
            continue

        host_key = _host_key(normalized)
        if host_key in seen_hosts or is_adult_url(normalized):
            continue

        seen_hosts.add(host_key)
        safe_urls.append(normalized)
        if len(safe_urls) >= limit:
            break

    return safe_urls
|
|
|
|
|
|
def _domains_from_csv_text(csv_text: str) -> list[str]:
|
|
domains: list[str] = []
|
|
reader = csv.reader(io.StringIO(csv_text))
|
|
for row in reader:
|
|
if not row:
|
|
continue
|
|
domain = row[1] if len(row) > 1 else row[0]
|
|
if domain and domain.lower() != "domain":
|
|
domains.append(domain)
|
|
return domains
|
|
|
|
|
|
def _domains_from_zip(payload: bytes) -> list[str]:
|
|
with zipfile.ZipFile(io.BytesIO(payload)) as archive:
|
|
csv_name = next((name for name in archive.namelist() if name.endswith(".csv")), None)
|
|
if csv_name is None:
|
|
raise ValueError("Tranco archive did not contain a CSV file.")
|
|
with archive.open(csv_name) as csv_file:
|
|
text = csv_file.read().decode("utf-8", errors="replace")
|
|
return _domains_from_csv_text(text)
|
|
|
|
|
|
async def load_top_site_seed_urls(limit: int = TOP_SITE_SEED_LIMIT) -> tuple[list[str], str]:
    """Fetch the current top-site list and return ``(urls, source_label)``.

    The list at ``TOP_SITE_SOURCE_URL`` is downloaded and parsed as a zip
    archive when the body starts with the ``PK`` signature, otherwise as CSV
    text. The result is filtered through ``_safe_top_urls``. If the download
    or parsing fails, or no usable URL survives filtering, the bundled
    ``TOP_SITES`` list is filtered and returned instead, labelled
    ``"bundled fallback list"``.
    """
    client_options = {
        "timeout": httpx.Timeout(TOP_SITE_DOWNLOAD_TIMEOUT_SECONDS),
        "follow_redirects": True,
        "headers": {"User-Agent": USER_AGENT},
    }

    try:
        async with httpx.AsyncClient(**client_options) as client:
            reply = await client.get(TOP_SITE_SOURCE_URL)
            reply.raise_for_status()

        body = reply.content
        # Zip archives start with the "PK" signature; anything else is treated as CSV.
        if body.startswith(b"PK"):
            domains = _domains_from_zip(body)
        else:
            domains = _domains_from_csv_text(reply.text)

        fresh_urls = _safe_top_urls(domains, limit=limit)
        if fresh_urls:
            return fresh_urls, TOP_SITE_SOURCE_URL
    except Exception as exc:
        # Best-effort download: any failure falls through to the bundled list.
        print(f"sFetch: unable to load latest top-site list ({exc}); using bundled fallback.")

    return _safe_top_urls(TOP_SITES, limit=limit), "bundled fallback list"
|