Add mama-apply: materialize blobs to CAS, hardlink to views

- archive.py: CAS path layout (blobs/<2>/<2>/<hash>), view paths
  scoped by source_kind, hardlink helpers
- applier.py: cursor-paginated apply_pending with dry-run support
- mama-apply CLI with progress and --dry-run

Note: cross-dataset hardlinks fall back to copy (POSIX limitation),
so applying from /mnt/preview to /mnt/archive currently doubles
storage. To be addressed by consolidating sources into the archive
dataset or by introducing --delete-source for move semantics.
This commit is contained in:
warnason 2026-05-25 22:15:33 +02:00
parent a84b32d6b0
commit 0b43c7c4dd
3 changed files with 266 additions and 3 deletions

121
src/mama/applier.py Normal file
View file

@ -0,0 +1,121 @@
"""mama-apply: materialize pending observations into the archive.
For each pending observation:
1. If the blob isn't in the CAS yet: place it there (hardlink from source).
2. Create a hardlink under views/<source_kind>/... pointing at the CAS blob.
3. Mark the observation as 'assigned'.
Blocked blobs (block_reason set) are skipped observations stay pending.
"""
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import selectinload
from mama.archive import (
cas_path_for,
ensure_blob_in_cas,
ensure_view_hardlink,
view_path_for,
)
from mama.config import Settings
from mama.models import Blob, Observation
async def apply_pending(
sessions: async_sessionmaker[AsyncSession],
settings: Settings,
*,
dry_run: bool = False,
batch_size: int = 100,
progress_callback: Callable[[dict[str, int]], None] | None = None,
) -> dict[str, int]:
"""Materialize all pending observations into the archive."""
counters = {
"considered": 0,
"blobs_placed": 0,
"views_linked": 0,
"skipped_blocked": 0,
"skipped_missing": 0,
"errors": 0,
"marked_assigned": 0,
}
last_id = 0
while True:
async with sessions() as session:
stmt = (
select(Observation)
.where(
Observation.status == "pending",
Observation.id > last_id,
)
.options(selectinload(Observation.blob))
.order_by(Observation.id)
.limit(batch_size)
)
batch = (await session.execute(stmt)).scalars().all()
if not batch:
break
for obs in batch:
counters["considered"] += 1
last_id = obs.id
try:
if not await _apply_one(session, obs, settings, counters, dry_run):
continue
if not dry_run:
obs.status = "assigned"
counters["marked_assigned"] += 1
except Exception:
counters["errors"] += 1
if not dry_run:
await session.commit()
if progress_callback:
progress_callback(counters)
return counters
async def _apply_one(
session: AsyncSession,
obs: Observation,
settings: Settings,
counters: dict[str, int],
dry_run: bool,
) -> bool:
"""Materialize a single observation. Returns True on success."""
blob: Blob = obs.blob
if blob.block_reason is not None:
counters["skipped_blocked"] += 1
return False
cas_target = cas_path_for(settings, blob.hash)
view_target = view_path_for(settings, obs)
source_path = Path(obs.basedir) / obs.relpath / obs.filename
if dry_run:
return True
if not cas_target.exists():
if not source_path.exists():
counters["skipped_missing"] += 1
return False
ensure_blob_in_cas(source_path, cas_target, move=False)
blob.storage_path = str(cas_target.relative_to(settings.archive_root))
counters["blobs_placed"] += 1
if not view_target.exists():
ensure_view_hardlink(cas_target, view_target)
counters["views_linked"] += 1
return True

85
src/mama/archive.py Normal file
View file

@ -0,0 +1,85 @@
"""Archive layout: content-addressed blob store + hardlinked logical views.
Layout under settings.archive_root:
blobs/<2>/<2>/<hash> # the actual file content
views/<source_kind>/<basedir>/... # hardlinks back to blobs/
A view path mirrors the original directory structure but lives under
its source_kind so that scans from different origins don't collide.
"""
from __future__ import annotations
import os
from pathlib import Path
from mama.config import Settings
from mama.models import Blob, Observation
def cas_path_for(settings: Settings, hash_hex: str) -> Path:
"""Compute the CAS path for a blob hash."""
return settings.blobs_dir / hash_hex[:2] / hash_hex[2:4] / hash_hex
def view_path_for(settings: Settings, obs: Observation) -> Path:
"""Compute the hardlink view path for an observation.
Strategy: views/<source_kind>/<basedir-with-leading-slash-stripped>/<relpath>/<filename>
"""
basedir_safe = obs.basedir.lstrip("/")
parts = [settings.views_dir, obs.source_kind, basedir_safe]
if obs.relpath:
parts.append(obs.relpath)
return Path(*parts) / obs.filename
def ensure_blob_in_cas(source: Path, target: Path, *, move: bool) -> None:
"""Place a blob's content at *target* in the CAS.
If *target* already exists, do nothing (CAS is content-addressed,
so identical content = identical path). Otherwise, move or copy.
"""
if target.exists():
return
target.parent.mkdir(parents=True, exist_ok=True)
if move:
os.rename(source, target)
else:
# Use hardlink within the same dataset; falls back to copy if
# link() fails (e.g. cross-device). For initial materialization
# from /incoming on the same dataset, link is instant and
# makes the source path still valid until you choose to clean it.
try:
os.link(source, target)
except OSError:
import shutil
shutil.copy2(source, target)
# Make blobs read-only to discourage accidental mutation.
os.chmod(target, 0o444)
def ensure_view_hardlink(blob_path: Path, view_path: Path) -> None:
"""Create a hardlink from CAS to view, if it doesn't already exist."""
if view_path.exists():
# Could verify it's the same inode here, but keep it simple for now.
return
view_path.parent.mkdir(parents=True, exist_ok=True)
os.link(blob_path, view_path)
def remove_view_hardlink(view_path: Path) -> None:
"""Remove a view hardlink if it exists. Blob in CAS is unaffected."""
try:
view_path.unlink()
except FileNotFoundError:
pass
# Try to clean up empty parent directories, but don't be aggressive.
parent = view_path.parent
try:
while parent.exists() and not any(parent.iterdir()):
parent.rmdir()
parent = parent.parent
except OSError:
pass

View file

@ -2,10 +2,21 @@
from __future__ import annotations from __future__ import annotations
import asyncio
from pathlib import Path from pathlib import Path
import typer import typer
from rich.console import Console from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from mama.applier import apply_pending
from mama.config import Settings
from mama.db import make_engine, make_session_factory
app = typer.Typer(help="Materialize approved observations into the archive.") app = typer.Typer(help="Materialize approved observations into the archive.")
console = Console() console = Console()
@ -14,12 +25,58 @@ console = Console()
@app.command() @app.command()
def apply( def apply(
config: Path = typer.Option(Path("mama.toml"), "--config", "-c"), config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
dry_run: bool = typer.Option(False, "--dry-run", help="Show what would happen"), dry_run: bool = typer.Option(False, "--dry-run", help="Plan only, no changes"),
batch_size: int = typer.Option(100, "--batch-size", "-b"),
) -> None: ) -> None:
"""Move pending blobs into the CAS and create hardlinks in views.""" """Move pending blobs into the CAS and create hardlinks in views."""
asyncio.run(_run(config, dry_run, batch_size))
async def _run(config_path: Path, dry_run: bool, batch_size: int) -> None:
settings = Settings.load(config_path)
engine = make_engine(settings)
sessions = make_session_factory(engine)
mode = "[yellow]DRY RUN[/yellow]" if dry_run else "[bold]LIVE[/bold]"
console.print(f"Applying pending observations ({mode})")
console.print(f" archive_root = {settings.archive_root}")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("starting...", total=None)
def update(c: dict[str, int]) -> None:
progress.update(task, description=(
f"{c['considered']} considered | "
f"+{c['blobs_placed']} blobs | "
f"+{c['views_linked']} links | "
f"{c['skipped_blocked']} blocked | "
f"{c['skipped_missing']} missing | "
f"{c['errors']} err"
))
counters = await apply_pending(
sessions, settings,
dry_run=dry_run,
batch_size=batch_size,
progress_callback=update,
)
await engine.dispose()
console.print( console.print(
f"[yellow]stub:[/yellow] would apply pending observations " f"[green]Done.[/green] "
f"(config={config}, dry_run={dry_run})" f"{counters['considered']} considered | "
f"{counters['blobs_placed']} blobs placed | "
f"{counters['views_linked']} view links | "
f"{counters['marked_assigned']} marked assigned | "
f"{counters['skipped_blocked']} blocked | "
f"{counters['skipped_missing']} missing | "
f"{counters['errors']} errors"
) )