"""Async SQLite helpers for sFetch's crawl index."""

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

import aiosqlite

from config import DB_PATH

# Upper bound applied to the ``limit`` argument of every search function.
_MAX_SEARCH_LIMIT = 50

# Statements run on every new connection before it is handed to callers.
_CONNECTION_PRAGMAS = (
    "PRAGMA foreign_keys = ON;",
    "PRAGMA journal_mode = WAL;",
)

# Full schema: three content tables (pages, images, videos), one external-
# content FTS5 index per table kept in sync by insert/delete/update triggers,
# and a small key/value table for application metadata.  Every statement uses
# IF NOT EXISTS so the script can be executed repeatedly.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS pages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT UNIQUE NOT NULL,
    title TEXT,
    body_text TEXT,
    indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts
USING fts5(title, body_text, content='pages', content_rowid='id');

CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
    INSERT INTO pages_fts(rowid, title, body_text)
    VALUES (new.id, new.title, new.body_text);
END;

CREATE TRIGGER IF NOT EXISTS pages_ad AFTER DELETE ON pages BEGIN
    INSERT INTO pages_fts(pages_fts, rowid, title, body_text)
    VALUES ('delete', old.id, old.title, old.body_text);
END;

CREATE TRIGGER IF NOT EXISTS pages_au AFTER UPDATE ON pages BEGIN
    INSERT INTO pages_fts(pages_fts, rowid, title, body_text)
    VALUES ('delete', old.id, old.title, old.body_text);
    INSERT INTO pages_fts(rowid, title, body_text)
    VALUES (new.id, new.title, new.body_text);
END;

CREATE TABLE IF NOT EXISTS images (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT UNIQUE NOT NULL,
    page_url TEXT NOT NULL,
    alt_text TEXT,
    indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY(page_url) REFERENCES pages(url) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE IF NOT EXISTS images_fts
USING fts5(alt_text, content='images', content_rowid='id');

CREATE TRIGGER IF NOT EXISTS images_ai AFTER INSERT ON images BEGIN
    INSERT INTO images_fts(rowid, alt_text)
    VALUES (new.id, new.alt_text);
END;

CREATE TRIGGER IF NOT EXISTS images_ad AFTER DELETE ON images BEGIN
    INSERT INTO images_fts(images_fts, rowid, alt_text)
    VALUES ('delete', old.id, old.alt_text);
END;

CREATE TRIGGER IF NOT EXISTS images_au AFTER UPDATE ON images BEGIN
    INSERT INTO images_fts(images_fts, rowid, alt_text)
    VALUES ('delete', old.id, old.alt_text);
    INSERT INTO images_fts(rowid, alt_text)
    VALUES (new.id, new.alt_text);
END;

CREATE TABLE IF NOT EXISTS videos (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT UNIQUE NOT NULL,
    page_url TEXT NOT NULL,
    title TEXT,
    indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY(page_url) REFERENCES pages(url) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE IF NOT EXISTS videos_fts
USING fts5(title, content='videos', content_rowid='id');

CREATE TRIGGER IF NOT EXISTS videos_ai AFTER INSERT ON videos BEGIN
    INSERT INTO videos_fts(rowid, title)
    VALUES (new.id, new.title);
END;

CREATE TRIGGER IF NOT EXISTS videos_ad AFTER DELETE ON videos BEGIN
    INSERT INTO videos_fts(videos_fts, rowid, title)
    VALUES ('delete', old.id, old.title);
END;

CREATE TRIGGER IF NOT EXISTS videos_au AFTER UPDATE ON videos BEGIN
    INSERT INTO videos_fts(videos_fts, rowid, title)
    VALUES ('delete', old.id, old.title);
    INSERT INTO videos_fts(rowid, title)
    VALUES (new.id, new.title);
END;

CREATE TABLE IF NOT EXISTS app_meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
"""


@asynccontextmanager
async def _get_connection() -> AsyncIterator[aiosqlite.Connection]:
    """Yield a freshly opened connection to ``DB_PATH``.

    The connection uses ``aiosqlite.Row`` as its row factory (so columns can
    be read by name) and has the statements in ``_CONNECTION_PRAGMAS``
    applied.  It is closed when the ``async with`` block exits.
    """
    async with aiosqlite.connect(DB_PATH) as connection:
        connection.row_factory = aiosqlite.Row
        for pragma in _CONNECTION_PRAGMAS:
            # Close each PRAGMA cursor right away instead of leaving it for
            # garbage collection; ``journal_mode`` returns a result row.
            cursor = await connection.execute(pragma)
            await cursor.close()
        yield connection


def _to_fts_query(query: str) -> str:
    """Convert free-form user text into an FTS5 MATCH expression.

    The text is split on whitespace; every token is wrapped in double quotes
    (embedded double quotes are doubled) and the quoted tokens are joined
    with ``OR``.  Returns an empty string when ``query`` has no tokens.
    """
    # ``str.split()`` never yields empty or whitespace-padded tokens, so no
    # further stripping or filtering is needed.
    return " OR ".join(
        '"' + token.replace('"', '""') + '"' for token in query.split()
    )


def _clamp_window(limit: int, offset: int) -> tuple[int, int]:
    """Return ``(limit, offset)`` clamped to ``1.._MAX_SEARCH_LIMIT`` and ``>= 0``."""
    return max(1, min(limit, _MAX_SEARCH_LIMIT)), max(0, offset)


async def init_db() -> None:
    """Create all tables, FTS5 indexes and triggers if they do not exist."""
    async with _get_connection() as connection:
        await connection.executescript(_SCHEMA_SQL)
        await connection.commit()


async def get_meta_value(key: str) -> str | None:
    """Return the ``app_meta`` value stored under ``key``, or ``None``."""
    async with _get_connection() as connection:
        # The context manager closes the cursor even if the fetch raises.
        async with connection.execute(
            "SELECT value FROM app_meta WHERE key = ?",
            (key,),
        ) as cursor:
            row = await cursor.fetchone()
    return str(row["value"]) if row is not None else None


async def set_meta_value(key: str, value: str) -> None:
    """Insert or overwrite the ``app_meta`` entry for ``key``.

    On overwrite, ``updated_at`` is reset to the current timestamp.
    """
    async with _get_connection() as connection:
        await connection.execute(
            """
            INSERT INTO app_meta (key, value)
            VALUES (?, ?)
            ON CONFLICT(key) DO UPDATE SET
                value = excluded.value,
                updated_at = CURRENT_TIMESTAMP
            """,
            (key, value),
        )
        await connection.commit()


async def insert_page(url: str, title: str, body_text: str) -> int:
    """Insert a page, or refresh the existing row with the same ``url``.

    Returns:
        The ``id`` of the inserted or updated ``pages`` row.

    Raises:
        RuntimeError: If the row cannot be read back after the upsert.
    """
    async with _get_connection() as connection:
        await connection.execute(
            """
            INSERT INTO pages (url, title, body_text)
            VALUES (?, ?, ?)
            ON CONFLICT(url) DO UPDATE SET
                title = excluded.title,
                body_text = excluded.body_text,
                indexed_at = CURRENT_TIMESTAMP
            """,
            (url, title, body_text),
        )
        await connection.commit()

        # Look the id up by url: this works for both the insert and the
        # update path of the upsert above.
        async with connection.execute(
            "SELECT id FROM pages WHERE url = ?",
            (url,),
        ) as cursor:
            row = await cursor.fetchone()

    if row is None:
        raise RuntimeError("Inserted page could not be reloaded from the database.")
    return int(row["id"])


async def insert_image(url: str, page_url: str, alt_text: str) -> None:
    """Insert an image, or refresh the existing row with the same ``url``.

    ``page_url`` is declared as a foreign key to ``pages(url)`` and
    connections enable foreign-key enforcement.
    """
    async with _get_connection() as connection:
        await connection.execute(
            """
            INSERT INTO images (url, page_url, alt_text)
            VALUES (?, ?, ?)
            ON CONFLICT(url) DO UPDATE SET
                page_url = excluded.page_url,
                alt_text = excluded.alt_text,
                indexed_at = CURRENT_TIMESTAMP
            """,
            (url, page_url, alt_text),
        )
        await connection.commit()


async def insert_video(url: str, page_url: str, title: str) -> None:
    """Insert a video, or refresh the existing row with the same ``url``.

    ``page_url`` is declared as a foreign key to ``pages(url)`` and
    connections enable foreign-key enforcement.
    """
    async with _get_connection() as connection:
        await connection.execute(
            """
            INSERT INTO videos (url, page_url, title)
            VALUES (?, ?, ?)
            ON CONFLICT(url) DO UPDATE SET
                page_url = excluded.page_url,
                title = excluded.title,
                indexed_at = CURRENT_TIMESTAMP
            """,
            (url, page_url, title),
        )
        await connection.commit()


async def search_pages(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Full-text search over page titles and bodies.

    Args:
        query: Free-form text; see ``_to_fts_query`` for how it is converted.
        limit: Page size, clamped to ``1.._MAX_SEARCH_LIMIT``.
        offset: Rows to skip; negative values are treated as ``0``.

    Returns:
        One dict per matching page with keys ``id``, ``url``, ``title``,
        ``body_text`` and ``indexed_at``, ordered by ``bm25(pages_fts)`` and
        then by ``indexed_at`` descending.  Empty when ``query`` has no
        tokens.
    """
    fts_query = _to_fts_query(query)
    if not fts_query:
        return []

    safe_limit, safe_offset = _clamp_window(limit, offset)
    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT p.id, p.url, p.title, p.body_text, p.indexed_at
            FROM pages_fts
            JOIN pages AS p ON p.id = pages_fts.rowid
            WHERE pages_fts MATCH ?
            ORDER BY bm25(pages_fts), p.indexed_at DESC
            LIMIT ? OFFSET ?
            """,
            (fts_query, safe_limit, safe_offset),
        ) as cursor:
            rows = await cursor.fetchall()
        return [dict(row) for row in rows]


async def count_search_results(query: str) -> int:
    """Return how many ``pages_fts`` rows match ``query`` (``0`` if it has no tokens)."""
    fts_query = _to_fts_query(query)
    if not fts_query:
        return 0

    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT COUNT(*) AS total
            FROM pages_fts
            WHERE pages_fts MATCH ?
            """,
            (fts_query,),
        ) as cursor:
            row = await cursor.fetchone()
    return int(row["total"]) if row is not None and row["total"] is not None else 0


async def search_images(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Full-text search over image alt text.

    Args:
        query: Free-form text; see ``_to_fts_query`` for how it is converted.
        limit: Page size, clamped to ``1.._MAX_SEARCH_LIMIT``.
        offset: Rows to skip; negative values are treated as ``0``.

    Returns:
        One dict per matching image with keys ``id``, ``url``, ``page_url``,
        ``alt_text`` and ``indexed_at``, ordered by ``bm25(images_fts)`` and
        then by ``indexed_at`` descending.  Empty when ``query`` has no
        tokens.
    """
    fts_query = _to_fts_query(query)
    if not fts_query:
        return []

    safe_limit, safe_offset = _clamp_window(limit, offset)
    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT i.id, i.url, i.page_url, i.alt_text, i.indexed_at
            FROM images_fts
            JOIN images AS i ON i.id = images_fts.rowid
            WHERE images_fts MATCH ?
            ORDER BY bm25(images_fts), i.indexed_at DESC
            LIMIT ? OFFSET ?
            """,
            (fts_query, safe_limit, safe_offset),
        ) as cursor:
            rows = await cursor.fetchall()
        return [dict(row) for row in rows]


async def count_image_results(query: str) -> int:
    """Return how many ``images_fts`` rows match ``query`` (``0`` if it has no tokens)."""
    fts_query = _to_fts_query(query)
    if not fts_query:
        return 0

    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT COUNT(*) AS total
            FROM images_fts
            WHERE images_fts MATCH ?
            """,
            (fts_query,),
        ) as cursor:
            row = await cursor.fetchone()
    return int(row["total"]) if row is not None and row["total"] is not None else 0


async def search_videos(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Full-text search over video titles.

    Args:
        query: Free-form text; see ``_to_fts_query`` for how it is converted.
        limit: Page size, clamped to ``1.._MAX_SEARCH_LIMIT``.
        offset: Rows to skip; negative values are treated as ``0``.

    Returns:
        One dict per matching video with keys ``id``, ``url``, ``page_url``,
        ``title`` and ``indexed_at``, ordered by ``bm25(videos_fts)`` and
        then by ``indexed_at`` descending.  Empty when ``query`` has no
        tokens.
    """
    fts_query = _to_fts_query(query)
    if not fts_query:
        return []

    safe_limit, safe_offset = _clamp_window(limit, offset)
    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT v.id, v.url, v.page_url, v.title, v.indexed_at
            FROM videos_fts
            JOIN videos AS v ON v.id = videos_fts.rowid
            WHERE videos_fts MATCH ?
            ORDER BY bm25(videos_fts), v.indexed_at DESC
            LIMIT ? OFFSET ?
            """,
            (fts_query, safe_limit, safe_offset),
        ) as cursor:
            rows = await cursor.fetchall()
        return [dict(row) for row in rows]


async def count_video_results(query: str) -> int:
    """Return how many ``videos_fts`` rows match ``query`` (``0`` if it has no tokens)."""
    fts_query = _to_fts_query(query)
    if not fts_query:
        return 0

    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT COUNT(*) AS total
            FROM videos_fts
            WHERE videos_fts MATCH ?
            """,
            (fts_query,),
        ) as cursor:
            row = await cursor.fetchone()
    return int(row["total"]) if row is not None and row["total"] is not None else 0


async def get_stats() -> dict[str, Any]:
    """Return summary statistics about the ``pages`` table.

    Returns:
        A dict with ``total_pages`` (row count, ``0`` when empty) and
        ``last_indexed_at`` (the largest ``indexed_at`` value, or ``None``
        when there are no pages).
    """
    async with _get_connection() as connection:
        async with connection.execute(
            """
            SELECT COUNT(*) AS total_pages, MAX(indexed_at) AS last_indexed_at
            FROM pages
            """
        ) as cursor:
            row = await cursor.fetchone()

    if row is None:
        return {"total_pages": 0, "last_indexed_at": None}
    total_pages = row["total_pages"]
    return {
        "total_pages": int(total_pages) if total_pages is not None else 0,
        "last_indexed_at": row["last_indexed_at"],
    }