Files
Ned Halksworth e0f2eedcd9 inital commit
2026-05-04 19:31:46 +01:00

396 lines
12 KiB
Python

"""Async SQLite helpers for sFetch's crawl index."""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
import aiosqlite
from config import DB_PATH
@asynccontextmanager
async def _get_connection() -> AsyncIterator[aiosqlite.Connection]:
    """Open a configured connection to the crawl database at ``DB_PATH``.

    The yielded connection returns rows as ``aiosqlite.Row``, so columns can
    be read by name. Before the connection is handed out, two pragmas are run:

    * ``foreign_keys = ON``: SQLite enforces foreign keys per connection and
      leaves them off by default, so this must run on every connection.
    * ``journal_mode = WAL``.

    The connection is closed when the caller's ``async with`` block exits.
    """
    startup_pragmas = (
        "PRAGMA foreign_keys = ON;",
        "PRAGMA journal_mode = WAL;",
    )
    async with aiosqlite.connect(DB_PATH) as conn:
        conn.row_factory = aiosqlite.Row
        for pragma in startup_pragmas:
            await conn.execute(pragma)
        yield conn
def _to_fts_query(query: str) -> str:
tokens: list[str] = []
for raw_token in query.split():
token = raw_token.strip()
if not token:
continue
escaped = token.replace('"', '""')
tokens.append(f'"{escaped}"')
return " OR ".join(tokens)
async def init_db() -> None:
    """Create the crawl-index schema if it does not already exist.

    Creates these objects:

    * ``pages``, ``images`` and ``videos`` content tables.
    * One FTS5 external-content table (``*_fts``) per content table. Each is
      kept in sync by AFTER INSERT / DELETE / UPDATE triggers. The UPDATE
      trigger first issues FTS5's ``'delete'`` command with the old values,
      then inserts the new ones.
    * The ``app_meta`` key/value table.

    ``images.page_url`` and ``videos.page_url`` are foreign keys into
    ``pages.url`` with ``ON DELETE CASCADE``. SQLite does not index child key
    columns automatically, so both are indexed explicitly. Without those
    indexes, every delete from ``pages`` would scan the whole ``images`` and
    ``videos`` tables looking for rows to cascade.

    Every statement uses ``IF NOT EXISTS``, so calling this against an
    existing database only creates whatever is missing.
    """
    async with _get_connection() as connection:
        await connection.executescript(
            """
            CREATE TABLE IF NOT EXISTS pages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT,
                body_text TEXT,
                indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );

            -- External-content FTS5 index: FTS5 does not watch the content
            -- table, so the triggers below must mirror every change into it.
            CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts
            USING fts5(title, body_text, content='pages', content_rowid='id');

            CREATE TRIGGER IF NOT EXISTS pages_ai
            AFTER INSERT ON pages
            BEGIN
                INSERT INTO pages_fts(rowid, title, body_text)
                VALUES (new.id, new.title, new.body_text);
            END;

            CREATE TRIGGER IF NOT EXISTS pages_ad
            AFTER DELETE ON pages
            BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body_text)
                VALUES ('delete', old.id, old.title, old.body_text);
            END;

            CREATE TRIGGER IF NOT EXISTS pages_au
            AFTER UPDATE ON pages
            BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body_text)
                VALUES ('delete', old.id, old.title, old.body_text);
                INSERT INTO pages_fts(rowid, title, body_text)
                VALUES (new.id, new.title, new.body_text);
            END;

            CREATE TABLE IF NOT EXISTS images (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                page_url TEXT NOT NULL,
                alt_text TEXT,
                indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(page_url) REFERENCES pages(url) ON DELETE CASCADE
            );

            -- Child-key index so cascading deletes from pages avoid a full scan.
            CREATE INDEX IF NOT EXISTS idx_images_page_url ON images(page_url);

            CREATE VIRTUAL TABLE IF NOT EXISTS images_fts
            USING fts5(alt_text, content='images', content_rowid='id');

            CREATE TRIGGER IF NOT EXISTS images_ai
            AFTER INSERT ON images
            BEGIN
                INSERT INTO images_fts(rowid, alt_text)
                VALUES (new.id, new.alt_text);
            END;

            CREATE TRIGGER IF NOT EXISTS images_ad
            AFTER DELETE ON images
            BEGIN
                INSERT INTO images_fts(images_fts, rowid, alt_text)
                VALUES ('delete', old.id, old.alt_text);
            END;

            CREATE TRIGGER IF NOT EXISTS images_au
            AFTER UPDATE ON images
            BEGIN
                INSERT INTO images_fts(images_fts, rowid, alt_text)
                VALUES ('delete', old.id, old.alt_text);
                INSERT INTO images_fts(rowid, alt_text)
                VALUES (new.id, new.alt_text);
            END;

            CREATE TABLE IF NOT EXISTS videos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                page_url TEXT NOT NULL,
                title TEXT,
                indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(page_url) REFERENCES pages(url) ON DELETE CASCADE
            );

            -- Child-key index so cascading deletes from pages avoid a full scan.
            CREATE INDEX IF NOT EXISTS idx_videos_page_url ON videos(page_url);

            CREATE VIRTUAL TABLE IF NOT EXISTS videos_fts
            USING fts5(title, content='videos', content_rowid='id');

            CREATE TRIGGER IF NOT EXISTS videos_ai
            AFTER INSERT ON videos
            BEGIN
                INSERT INTO videos_fts(rowid, title)
                VALUES (new.id, new.title);
            END;

            CREATE TRIGGER IF NOT EXISTS videos_ad
            AFTER DELETE ON videos
            BEGIN
                INSERT INTO videos_fts(videos_fts, rowid, title)
                VALUES ('delete', old.id, old.title);
            END;

            CREATE TRIGGER IF NOT EXISTS videos_au
            AFTER UPDATE ON videos
            BEGIN
                INSERT INTO videos_fts(videos_fts, rowid, title)
                VALUES ('delete', old.id, old.title);
                INSERT INTO videos_fts(rowid, title)
                VALUES (new.id, new.title);
            END;

            CREATE TABLE IF NOT EXISTS app_meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            """
        )
        await connection.commit()
async def get_meta_value(key: str) -> str | None:
    """Look up ``key`` in the ``app_meta`` table.

    Args:
        key: The metadata key to read.

    Returns:
        The stored value as a string, or ``None`` when no row has that key.
    """
    lookup_sql = "SELECT value FROM app_meta WHERE key = ?"
    async with _get_connection() as db:
        cur = await db.execute(lookup_sql, (key,))
        match = await cur.fetchone()
        await cur.close()
        if not match:
            return None
        return str(match["value"])
async def set_meta_value(key: str, value: str) -> None:
    """Store ``value`` under ``key`` in the ``app_meta`` table.

    Inserts a new row, or, if the key already exists, overwrites its value
    and refreshes ``updated_at`` to the current timestamp. The change is
    committed before returning.
    """
    upsert_sql = """
        INSERT INTO app_meta (key, value)
        VALUES (?, ?)
        ON CONFLICT(key) DO UPDATE SET
            value = excluded.value,
            updated_at = CURRENT_TIMESTAMP
    """
    bound = (key, value)
    async with _get_connection() as db:
        await db.execute(upsert_sql, bound)
        await db.commit()
async def insert_page(url: str, title: str, body_text: str) -> int:
    """Insert or refresh a crawled page and return its row id.

    If ``url`` is new, a row is inserted. Otherwise the existing row's
    ``title`` and ``body_text`` are overwritten and ``indexed_at`` is reset
    to the current time. Either way, the ``pages`` triggers update the
    ``pages_fts`` index.

    Args:
        url: Canonical page URL (unique key of ``pages``).
        title: Page title to index.
        body_text: Extracted page text to index.

    Returns:
        The ``pages.id`` of the inserted or updated row.

    Raises:
        RuntimeError: If the row cannot be read back after the upsert.
    """
    async with _get_connection() as connection:
        await connection.execute(
            """
            INSERT INTO pages (url, title, body_text)
            VALUES (?, ?, ?)
            ON CONFLICT(url) DO UPDATE SET
                title = excluded.title,
                body_text = excluded.body_text,
                indexed_at = CURRENT_TIMESTAMP
            """,
            (url, title, body_text),
        )
        # Read the id back *before* committing. The SELECT then runs inside the
        # same write transaction as the upsert, so no other connection can
        # delete the row in between, as could happen if we committed first.
        # cursor.lastrowid is not used because it is not updated when the
        # upsert takes the DO UPDATE path.
        cursor = await connection.execute(
            "SELECT id FROM pages WHERE url = ?",
            (url,),
        )
        row = await cursor.fetchone()
        await cursor.close()
        await connection.commit()
        if row is None:
            raise RuntimeError("Inserted page could not be reloaded from the database.")
        return int(row["id"])
async def insert_image(url: str, page_url: str, alt_text: str) -> None:
    """Record an image found on a crawled page.

    Upserts on ``images.url``:

    * An unseen image URL creates a new row.
    * A known image URL has its ``page_url`` and ``alt_text`` replaced and
      its ``indexed_at`` reset to the current time.

    The ``images`` triggers keep ``images_fts`` in step, and the change is
    committed before returning.

    ``page_url`` references ``pages(url)``, and foreign keys are switched on
    for every connection this module opens. The referenced page must
    therefore already exist, or SQLite rejects the statement with a foreign
    key constraint error.
    """
    statement = """
        INSERT INTO images (url, page_url, alt_text)
        VALUES (?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
            page_url = excluded.page_url,
            alt_text = excluded.alt_text,
            indexed_at = CURRENT_TIMESTAMP
    """
    async with _get_connection() as db:
        await db.execute(statement, (url, page_url, alt_text))
        await db.commit()
async def insert_video(url: str, page_url: str, title: str) -> None:
    """Record a video found on a crawled page.

    Upserts on ``videos.url``:

    * An unseen video URL creates a new row.
    * A known video URL has its ``page_url`` and ``title`` replaced and its
      ``indexed_at`` reset to the current time.

    The ``videos`` triggers keep ``videos_fts`` in step, and the change is
    committed before returning.

    ``page_url`` references ``pages(url)``, and foreign keys are switched on
    for every connection this module opens. The referenced page must
    therefore already exist, or SQLite rejects the statement with a foreign
    key constraint error.
    """
    statement = """
        INSERT INTO videos (url, page_url, title)
        VALUES (?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
            page_url = excluded.page_url,
            title = excluded.title,
            indexed_at = CURRENT_TIMESTAMP
    """
    async with _get_connection() as db:
        await db.execute(statement, (url, page_url, title))
        await db.commit()
async def search_pages(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Run a ranked full-text search over indexed pages.

    Args:
        query: Free-form search text. Each word is matched as a literal term,
            and a page matches if it contains any of them.
        limit: Maximum number of hits to return, clamped to the range 1..50.
        offset: Number of ranked hits to skip. Negative values are treated
            as 0.

    Returns:
        One dict per hit with keys ``id``, ``url``, ``title``, ``body_text``
        and ``indexed_at``. Hits are ordered best BM25 match first, with ties
        broken by the most recently indexed page. Returns an empty list when
        ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return []
    page_size = min(max(limit, 1), 50)
    skip = max(offset, 0)
    search_sql = """
        SELECT
            p.id,
            p.url,
            p.title,
            p.body_text,
            p.indexed_at
        FROM pages_fts
        JOIN pages AS p ON p.id = pages_fts.rowid
        WHERE pages_fts MATCH ?
        ORDER BY bm25(pages_fts), p.indexed_at DESC
        LIMIT ? OFFSET ?
    """
    async with _get_connection() as db:
        cur = await db.execute(search_sql, (match_expr, page_size, skip))
        hits = await cur.fetchall()
        await cur.close()
        return list(map(dict, hits))
async def count_search_results(query: str) -> int:
    """Return how many pages match ``query``.

    Builds the ``MATCH`` expression with ``_to_fts_query``, just like the page
    search, so the total lines up with the paginated results. It counts rows
    of ``pages_fts`` directly, without joining ``pages``. That gives the same
    number as the join as long as the triggers keep the index in sync.

    Returns 0 when ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return 0
    async with _get_connection() as db:
        cur = await db.execute(
            "SELECT COUNT(*) AS total FROM pages_fts WHERE pages_fts MATCH ?",
            (match_expr,),
        )
        result = await cur.fetchone()
        await cur.close()
        total = result["total"] if result else None
        return 0 if total is None else int(total)
async def search_images(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Run a ranked full-text search over image alt text.

    Args:
        query: Free-form search text. Each word is matched as a literal term,
            and an image matches if its alt text contains any of them.
        limit: Maximum number of hits to return, clamped to the range 1..50.
        offset: Number of ranked hits to skip. Negative values are treated
            as 0.

    Returns:
        One dict per hit with keys ``id``, ``url``, ``page_url``,
        ``alt_text`` and ``indexed_at``. Hits are ordered best BM25 match
        first, with ties broken by the most recently indexed image. Returns
        an empty list when ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return []
    page_size = min(max(limit, 1), 50)
    skip = max(offset, 0)
    search_sql = """
        SELECT
            i.id,
            i.url,
            i.page_url,
            i.alt_text,
            i.indexed_at
        FROM images_fts
        JOIN images AS i ON i.id = images_fts.rowid
        WHERE images_fts MATCH ?
        ORDER BY bm25(images_fts), i.indexed_at DESC
        LIMIT ? OFFSET ?
    """
    async with _get_connection() as db:
        cur = await db.execute(search_sql, (match_expr, page_size, skip))
        hits = await cur.fetchall()
        await cur.close()
        return list(map(dict, hits))
async def count_image_results(query: str) -> int:
    """Return how many images match ``query``.

    Builds the ``MATCH`` expression with ``_to_fts_query``, just like the
    image search, so the total lines up with the paginated results. It counts
    rows of ``images_fts`` directly, without joining ``images``. That gives
    the same number as the join as long as the triggers keep the index in
    sync.

    Returns 0 when ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return 0
    async with _get_connection() as db:
        cur = await db.execute(
            "SELECT COUNT(*) AS total FROM images_fts WHERE images_fts MATCH ?",
            (match_expr,),
        )
        result = await cur.fetchone()
        await cur.close()
        total = result["total"] if result else None
        return 0 if total is None else int(total)
async def search_videos(query: str, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Run a ranked full-text search over video titles.

    Args:
        query: Free-form search text. Each word is matched as a literal term,
            and a video matches if its title contains any of them.
        limit: Maximum number of hits to return, clamped to the range 1..50.
        offset: Number of ranked hits to skip. Negative values are treated
            as 0.

    Returns:
        One dict per hit with keys ``id``, ``url``, ``page_url``, ``title``
        and ``indexed_at``. Hits are ordered best BM25 match first, with ties
        broken by the most recently indexed video. Returns an empty list when
        ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return []
    page_size = min(max(limit, 1), 50)
    skip = max(offset, 0)
    search_sql = """
        SELECT
            v.id,
            v.url,
            v.page_url,
            v.title,
            v.indexed_at
        FROM videos_fts
        JOIN videos AS v ON v.id = videos_fts.rowid
        WHERE videos_fts MATCH ?
        ORDER BY bm25(videos_fts), v.indexed_at DESC
        LIMIT ? OFFSET ?
    """
    async with _get_connection() as db:
        cur = await db.execute(search_sql, (match_expr, page_size, skip))
        hits = await cur.fetchall()
        await cur.close()
        return list(map(dict, hits))
async def count_video_results(query: str) -> int:
    """Return how many videos match ``query``.

    Builds the ``MATCH`` expression with ``_to_fts_query``, just like the
    video search, so the total lines up with the paginated results. It counts
    rows of ``videos_fts`` directly, without joining ``videos``. That gives
    the same number as the join as long as the triggers keep the index in
    sync.

    Returns 0 when ``query`` has no words.
    """
    match_expr = _to_fts_query(query)
    if not match_expr:
        return 0
    async with _get_connection() as db:
        cur = await db.execute(
            "SELECT COUNT(*) AS total FROM videos_fts WHERE videos_fts MATCH ?",
            (match_expr,),
        )
        result = await cur.fetchone()
        await cur.close()
        total = result["total"] if result else None
        return 0 if total is None else int(total)
async def get_stats() -> dict[str, Any]:
    """Summarise the page index.

    Returns:
        A dict with two keys:

        * ``total_pages``: number of rows in ``pages``.
        * ``last_indexed_at``: the greatest ``indexed_at`` value, or ``None``
          when ``pages`` is empty.
    """
    stats_sql = """
        SELECT
            COUNT(*) AS total_pages,
            MAX(indexed_at) AS last_indexed_at
        FROM pages
    """
    async with _get_connection() as db:
        cur = await db.execute(stats_sql)
        summary = await cur.fetchone()
        await cur.close()
        if not summary:
            return {"total_pages": 0, "last_indexed_at": None}
        page_count = summary["total_pages"]
        return {
            "total_pages": 0 if page_count is None else int(page_count),
            "last_indexed_at": summary["last_indexed_at"],
        }