diff --git a/alembic/versions/1f337e7155be_add_path_mtime_index_for_rescan_.py b/alembic/versions/1f337e7155be_add_path_mtime_index_for_rescan_.py new file mode 100644 index 0000000..510589a --- /dev/null +++ b/alembic/versions/1f337e7155be_add_path_mtime_index_for_rescan_.py @@ -0,0 +1,32 @@ +"""add path+mtime index for rescan idempotency + +Revision ID: 1f337e7155be +Revises: 1608054c0ffe +Create Date: 2026-05-25 21:02:49.743977 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '1f337e7155be' +down_revision: Union[str, Sequence[str], None] = '1608054c0ffe' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_index('ix_observations_path_mtime', 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('ix_observations_path_mtime', table_name='observations') + # ### end Alembic commands ### diff --git a/pyproject.toml b/pyproject.toml index 39716b5..4df40be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ mama-scan = "mama.cli.scan:main" mama-apply = "mama.cli.apply:main" mama-web = "mama.cli.web:main" +mama-dev = "mama.cli.dev:main" [dependency-groups] dev = [ diff --git a/src/mama/cli/dev.py b/src/mama/cli/dev.py new file mode 100644 index 0000000..b62aa1d --- /dev/null +++ b/src/mama/cli/dev.py @@ -0,0 +1,159 @@ +"""mama-dev: developer utilities (DB reset, statistics, ...).""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +import typer +from rich.console import Console +from rich.table import Table +from sqlalchemy import Select, func, select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from mama.config import Settings +from mama.db import make_engine, make_session_factory +from mama.models import Blob, Observation + +app = typer.Typer(help="Developer utilities for mama.") +console = Console() + + +# ---------- reset --------------------------------------------------------- # + +@app.command() +def reset( + config: Path = typer.Option(Path("mama.toml"), "--config", "-c"), + yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), +) -> None: + """Truncate all mama tables. Schema stays intact, archive files untouched.""" + if not yes: + confirmed = typer.confirm( + "Delete ALL observations and blobs from the database?", + default=False, + ) + if not confirmed: + console.print("[yellow]Aborted.[/yellow]") + raise typer.Exit(0) + + asyncio.run(_reset(config)) + console.print("[green]Database reset.[/green]") + + +async def _reset(config_path: Path) -> None: + settings = Settings.load(config_path) + engine = make_engine(settings) + async with engine.begin() as conn: + await conn.execute( + text("TRUNCATE TABLE observations, blobs RESTART IDENTITY CASCADE") + ) + await engine.dispose() + + +# ---------- stats --------------------------------------------------------- # + +@app.command() +def stats( + config: Path = typer.Option(Path("mama.toml"), "--config", "-c"), +) -> None: + """Show database statistics.""" + asyncio.run(_stats(config)) + + +async def _stats(config_path: Path) -> None: + settings = Settings.load(config_path) + engine = make_engine(settings) + sessions = make_session_factory(engine) + + async with sessions() as session: + await _print_overview(session) + await _print_group(session, "By source_kind", + select(Observation.source_kind, func.count()) + .group_by(Observation.source_kind) + .order_by(func.count().desc())) + await _print_group(session, "By status", + select(Observation.status, func.count()) + .group_by(Observation.status) + .order_by(func.count().desc())) + await _print_group(session, "By hostname", + select(Observation.hostname, func.count()) + .group_by(Observation.hostname) + .order_by(func.count().desc())) + await _print_group(session, "MIME types (top 10)", + select(Blob.mime, func.count()) + .where(Blob.mime.is_not(None)) + .group_by(Blob.mime) + .order_by(func.count().desc()) + .limit(10)) + await _print_largest_blobs(session) + + await engine.dispose() + + +async def _print_overview(session: AsyncSession) -> None: + n_obs = await session.scalar(select(func.count(Observation.id))) or 0 + n_blobs = await session.scalar(select(func.count(Blob.hash))) or 0 + total_size = await session.scalar(select(func.coalesce(func.sum(Blob.size), 0))) or 0 + n_blocked = await session.scalar( + select(func.count(Blob.hash)).where(Blob.block_reason.is_not(None)) + ) or 0 + duplicates = max(n_obs - n_blobs, 0) + dup_pct = (duplicates / n_obs * 100) if n_obs else 0.0 + + t = Table(show_header=False, title="Overview", title_justify="left", box=None) + t.add_column("Metric", style="cyan") + t.add_column("Value", justify="right") + t.add_row("Observations", f"{n_obs:,}") + t.add_row("Unique blobs", f"{n_blobs:,}") + t.add_row("Total content", _format_size(total_size)) + t.add_row("Duplicates (obs - blobs)", f"{duplicates:,} ({dup_pct:.1f}%)") + t.add_row("Blocked blobs", f"{n_blocked:,}") + console.print(t) + console.print() + + +async def _print_group(session: AsyncSession, title: str, stmt: Select[Any]) -> None: + rows = (await session.execute(stmt)).all() + if not rows: + return + t = Table(title=title, title_justify="left", box=None) + t.add_column("Value", style="cyan") + t.add_column("Count", justify="right") + for value, count in rows: + t.add_row(str(value) if value is not None else "[dim](null)[/dim]", f"{count:,}") + console.print(t) + console.print() + + +async def _print_largest_blobs(session: AsyncSession) -> None: + rows = (await session.execute( + select(Blob.hash, Blob.size, Blob.mime).order_by(Blob.size.desc()).limit(5) + )).all() + if not rows: + return + t = Table(title="Largest blobs", title_justify="left", box=None) + t.add_column("Hash", style="dim") + t.add_column("Size", justify="right") + t.add_column("MIME", style="cyan") + for hash_, size, mime in rows: + t.add_row(hash_[:12] + "…", _format_size(size), mime or "[dim]-[/dim]") + console.print(t) + console.print() + + +def _format_size(n: int) -> str: + size = float(n) + for unit in ("B", "KB", "MB", "GB", "TB"): + if size < 1024: + return f"{n} B" if unit == "B" else f"{size:.1f} {unit}" + size /= 1024 + return f"{size:.1f} PB" + + +def main() -> None: + app() + + +if __name__ == "__main__": + main() diff --git a/src/mama/cli/scan.py b/src/mama/cli/scan.py index 06a6884..734d2b8 100644 --- a/src/mama/cli/scan.py +++ b/src/mama/cli/scan.py @@ -61,12 +61,14 @@ async def _run( def update(counters: dict[str, int], last_path: Path | None) -> None: desc = ( f"{counters['files']} files | " - f"{counters['new_blobs']} new | " + f"+{counters['new_observations']} new | " + f"~{counters['unchanged']} unchanged | " + f"{counters['new_blobs']} new blobs | " f"{counters['duplicates']} dup | " f"{counters['errors']} err" ) if last_path is not None: - desc += f" | {last_path.name}" + desc += f" | {last_path.name[:40]}" progress.update(task, description=desc) counters = await scan_directory( @@ -79,9 +81,11 @@ async def _run( console.print( f"[green]Done.[/green] " - f"{counters['files']} files, " - f"{counters['new_blobs']} new blobs, " - f"{counters['duplicates']} duplicates, " + f"{counters['files']} files seen | " + f"{counters['new_observations']} new observations | " + f"{counters['unchanged']} unchanged | " + f"{counters['new_blobs']} new blobs | " + f"{counters['duplicates']} duplicates | " f"{counters['errors']} errors" ) diff --git a/src/mama/models.py b/src/mama/models.py index e272011..b54db4a 100644 --- a/src/mama/models.py +++ b/src/mama/models.py @@ -12,7 +12,7 @@ from __future__ import annotations from datetime import datetime from typing import Any -from sqlalchemy import BigInteger, DateTime, ForeignKey, String, Text +from sqlalchemy import BigInteger, DateTime, ForeignKey, Index, String, Text from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -44,6 +44,10 @@ class Observation(Base): metadata extracted from the file (EXIF, ID3, sidecar, ...) lives in the `meta` JSONB column. + scan_time semantics: time the file was LAST CONFIRMED at this path. + A rescan that finds the same (path, mtime) updates scan_time instead + of creating a duplicate observation. + Lifecycle is tracked via `status`: pending - just scanned, not yet materialized into the archive assigned - materialized: blob is in CAS, hardlink exists in view @@ -74,3 +78,12 @@ class Observation(Base): meta: Mapped[dict[str, Any] | None] = mapped_column(JSONB) blob: Mapped[Blob] = relationship(back_populates="observations") + + __table_args__ = ( + # Fast lookup for rescan idempotency: did we already see this + # exact file (path + mtime) on this host? + Index( + "ix_observations_path_mtime", + "hostname", "basedir", "relpath", "filename", "mtime", + ), + ) diff --git a/src/mama/scanner.py b/src/mama/scanner.py index 6d8eeb7..bb077cf 100644 --- a/src/mama/scanner.py +++ b/src/mama/scanner.py @@ -1,4 +1,10 @@ -"""File scanner: walks directories, hashes files, records observations.""" +"""File scanner: walks directories, hashes files, records observations. + +The scanner is rescan-safe: if a file at a given (hostname, basedir, +relpath, filename) is found with the same mtime as in a previous scan, +only the observation's scan_time is updated and the file is NOT hashed +again. This makes rescans cheap. +""" from __future__ import annotations @@ -9,21 +15,17 @@ from pathlib import Path import magic from blake3 import blake3 +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from mama.config import Settings from mama.models import Blob, Observation -# Hash of zero-byte content (so we don't try to mmap empty files). EMPTY_HASH = blake3().hexdigest() def iter_files(root: Path) -> Iterator[Path]: - """Walk *root* and yield regular file paths. - - Hidden files and directories (names starting with ".") are skipped. - Output is sorted within each directory for deterministic scan order. - """ + """Walk *root* and yield regular file paths, skipping hidden entries.""" for dirpath, dirnames, filenames in os.walk(root): dirnames[:] = sorted(d for d in dirnames if not d.startswith(".")) for name in sorted(filenames): @@ -42,13 +44,36 @@ def hash_file(path: Path, size: int) -> str: def detect_mime(path: Path) -> str | None: - """Detect MIME type via libmagic. Returns None on failure.""" try: return magic.from_file(str(path), mime=True) except Exception: return None +async def _find_existing_observation( + session: AsyncSession, + hostname: str, + basedir: str, + relpath: str, + filename: str, + mtime: datetime, +) -> Observation | None: + """Return an existing observation matching path + mtime, or None.""" + stmt = ( + select(Observation) + .where( + Observation.hostname == hostname, + Observation.basedir == basedir, + Observation.relpath == relpath, + Observation.filename == filename, + Observation.mtime == mtime, + ) + .limit(1) + ) + result = await session.execute(stmt) + return result.scalar_one_or_none() + + async def _process_one( session: AsyncSession, path: Path, @@ -59,22 +84,32 @@ async def _process_one( batch_blob_hashes: set[str], counters: dict[str, int], ) -> None: - """Hash one file, upsert blob, insert observation.""" + """Process a single file: idempotent rescan or full hash+insert.""" stat = path.stat() size = stat.st_size mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) ctime = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc) + rel_dir = path.parent.relative_to(root).as_posix() + if rel_dir == ".": + rel_dir = "" + + # Idempotency: same path + same mtime → same content (we trust this). + # Just refresh scan_time, skip hashing entirely. + existing = await _find_existing_observation( + session, settings.hostname, str(root), rel_dir, path.name, mtime, + ) + if existing is not None: + existing.scan_time = now + counters["unchanged"] += 1 + return + + # New or modified file: hash and insert. hash_hex = hash_file(path, size) - # Blob upsert: skip lookup if we already added this hash earlier in the - # same batch (the SQLAlchemy session sees pending objects but a fresh - # `get` still hits the DB on cache miss — the set avoids that). - if hash_hex in batch_blob_hashes: - is_new = False - else: - existing = await session.get(Blob, hash_hex) - if existing is None: + if hash_hex not in batch_blob_hashes: + existing_blob = await session.get(Blob, hash_hex) + if existing_blob is None: session.add( Blob( hash=hash_hex, @@ -84,20 +119,13 @@ async def _process_one( mime=detect_mime(path), ) ) - is_new = True + counters["new_blobs"] += 1 else: - is_new = False + counters["duplicates"] += 1 batch_blob_hashes.add(hash_hex) - - if is_new: - counters["new_blobs"] += 1 else: counters["duplicates"] += 1 - rel_dir = path.parent.relative_to(root).as_posix() - if rel_dir == ".": - rel_dir = "" - session.add( Observation( blob_hash=hash_hex, @@ -113,7 +141,7 @@ async def _process_one( meta=None, ) ) - counters["files"] += 1 + counters["new_observations"] += 1 async def scan_directory( @@ -125,10 +153,15 @@ async def scan_directory( batch_size: int = 100, progress_callback: Callable[[dict[str, int], Path | None], None] | None = None, ) -> dict[str, int]: - """Scan *root* recursively. Each batch is committed in its own transaction, - so progress is preserved if the scan is interrupted. - """ - counters = {"files": 0, "new_blobs": 0, "duplicates": 0, "errors": 0} + """Scan *root* recursively, batched commits per batch_size files.""" + counters = { + "files": 0, + "new_observations": 0, + "new_blobs": 0, + "unchanged": 0, + "duplicates": 0, + "errors": 0, + } root = root.resolve() now = datetime.now(timezone.utc) @@ -146,12 +179,13 @@ async def scan_directory( break async with sessions() as session: - seen: set[str] = set() + seen_blobs: set[str] = set() for path in batch: + counters["files"] += 1 try: await _process_one( session, path, root, settings, source_kind, now, - seen, counters, + seen_blobs, counters, ) except Exception: counters["errors"] += 1