diff --git a/README.md b/README.md
index 73af0f4..fb39b03 100644
--- a/README.md
+++ b/README.md
@@ -10,38 +10,193 @@ documents) on top of ZFS.
 - Database schema, CLI surface, on-disk layout, and HTTP API are unstable and
   will change without migration paths.
-- Most features described below are planned, not implemented.
+- Most features described below are partly implemented, partly planned.
 - Documentation lags behind code.
 
-Do not point mama at irreplaceable data. Keep independent backups of anything
-mama touches.
+Do not point mama at irreplaceable data. Keep independent backups.
 
 ## Concept
 
-mama indexes files placed in configured scan folders, stores file contents in a
-content-addressed blob store, and exposes them through hardlinked filesystem
-views consumable by specialized viewers (Immich for photos and video, Navidrome
-for music, Paperless-ngx for documents).
+mama treats every file as two separate things:
 
-Identical content is stored only once. Per-file context — original path, source
-device, scan timestamp, embedded metadata (EXIF, ID3, sidecar files) — is
-preserved as *observations* linked to the underlying blob, so duplicates
-contribute information instead of clutter.
+- **A blob** — pure content, identified by its BLAKE3 hash. Stored once in
+  a content-addressed store, regardless of how many places it appears.
+- **An observation** — a sighting of that content at a specific filesystem
+  path on a specific host at a specific time, with its own filesystem
+  metadata and embedded metadata (EXIF, ID3, sidecar, ...).
 
-### Workflow
+This split is what enables real deduplication without losing context.
+Identical content from a phone, a backup DVD, and an old laptop become three
+observations referencing one blob.
 
-1. `mama-scan PATH` — index files into the database (no copies, no moves)
-2. `mama-apply` — materialize approved observations into the archive
-   (blob into CAS, hardlink into view)
-3. `mama-web` — browse, merge duplicates, filter, export, delete
+## Workflow
+
+mama operates in two phases per source folder:
+
+1. **`mama-scan`** — walk filesystem, hash files, record observations in DB.
+   No copies, no moves. Safe to re-run.
+2. **`mama-apply`** — materialize observations into the archive (CAS blobs +
+   hardlinked views). Idempotent.
+
+```
+              ┌─────────────────┐
+ filesystem ─▶│    mama-scan    │─▶ observations + blobs in DB
+              └─────────────────┘
+                       │
+                       ▼
+              ┌─────────────────┐
+              │   mama-apply    │─▶ CAS + views
+              └─────────────────┘
+```
+
+### Storage Layout
+
+```
+/
+├── blobs/       Content-addressed storage: blobs/ab/cd/
+│                - mode 444, identical hashes share one inode
+├── views/       Hardlink trees scoped by source_kind:
+│                views////
+│                - same inode as the corresponding blob (zero extra storage
+│                  within the same ZFS dataset)
+└── previews/    (planned: derived thumbnails / low-res for browsing)
+```
+
+### Database
+
+**`blobs`** — one row per unique content (BLAKE3 hash):
+
+| Column        | Type        | Purpose                                  |
+|---------------|-------------|------------------------------------------|
+| hash          | str(64) PK  | BLAKE3 hex digest                        |
+| size          | bigint      | content size in bytes                    |
+| storage_path  | text        | location in CAS (set by `mama-apply`)    |
+| first_seen    | timestamptz | when first scanned                       |
+| mime          | str(128)?   | detected via libmagic                    |
+| block_reason  | str(32)?    | NULL = active; planned: deleted/blocked  |
+
+**`observations`** — one row per file sighting:
+
+| Column      | Type        | Purpose                                  |
+|-------------|-------------|------------------------------------------|
+| id          | int PK      |                                          |
+| blob_hash   | str(64) FK  | links to `blobs.hash`                    |
+| hostname    | str(255)    | machine where the file was seen          |
+| basedir     | text        | scan root path                           |
+| relpath     | text        | directory below scan root                |
+| filename    | text        |                                          |
+| size        | bigint      | size as seen (also in blobs, denormalized) |
+| mtime       | timestamptz | file's modification time                 |
+| ctime       | timestamptz | file's change time                       |
+| scan_time   | timestamptz | last time this path was confirmed        |
+| source_kind | str(32)     | syncthing / incoming / existing / import |
+| status      | str(32)     | pending / assigned / ignored             |
+| meta        | jsonb?      | ExifTool / ID3 / sidecar metadata        |
+
+Indexes:
+- `ix_observations_blob_hash` — for joins
+- `ix_observations_path_mtime` — for rescan idempotency (hostname, basedir, relpath, filename, mtime, size)
+
+### Observation Lifecycle
+
+```mermaid
+stateDiagram-v2
+    [*] --> pending: mama-scan (new file)
+    pending --> assigned: mama-apply
+    pending --> ignored: curation (planned)
+    assigned --> ignored: curation (planned)
+    ignored --> assigned: curation (planned)
+```
+
+`status` represents the **target state** (Soll-Zustand):
+
+- `pending` — newly scanned, target not yet decided
+  - current: `mama-apply` auto-promotes to `assigned`
+  - planned: stays `pending` until reviewed via web UI or rules
+- `assigned` — should be in the archive; `mama-apply` ensures the view exists
+- `ignored` — should not be in the archive; `mama-apply` ensures no view (planned)
+
+`mama-apply`'s job is to reconcile the filesystem with the target state.
+
+### mama-scan in detail
+
+For each file under the scan root:
+
+**1. Cheap path check (no content I/O)**
+
+Reads:
+- `stat()` → `size`, `mtime`, `ctime`
+- DB query for an observation matching
+  `(hostname, basedir, relpath, filename, mtime, size)`
+
+If a match is found:
+- update `scan_time` on that observation
+- increment `unchanged` counter
+- **skip everything else** (no hashing, no metadata extraction)
+
+**2. Full processing (new or modified file)**
+
+Reads:
+- BLAKE3 over content → `hash`
+- libmagic → `mime`
+- ExifTool → `meta` JSON
+
+Writes:
+- new `Blob` row if `hash` not seen before (sets: `hash`, `size`, `mime`,
+  `first_seen`; leaves `storage_path` empty for `mama-apply` to fill)
+- new `Observation` row (sets all fields, `status='pending'`)
+
+Counters reported: `files | new obs | unchanged | new blobs | duplicates |
+with metadata | errors`.
+
+### mama-apply in detail
+
+Processes observations in cursor-paginated batches, ordered by `id`.
+
+For each observation:
+
+1. If `blob.block_reason IS NOT NULL` → skip, count as `blocked`
+2. Compute CAS target path: `/blobs/<2>/<2>/`
+3. If CAS target doesn't exist:
+   - resolve source path: `basedir/relpath/filename`
+   - if source is missing → skip, count as `missing`
+   - try `os.link()` (instant, same dataset)
+   - fall back to `shutil.copy2()` (cross-dataset; POSIX limit, costs space)
+   - `chmod 444` on the blob
+   - set `blob.storage_path` to the CAS-relative path
+4. Compute view path: `/views////`
+5. If view doesn't exist → `os.link()` from CAS blob to view path
+6. Set `observation.status = 'assigned'`
+
+The whole loop is idempotent — re-running `mama-apply` with no pending
+observations does nothing.
+
+### Rescan safety
+
+`mama-scan` can be re-run on the same path any number of times:
+
+- unchanged files (matching `(path, size, mtime)`) → only `scan_time` updated,
+  no new observation, no hashing
+- modified files → re-hashed, new observation row added (old one stays for history)
+- new files → full processing
+- removed files → observation stays in DB (planned: mark as gone)
+
+This makes `mama-scan` cheap to schedule on a timer for the Syncthing folders.
+
+## Components
+
+- **`mama-scan`** — index files into DB (above)
+- **`mama-apply`** — materialize archive (above)
+- **`mama-dev`** — developer utilities (`reset`, `stats`)
+- **`mama-web`** — planned: browse, merge duplicates, filter, export, set status
 
 ## Tech Stack
 
 - Python 3.13, FastAPI, SQLAlchemy 2.x (async), Alembic
-- PostgreSQL 16 (JSONB for embedded metadata)
+- PostgreSQL 17 (JSONB for embedded metadata)
 - Vue 3, Vite
 - ZFS (single archive dataset, snapshots, NFS export), Caddy
-- ExifTool, BLAKE3, ffmpeg, Pillow
+- ExifTool, BLAKE3, libmagic, ffmpeg, Pillow
 - Docker Compose for companion viewers (Immich, Navidrome, Paperless-ngx)
 
 ## Disclaimer
diff --git a/alembic/versions/da77e90cfd45_add_size_to_observations_and_to_.py b/alembic/versions/da77e90cfd45_add_size_to_observations_and_to_.py
new file mode 100644
index 0000000..d5df599
--- /dev/null
+++ b/alembic/versions/da77e90cfd45_add_size_to_observations_and_to_.py
@@ -0,0 +1,36 @@
+"""add size to observations and to idempotency index
+
+Revision ID: da77e90cfd45
+Revises: 1f337e7155be
+Create Date: 2026-05-26 08:50:38.257793
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'da77e90cfd45'
+down_revision: Union[str, Sequence[str], None] = '1f337e7155be'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Upgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('observations', sa.Column('size', sa.BigInteger(), nullable=False))
+    op.drop_index(op.f('ix_observations_path_mtime'), table_name='observations')
+    op.create_index('ix_observations_path_mtime', 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime', 'size'], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    """Downgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index('ix_observations_path_mtime', table_name='observations')
+    op.create_index(op.f('ix_observations_path_mtime'), 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime'], unique=False)
+    op.drop_column('observations', 'size')
+    # ### end Alembic commands ###
diff --git a/src/mama/cli/scan.py b/src/mama/cli/scan.py
index 734d2b8..e23d1c2 100644
--- a/src/mama/cli/scan.py
+++ b/src/mama/cli/scan.py
@@ -27,9 +27,7 @@ def scan(
     path: Path = typer.Argument(..., exists=True, file_okay=False, readable=True),
     config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
     source_kind: str = typer.Option(
-        "incoming",
-        "--source-kind",
-        "-s",
+        "incoming", "--source-kind", "-s",
         help="Origin: syncthing | incoming | existing | import",
     ),
     batch_size: int = typer.Option(100, "--batch-size", "-b"),
@@ -63,8 +61,9 @@ async def _run(
                 f"{counters['files']} files | "
                 f"+{counters['new_observations']} new | "
                 f"~{counters['unchanged']} unchanged | "
-                f"{counters['new_blobs']} new blobs | "
+                f"{counters['new_blobs']} blobs | "
                 f"{counters['duplicates']} dup | "
+                f"{counters['with_metadata']} meta | "
                 f"{counters['errors']} err"
             )
             if last_path is not None:
@@ -81,11 +80,12 @@ async def _run(
 
     console.print(
         f"[green]Done.[/green] "
-        f"{counters['files']} files seen | "
-        f"{counters['new_observations']} new observations | "
+        f"{counters['files']} files | "
+        f"{counters['new_observations']} new obs | "
         f"{counters['unchanged']} unchanged | "
         f"{counters['new_blobs']} new blobs | "
         f"{counters['duplicates']} duplicates | "
+        f"{counters['with_metadata']} with metadata | "
         f"{counters['errors']} errors"
     )
 
diff --git a/src/mama/metadata.py b/src/mama/metadata.py
new file mode 100644
index 0000000..b9f0587
--- /dev/null
+++ b/src/mama/metadata.py
@@ -0,0 +1,65 @@
+"""ExifTool-based metadata extraction.
+
+We use a long-running ExifTool process (via pyexiftool's ExifToolHelper)
+to avoid the Perl startup overhead per file. Filesystem-level fields
+(File:*, ExifTool:*, SourceFile) are stripped because mama already
+tracks those in the observations table.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+from types import TracebackType
+from typing import Any
+
+from exiftool import ExifToolHelper
+
+_SKIP_PREFIXES = ("File:", "ExifTool:")
+_SKIP_KEYS = {"SourceFile"}
+
+
+def filter_metadata(raw: dict[str, Any]) -> dict[str, Any] | None:
+    """Strip filesystem/tool fields; keep only content-derived metadata."""
+    if not raw:
+        return None
+    cleaned = {
+        k: v
+        for k, v in raw.items()
+        if k not in _SKIP_KEYS
+        and not any(k.startswith(p) for p in _SKIP_PREFIXES)
+    }
+    return cleaned or None
+
+
+class MetadataExtractor:
+    """Context manager around a persistent ExifTool process."""
+
+    def __init__(self) -> None:
+        self._helper: ExifToolHelper | None = None
+
+    def __enter__(self) -> MetadataExtractor:
+        self._helper = ExifToolHelper()
+        self._helper.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        if self._helper is not None:
+            self._helper.__exit__(exc_type, exc_val, exc_tb)
+            self._helper = None
+
+    def extract(self, path: Path) -> dict[str, Any] | None:
+        """Extract metadata for one file. Returns filtered dict or None."""
+        if self._helper is None:
+            raise RuntimeError("MetadataExtractor must be used as a context manager")
+        try:
+            results = self._helper.get_metadata(str(path))
+        except Exception:
+            return None
+        if not results:
+            return None
+        return filter_metadata(results[0])
diff --git a/src/mama/models.py b/src/mama/models.py
index b54db4a..8186e33 100644
--- a/src/mama/models.py
+++ b/src/mama/models.py
@@ -1,10 +1,6 @@
 """SQLAlchemy ORM models for the mama archive.
 
-Schema overview:
-
-- blobs: unique file contents in the CAS (one row per unique hash)
-- observations: each file sighting on disk (many per blob), with embedded
-  metadata as JSONB
+See README.md for the full workflow and field semantics.
 """
 
 from __future__ import annotations
@@ -37,28 +33,7 @@ class Blob(Base):
 
 
 class Observation(Base):
-    """A single sighting of a file on some host at some path.
-
-    Many observations can point to the same blob (= the file content is
-    identical, but the path/hostname/timestamps differ). Per-observation
-    metadata extracted from the file (EXIF, ID3, sidecar, ...) lives in
-    the `meta` JSONB column.
-
-    scan_time semantics: time the file was LAST CONFIRMED at this path.
-    A rescan that finds the same (path, mtime) updates scan_time instead
-    of creating a duplicate observation.
-
-    Lifecycle is tracked via `status`:
-      pending  - just scanned, not yet materialized into the archive
-      assigned - materialized: blob is in CAS, hardlink exists in view
-      ignored  - intentionally not materialized (e.g. user dismissed it)
-
-    Origin is tracked via `source_kind` (immutable):
-      syncthing - files synced from a phone/device
-      incoming  - dropped manually into an incoming folder
-      existing  - found during the initial archive scan
-      import    - imported from an external medium (DVD, USB, ...)
-    """
+    """A single sighting of a file on some host at some path."""
 
     __tablename__ = "observations"
 
@@ -70,6 +45,7 @@ class Observation(Base):
     basedir: Mapped[str] = mapped_column(Text, nullable=False)
     relpath: Mapped[str] = mapped_column(Text, nullable=False)
     filename: Mapped[str] = mapped_column(Text, nullable=False)
+    size: Mapped[int] = mapped_column(BigInteger, nullable=False)
     mtime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
     ctime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
     scan_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
@@ -80,10 +56,8 @@ class Observation(Base):
     blob: Mapped[Blob] = relationship(back_populates="observations")
 
     __table_args__ = (
-        # Fast lookup for rescan idempotency: did we already see this
-        # exact file (path + mtime) on this host?
         Index(
             "ix_observations_path_mtime",
-            "hostname", "basedir", "relpath", "filename", "mtime",
+            "hostname", "basedir", "relpath", "filename", "mtime", "size",
         ),
     )
diff --git a/src/mama/scanner.py b/src/mama/scanner.py
index bb077cf..555b0ca 100644
--- a/src/mama/scanner.py
+++ b/src/mama/scanner.py
@@ -1,9 +1,8 @@
 """File scanner: walks directories, hashes files, records observations.
 
-The scanner is rescan-safe: if a file at a given (hostname, basedir,
-relpath, filename) is found with the same mtime as in a previous scan,
-only the observation's scan_time is updated and the file is NOT hashed
-again. This makes rescans cheap.
+Rescan-safe: if a file at a given path is found with the same size and
+mtime as in a previous scan, only scan_time is updated and the file is
+not hashed again.
 """
 
 from __future__ import annotations
@@ -19,6 +18,7 @@ from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
 
 from mama.config import Settings
+from mama.metadata import MetadataExtractor
 from mama.models import Blob, Observation
 
 EMPTY_HASH = blake3().hexdigest()
@@ -35,7 +35,7 @@ def iter_files(root: Path) -> Iterator[Path]:
 
 
 def hash_file(path: Path, size: int) -> str:
-    """BLAKE3 hash of a file. Uses mmap + multi-threading for large files."""
+    """BLAKE3 hash of a file. mmap + multi-threaded for large files."""
     if size == 0:
         return EMPTY_HASH
     h = blake3(max_threads=blake3.AUTO)
@@ -57,8 +57,9 @@ async def _find_existing_observation(
     relpath: str,
     filename: str,
     mtime: datetime,
+    size: int,
 ) -> Observation | None:
-    """Return an existing observation matching path + mtime, or None."""
+    """Return an observation matching path + mtime + size, or None."""
     stmt = (
         select(Observation)
         .where(
@@ -67,6 +68,7 @@ async def _find_existing_observation(
             Observation.relpath == relpath,
             Observation.filename == filename,
             Observation.mtime == mtime,
+            Observation.size == size,
         )
         .limit(1)
     )
@@ -83,6 +85,7 @@ async def _process_one(
     now: datetime,
     batch_blob_hashes: set[str],
     counters: dict[str, int],
+    extractor: MetadataExtractor,
 ) -> None:
     """Process a single file: idempotent rescan or full hash+insert."""
     stat = path.stat()
@@ -94,17 +97,16 @@ async def _process_one(
     if rel_dir == ".":
         rel_dir = ""
 
-    # Idempotency: same path + same mtime → same content (we trust this).
-    # Just refresh scan_time, skip hashing entirely.
+    # Cheap idempotency: same path + mtime + size → trust unchanged.
     existing = await _find_existing_observation(
-        session, settings.hostname, str(root), rel_dir, path.name, mtime,
+        session, settings.hostname, str(root), rel_dir, path.name, mtime, size,
     )
     if existing is not None:
         existing.scan_time = now
         counters["unchanged"] += 1
         return
 
-    # New or modified file: hash and insert.
+    # New or changed file: hash, detect MIME, extract metadata.
     hash_hex = hash_file(path, size)
 
     if hash_hex not in batch_blob_hashes:
@@ -114,7 +116,7 @@ async def _process_one(
                 Blob(
                     hash=hash_hex,
                     size=size,
-                    storage_path="",  # populated later by mama-apply
+                    storage_path="",
                     first_seen=now,
                     mime=detect_mime(path),
                 )
@@ -126,6 +128,8 @@ async def _process_one(
     else:
         counters["duplicates"] += 1
 
+    meta = extractor.extract(path)
+
     session.add(
         Observation(
             blob_hash=hash_hex,
@@ -133,15 +137,18 @@ async def _process_one(
             basedir=str(root),
             relpath=rel_dir,
             filename=path.name,
+            size=size,
             mtime=mtime,
             ctime=ctime,
             scan_time=now,
             source_kind=source_kind,
             status="pending",
-            meta=None,
+            meta=meta,
         )
     )
     counters["new_observations"] += 1
+    if meta is not None:
+        counters["with_metadata"] += 1
 
 
 async def scan_directory(
@@ -160,6 +167,7 @@ async def scan_directory(
         "new_blobs": 0,
         "unchanged": 0,
         "duplicates": 0,
+        "with_metadata": 0,
         "errors": 0,
     }
     root = root.resolve()
@@ -168,30 +176,31 @@ async def scan_directory(
     files_iter = iter_files(root)
     batch: list[Path] = []
 
-    while True:
-        batch.clear()
-        for _ in range(batch_size):
-            try:
-                batch.append(next(files_iter))
-            except StopIteration:
-                break
-        if not batch:
-            break
-
-        async with sessions() as session:
-            seen_blobs: set[str] = set()
-            for path in batch:
-                counters["files"] += 1
+    with MetadataExtractor() as extractor:
+        while True:
+            batch.clear()
+            for _ in range(batch_size):
                 try:
-                    await _process_one(
-                        session, path, root, settings, source_kind, now,
-                        seen_blobs, counters,
-                    )
-                except Exception:
-                    counters["errors"] += 1
-            await session.commit()
+                    batch.append(next(files_iter))
+                except StopIteration:
+                    break
+            if not batch:
+                break
 
-        if progress_callback:
-            progress_callback(counters, batch[-1])
+            async with sessions() as session:
+                seen_blobs: set[str] = set()
+                for path in batch:
+                    counters["files"] += 1
+                    try:
+                        await _process_one(
+                            session, path, root, settings, source_kind, now,
+                            seen_blobs, counters, extractor,
+                        )
+                    except Exception:
+                        counters["errors"] += 1
+                await session.commit()
+
+            if progress_callback:
+                progress_callback(counters, batch[-1])
 
     return counters
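
Note on the `mama-apply` steps documented in the README hunk: the `mama-apply` code itself is not part of this diff, so the following is only a minimal sketch of step 3 (hard-link into the CAS, fall back to a copy, then `chmod 444`). The helper name `link_or_copy`, its return values, and the choice to fall back only on `EXDEV` are assumptions made for illustration, not mama's actual implementation. The CAS target path is taken as a parameter because the README's path template is elided.

```python
import errno
import os
import shutil
from pathlib import Path


def link_or_copy(source: Path, target: Path) -> str:
    """Place `source` at `target` and make it read-only.

    Hypothetical helper (not taken from mama). Returns "exists",
    "linked", or "copied".
    """
    if target.exists():
        # Idempotent: a re-run finds the blob in place and does nothing.
        return "exists"
    target.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Hard link: instant and shares the inode, so no extra space is used.
        os.link(source, target)
        result = "linked"
    except OSError as exc:
        # EXDEV: source and target live on different filesystems/datasets.
        # Any other error is re-raised (assumption: only EXDEV justifies a copy).
        if exc.errno != errno.EXDEV:
            raise
        # copy2 duplicates the data (costs space) and preserves timestamps.
        shutil.copy2(source, target)
        result = "copied"
    # Mode 444. For a hard link this changes the shared inode,
    # so the source file becomes read-only as well.
    os.chmod(target, 0o444)
    return result
```

One consequence worth checking in review: because a hard link shares its inode with the source, the `chmod 444` in the documented sequence would also make the scanned source file read-only when both live on the same dataset.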