Rescan idempotency + mama-dev (reset, stats)

- observations now keyed by (path, mtime) for idempotent rescans
- new index ix_observations_path_mtime
- mama-dev reset: truncate all data, schema kept
- mama-dev stats: overview, breakdowns by source_kind/status/hostname/MIME, largest blobs
This commit is contained in:
warnason 2026-05-25 21:19:30 +02:00
parent b0d2723ead
commit a84b32d6b0
6 changed files with 282 additions and 39 deletions

View file

@ -0,0 +1,32 @@
"""add path+mtime index for rescan idempotency
Revision ID: 1f337e7155be
Revises: 1608054c0ffe
Create Date: 2026-05-25 21:02:49.743977
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1f337e7155be'
down_revision: Union[str, Sequence[str], None] = '1608054c0ffe'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_index('ix_observations_path_mtime', 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('ix_observations_path_mtime', table_name='observations')
# ### end Alembic commands ###

View file

@ -28,6 +28,7 @@ dependencies = [
mama-scan = "mama.cli.scan:main"
mama-apply = "mama.cli.apply:main"
mama-web = "mama.cli.web:main"
mama-dev = "mama.cli.dev:main"
[dependency-groups]
dev = [

159
src/mama/cli/dev.py Normal file
View file

@ -0,0 +1,159 @@
"""mama-dev: developer utilities (DB reset, statistics, ...)."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
import typer
from rich.console import Console
from rich.table import Table
from sqlalchemy import Select, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from mama.config import Settings
from mama.db import make_engine, make_session_factory
from mama.models import Blob, Observation
app = typer.Typer(help="Developer utilities for mama.")
console = Console()
# ---------- reset --------------------------------------------------------- #
@app.command()
def reset(
config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"),
) -> None:
"""Truncate all mama tables. Schema stays intact, archive files untouched."""
if not yes:
confirmed = typer.confirm(
"Delete ALL observations and blobs from the database?",
default=False,
)
if not confirmed:
console.print("[yellow]Aborted.[/yellow]")
raise typer.Exit(0)
asyncio.run(_reset(config))
console.print("[green]Database reset.[/green]")
async def _reset(config_path: Path) -> None:
settings = Settings.load(config_path)
engine = make_engine(settings)
async with engine.begin() as conn:
await conn.execute(
text("TRUNCATE TABLE observations, blobs RESTART IDENTITY CASCADE")
)
await engine.dispose()
# ---------- stats --------------------------------------------------------- #
@app.command()
def stats(
config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
) -> None:
"""Show database statistics."""
asyncio.run(_stats(config))
async def _stats(config_path: Path) -> None:
settings = Settings.load(config_path)
engine = make_engine(settings)
sessions = make_session_factory(engine)
async with sessions() as session:
await _print_overview(session)
await _print_group(session, "By source_kind",
select(Observation.source_kind, func.count())
.group_by(Observation.source_kind)
.order_by(func.count().desc()))
await _print_group(session, "By status",
select(Observation.status, func.count())
.group_by(Observation.status)
.order_by(func.count().desc()))
await _print_group(session, "By hostname",
select(Observation.hostname, func.count())
.group_by(Observation.hostname)
.order_by(func.count().desc()))
await _print_group(session, "MIME types (top 10)",
select(Blob.mime, func.count())
.where(Blob.mime.is_not(None))
.group_by(Blob.mime)
.order_by(func.count().desc())
.limit(10))
await _print_largest_blobs(session)
await engine.dispose()
async def _print_overview(session: AsyncSession) -> None:
n_obs = await session.scalar(select(func.count(Observation.id))) or 0
n_blobs = await session.scalar(select(func.count(Blob.hash))) or 0
total_size = await session.scalar(select(func.coalesce(func.sum(Blob.size), 0))) or 0
n_blocked = await session.scalar(
select(func.count(Blob.hash)).where(Blob.block_reason.is_not(None))
) or 0
duplicates = max(n_obs - n_blobs, 0)
dup_pct = (duplicates / n_obs * 100) if n_obs else 0.0
t = Table(show_header=False, title="Overview", title_justify="left", box=None)
t.add_column("Metric", style="cyan")
t.add_column("Value", justify="right")
t.add_row("Observations", f"{n_obs:,}")
t.add_row("Unique blobs", f"{n_blobs:,}")
t.add_row("Total content", _format_size(total_size))
t.add_row("Duplicates (obs - blobs)", f"{duplicates:,} ({dup_pct:.1f}%)")
t.add_row("Blocked blobs", f"{n_blocked:,}")
console.print(t)
console.print()
async def _print_group(session: AsyncSession, title: str, stmt: Select[Any]) -> None:
rows = (await session.execute(stmt)).all()
if not rows:
return
t = Table(title=title, title_justify="left", box=None)
t.add_column("Value", style="cyan")
t.add_column("Count", justify="right")
for value, count in rows:
t.add_row(str(value) if value is not None else "[dim](null)[/dim]", f"{count:,}")
console.print(t)
console.print()
async def _print_largest_blobs(session: AsyncSession) -> None:
rows = (await session.execute(
select(Blob.hash, Blob.size, Blob.mime).order_by(Blob.size.desc()).limit(5)
)).all()
if not rows:
return
t = Table(title="Largest blobs", title_justify="left", box=None)
t.add_column("Hash", style="dim")
t.add_column("Size", justify="right")
t.add_column("MIME", style="cyan")
for hash_, size, mime in rows:
t.add_row(hash_[:12] + "", _format_size(size), mime or "[dim]-[/dim]")
console.print(t)
console.print()
def _format_size(n: int) -> str:
size = float(n)
for unit in ("B", "KB", "MB", "GB", "TB"):
if size < 1024:
return f"{n} B" if unit == "B" else f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} PB"
def main() -> None:
app()
if __name__ == "__main__":
main()

View file

@ -61,12 +61,14 @@ async def _run(
def update(counters: dict[str, int], last_path: Path | None) -> None:
desc = (
f"{counters['files']} files | "
f"{counters['new_blobs']} new | "
f"+{counters['new_observations']} new | "
f"~{counters['unchanged']} unchanged | "
f"{counters['new_blobs']} new blobs | "
f"{counters['duplicates']} dup | "
f"{counters['errors']} err"
)
if last_path is not None:
desc += f" | {last_path.name}"
desc += f" | {last_path.name[:40]}"
progress.update(task, description=desc)
counters = await scan_directory(
@ -79,9 +81,11 @@ async def _run(
console.print(
f"[green]Done.[/green] "
f"{counters['files']} files, "
f"{counters['new_blobs']} new blobs, "
f"{counters['duplicates']} duplicates, "
f"{counters['files']} files seen | "
f"{counters['new_observations']} new observations | "
f"{counters['unchanged']} unchanged | "
f"{counters['new_blobs']} new blobs | "
f"{counters['duplicates']} duplicates | "
f"{counters['errors']} errors"
)

View file

@ -12,7 +12,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import BigInteger, DateTime, ForeignKey, String, Text
from sqlalchemy import BigInteger, DateTime, ForeignKey, Index, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@ -44,6 +44,10 @@ class Observation(Base):
metadata extracted from the file (EXIF, ID3, sidecar, ...) lives in
the `meta` JSONB column.
scan_time semantics: time the file was LAST CONFIRMED at this path.
A rescan that finds the same (path, mtime) updates scan_time instead
of creating a duplicate observation.
Lifecycle is tracked via `status`:
pending - just scanned, not yet materialized into the archive
assigned - materialized: blob is in CAS, hardlink exists in view
@ -74,3 +78,12 @@ class Observation(Base):
meta: Mapped[dict[str, Any] | None] = mapped_column(JSONB)
blob: Mapped[Blob] = relationship(back_populates="observations")
__table_args__ = (
# Fast lookup for rescan idempotency: did we already see this
# exact file (path + mtime) on this host?
Index(
"ix_observations_path_mtime",
"hostname", "basedir", "relpath", "filename", "mtime",
),
)

View file

@ -1,4 +1,10 @@
"""File scanner: walks directories, hashes files, records observations."""
"""File scanner: walks directories, hashes files, records observations.
The scanner is rescan-safe: if a file at a given (hostname, basedir,
relpath, filename) is found with the same mtime as in a previous scan,
only the observation's scan_time is updated and the file is NOT hashed
again. This makes rescans cheap.
"""
from __future__ import annotations
@ -9,21 +15,17 @@ from pathlib import Path
import magic
from blake3 import blake3
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from mama.config import Settings
from mama.models import Blob, Observation
# Hash of zero-byte content (so we don't try to mmap empty files).
EMPTY_HASH = blake3().hexdigest()
def iter_files(root: Path) -> Iterator[Path]:
"""Walk *root* and yield regular file paths.
Hidden files and directories (names starting with ".") are skipped.
Output is sorted within each directory for deterministic scan order.
"""
"""Walk *root* and yield regular file paths, skipping hidden entries."""
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))
for name in sorted(filenames):
@ -42,13 +44,36 @@ def hash_file(path: Path, size: int) -> str:
def detect_mime(path: Path) -> str | None:
"""Detect MIME type via libmagic. Returns None on failure."""
try:
return magic.from_file(str(path), mime=True)
except Exception:
return None
async def _find_existing_observation(
session: AsyncSession,
hostname: str,
basedir: str,
relpath: str,
filename: str,
mtime: datetime,
) -> Observation | None:
"""Return an existing observation matching path + mtime, or None."""
stmt = (
select(Observation)
.where(
Observation.hostname == hostname,
Observation.basedir == basedir,
Observation.relpath == relpath,
Observation.filename == filename,
Observation.mtime == mtime,
)
.limit(1)
)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def _process_one(
session: AsyncSession,
path: Path,
@ -59,22 +84,32 @@ async def _process_one(
batch_blob_hashes: set[str],
counters: dict[str, int],
) -> None:
"""Hash one file, upsert blob, insert observation."""
"""Process a single file: idempotent rescan or full hash+insert."""
stat = path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
ctime = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc)
rel_dir = path.parent.relative_to(root).as_posix()
if rel_dir == ".":
rel_dir = ""
# Idempotency: same path + same mtime → same content (we trust this).
# Just refresh scan_time, skip hashing entirely.
existing = await _find_existing_observation(
session, settings.hostname, str(root), rel_dir, path.name, mtime,
)
if existing is not None:
existing.scan_time = now
counters["unchanged"] += 1
return
# New or modified file: hash and insert.
hash_hex = hash_file(path, size)
# Blob upsert: skip lookup if we already added this hash earlier in the
# same batch (the SQLAlchemy session sees pending objects but a fresh
# `get` still hits the DB on cache miss — the set avoids that).
if hash_hex in batch_blob_hashes:
is_new = False
else:
existing = await session.get(Blob, hash_hex)
if existing is None:
if hash_hex not in batch_blob_hashes:
existing_blob = await session.get(Blob, hash_hex)
if existing_blob is None:
session.add(
Blob(
hash=hash_hex,
@ -84,20 +119,13 @@ async def _process_one(
mime=detect_mime(path),
)
)
is_new = True
counters["new_blobs"] += 1
else:
is_new = False
counters["duplicates"] += 1
batch_blob_hashes.add(hash_hex)
if is_new:
counters["new_blobs"] += 1
else:
counters["duplicates"] += 1
rel_dir = path.parent.relative_to(root).as_posix()
if rel_dir == ".":
rel_dir = ""
session.add(
Observation(
blob_hash=hash_hex,
@ -113,7 +141,7 @@ async def _process_one(
meta=None,
)
)
counters["files"] += 1
counters["new_observations"] += 1
async def scan_directory(
@ -125,10 +153,15 @@ async def scan_directory(
batch_size: int = 100,
progress_callback: Callable[[dict[str, int], Path | None], None] | None = None,
) -> dict[str, int]:
"""Scan *root* recursively. Each batch is committed in its own transaction,
so progress is preserved if the scan is interrupted.
"""
counters = {"files": 0, "new_blobs": 0, "duplicates": 0, "errors": 0}
"""Scan *root* recursively, batched commits per batch_size files."""
counters = {
"files": 0,
"new_observations": 0,
"new_blobs": 0,
"unchanged": 0,
"duplicates": 0,
"errors": 0,
}
root = root.resolve()
now = datetime.now(timezone.utc)
@ -146,12 +179,13 @@ async def scan_directory(
break
async with sessions() as session:
seen: set[str] = set()
seen_blobs: set[str] = set()
for path in batch:
counters["files"] += 1
try:
await _process_one(
session, path, root, settings, source_kind, now,
seen, counters,
seen_blobs, counters,
)
except Exception:
counters["errors"] += 1