From 0b43c7c4ddd5e49413e0388ef602e637d5ac397f Mon Sep 17 00:00:00 2001 From: warnason <276599704+warnason@users.noreply.github.com> Date: Mon, 25 May 2026 22:15:33 +0200 Subject: [PATCH] Add mama-apply: materialize blobs to CAS, hardlink to views - archive.py: CAS path layout (blobs/<2>/<2>/), view paths scoped by source_kind, hardlink helpers - applier.py: cursor-paginated apply_pending with dry-run support - mama-apply CLI with progress and --dry-run Note: cross-dataset hardlinks fall back to copy (POSIX limitation), so applying from /mnt/preview to /mnt/archive currently doubles storage. To be addressed by consolidating sources into the archive dataset or by introducing --delete-source for move semantics. --- src/mama/applier.py | 121 ++++++++++++++++++++++++++++++++++++++++++ src/mama/archive.py | 85 +++++++++++++++++++++++++++++ src/mama/cli/apply.py | 63 ++++++++++++++++++++-- 3 files changed, 266 insertions(+), 3 deletions(-) create mode 100644 src/mama/applier.py create mode 100644 src/mama/archive.py diff --git a/src/mama/applier.py b/src/mama/applier.py new file mode 100644 index 0000000..9087cd3 --- /dev/null +++ b/src/mama/applier.py @@ -0,0 +1,121 @@ +"""mama-apply: materialize pending observations into the archive. + +For each pending observation: + 1. If the blob isn't in the CAS yet: place it there (hardlink from source). + 2. Create a hardlink under views//... pointing at the CAS blob. + 3. Mark the observation as 'assigned'. + +Blocked blobs (block_reason set) are skipped — observations stay pending. +""" + +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy.orm import selectinload + +from mama.archive import ( + cas_path_for, + ensure_blob_in_cas, + ensure_view_hardlink, + view_path_for, +) +from mama.config import Settings +from mama.models import Blob, Observation + + +async def apply_pending( + sessions: async_sessionmaker[AsyncSession], + settings: Settings, + *, + dry_run: bool = False, + batch_size: int = 100, + progress_callback: Callable[[dict[str, int]], None] | None = None, +) -> dict[str, int]: + """Materialize all pending observations into the archive.""" + counters = { + "considered": 0, + "blobs_placed": 0, + "views_linked": 0, + "skipped_blocked": 0, + "skipped_missing": 0, + "errors": 0, + "marked_assigned": 0, + } + + last_id = 0 + + while True: + async with sessions() as session: + stmt = ( + select(Observation) + .where( + Observation.status == "pending", + Observation.id > last_id, + ) + .options(selectinload(Observation.blob)) + .order_by(Observation.id) + .limit(batch_size) + ) + batch = (await session.execute(stmt)).scalars().all() + if not batch: + break + + for obs in batch: + counters["considered"] += 1 + last_id = obs.id + try: + if not await _apply_one(session, obs, settings, counters, dry_run): + continue + if not dry_run: + obs.status = "assigned" + counters["marked_assigned"] += 1 + except Exception: + counters["errors"] += 1 + + if not dry_run: + await session.commit() + + if progress_callback: + progress_callback(counters) + + return counters + + +async def _apply_one( + session: AsyncSession, + obs: Observation, + settings: Settings, + counters: dict[str, int], + dry_run: bool, +) -> bool: + """Materialize a single observation. Returns True on success.""" + blob: Blob = obs.blob + + if blob.block_reason is not None: + counters["skipped_blocked"] += 1 + return False + + cas_target = cas_path_for(settings, blob.hash) + view_target = view_path_for(settings, obs) + source_path = Path(obs.basedir) / obs.relpath / obs.filename + + if dry_run: + return True + + if not cas_target.exists(): + if not source_path.exists(): + counters["skipped_missing"] += 1 + return False + ensure_blob_in_cas(source_path, cas_target, move=False) + blob.storage_path = str(cas_target.relative_to(settings.archive_root)) + counters["blobs_placed"] += 1 + + if not view_target.exists(): + ensure_view_hardlink(cas_target, view_target) + counters["views_linked"] += 1 + + return True diff --git a/src/mama/archive.py b/src/mama/archive.py new file mode 100644 index 0000000..4c5c985 --- /dev/null +++ b/src/mama/archive.py @@ -0,0 +1,85 @@ +"""Archive layout: content-addressed blob store + hardlinked logical views. + +Layout under settings.archive_root: + + blobs/<2>/<2>/ # the actual file content + views///... # hardlinks back to blobs/ + +A view path mirrors the original directory structure but lives under +its source_kind so that scans from different origins don't collide. +""" + +from __future__ import annotations + +import os +from pathlib import Path + +from mama.config import Settings +from mama.models import Blob, Observation + + +def cas_path_for(settings: Settings, hash_hex: str) -> Path: + """Compute the CAS path for a blob hash.""" + return settings.blobs_dir / hash_hex[:2] / hash_hex[2:4] / hash_hex + + +def view_path_for(settings: Settings, obs: Observation) -> Path: + """Compute the hardlink view path for an observation. + + Strategy: views//// + """ + basedir_safe = obs.basedir.lstrip("/") + parts = [settings.views_dir, obs.source_kind, basedir_safe] + if obs.relpath: + parts.append(obs.relpath) + return Path(*parts) / obs.filename + + +def ensure_blob_in_cas(source: Path, target: Path, *, move: bool) -> None: + """Place a blob's content at *target* in the CAS. + + If *target* already exists, do nothing (CAS is content-addressed, + so identical content = identical path). Otherwise, move or copy. + """ + if target.exists(): + return + target.parent.mkdir(parents=True, exist_ok=True) + if move: + os.rename(source, target) + else: + # Use hardlink within the same dataset; falls back to copy if + # link() fails (e.g. cross-device). For initial materialization + # from /incoming on the same dataset, link is instant and + # makes the source path still valid until you choose to clean it. + try: + os.link(source, target) + except OSError: + import shutil + shutil.copy2(source, target) + # Make blobs read-only to discourage accidental mutation. + os.chmod(target, 0o444) + + +def ensure_view_hardlink(blob_path: Path, view_path: Path) -> None: + """Create a hardlink from CAS to view, if it doesn't already exist.""" + if view_path.exists(): + # Could verify it's the same inode here, but keep it simple for now. + return + view_path.parent.mkdir(parents=True, exist_ok=True) + os.link(blob_path, view_path) + + +def remove_view_hardlink(view_path: Path) -> None: + """Remove a view hardlink if it exists. Blob in CAS is unaffected.""" + try: + view_path.unlink() + except FileNotFoundError: + pass + # Try to clean up empty parent directories, but don't be aggressive. + parent = view_path.parent + try: + while parent.exists() and not any(parent.iterdir()): + parent.rmdir() + parent = parent.parent + except OSError: + pass diff --git a/src/mama/cli/apply.py b/src/mama/cli/apply.py index ac4a2f9..8982b84 100644 --- a/src/mama/cli/apply.py +++ b/src/mama/cli/apply.py @@ -2,10 +2,21 @@ from __future__ import annotations +import asyncio from pathlib import Path import typer from rich.console import Console +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, +) + +from mama.applier import apply_pending +from mama.config import Settings +from mama.db import make_engine, make_session_factory app = typer.Typer(help="Materialize approved observations into the archive.") console = Console() @@ -14,12 +25,58 @@ console = Console() @app.command() def apply( config: Path = typer.Option(Path("mama.toml"), "--config", "-c"), - dry_run: bool = typer.Option(False, "--dry-run", help="Show what would happen"), + dry_run: bool = typer.Option(False, "--dry-run", help="Plan only, no changes"), + batch_size: int = typer.Option(100, "--batch-size", "-b"), ) -> None: """Move pending blobs into the CAS and create hardlinks in views.""" + asyncio.run(_run(config, dry_run, batch_size)) + + +async def _run(config_path: Path, dry_run: bool, batch_size: int) -> None: + settings = Settings.load(config_path) + engine = make_engine(settings) + sessions = make_session_factory(engine) + + mode = "[yellow]DRY RUN[/yellow]" if dry_run else "[bold]LIVE[/bold]" + console.print(f"Applying pending observations ({mode})") + console.print(f" archive_root = {settings.archive_root}") + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + TimeElapsedColumn(), + console=console, + ) as progress: + task = progress.add_task("starting...", total=None) + + def update(c: dict[str, int]) -> None: + progress.update(task, description=( + f"{c['considered']} considered | " + f"+{c['blobs_placed']} blobs | " + f"+{c['views_linked']} links | " + f"{c['skipped_blocked']} blocked | " + f"{c['skipped_missing']} missing | " + f"{c['errors']} err" + )) + + counters = await apply_pending( + sessions, settings, + dry_run=dry_run, + batch_size=batch_size, + progress_callback=update, + ) + + await engine.dispose() + console.print( - f"[yellow]stub:[/yellow] would apply pending observations " - f"(config={config}, dry_run={dry_run})" + f"[green]Done.[/green] " + f"{counters['considered']} considered | " + f"{counters['blobs_placed']} blobs placed | " + f"{counters['views_linked']} view links | " + f"{counters['marked_assigned']} marked assigned | " + f"{counters['skipped_blocked']} blocked | " + f"{counters['skipped_missing']} missing | " + f"{counters['errors']} errors" )