ExifTool metadata extraction + size in observations + workflow doc
- metadata.py: persistent ExifTool session (avoids Perl startup per file), filters out File:/ExifTool: noise
- scanner now populates observations.meta as JSONB
- size duplicated into observations for self-contained queries and to strengthen the rescan idempotency check (path + mtime + size)
- README rewritten with state diagram, schema tables, scan/apply workflow
parent 0b43c7c4dd
commit 23566b0885
6 changed files with 328 additions and 89 deletions
191
README.md
|
|
@ -10,38 +10,193 @@ documents) on top of ZFS.
|
||||||
|
|
||||||
- Database schema, CLI surface, on-disk layout, and HTTP API are unstable and
|
- Database schema, CLI surface, on-disk layout, and HTTP API are unstable and
|
||||||
will change without migration paths.
|
will change without migration paths.
|
||||||
- Most features described below are planned, not implemented.
|
- Most features described below are partly implemented, partly planned.
|
||||||
- Documentation lags behind code.
|
- Documentation lags behind code.
|
||||||
|
|
||||||
Do not point mama at irreplaceable data. Keep independent backups of anything
|
Do not point mama at irreplaceable data. Keep independent backups.
|
||||||
mama touches.
|
|
||||||
|
|
||||||
## Concept
|
## Concept
|
||||||
|
|
||||||
mama indexes files placed in configured scan folders, stores file contents in a
|
mama treats every file as two separate things:
|
||||||
content-addressed blob store, and exposes them through hardlinked filesystem
|
|
||||||
views consumable by specialized viewers (Immich for photos and video, Navidrome
|
|
||||||
for music, Paperless-ngx for documents).
|
|
||||||
|
|
||||||
Identical content is stored only once. Per-file context — original path, source
|
- **A blob** — pure content, identified by its BLAKE3 hash. Stored once in
|
||||||
device, scan timestamp, embedded metadata (EXIF, ID3, sidecar files) — is
|
a content-addressed store, regardless of how many places it appears.
|
||||||
preserved as *observations* linked to the underlying blob, so duplicates
|
- **An observation** — a sighting of that content at a specific filesystem
|
||||||
contribute information instead of clutter.
|
path on a specific host at a specific time, with its own filesystem
|
||||||
|
metadata and embedded metadata (EXIF, ID3, sidecar, ...).
|
||||||
|
|
||||||
### Workflow
|
This split is what enables real deduplication without losing context.
|
||||||
|
Identical content from a phone, a backup DVD, and an old laptop becomes three
|
||||||
|
observations referencing one blob.
|
||||||
|
|
||||||
1. `mama-scan PATH` — index files into the database (no copies, no moves)
|
## Workflow
|
||||||
2. `mama-apply` — materialize approved observations into the archive
|
|
||||||
(blob into CAS, hardlink into view)
|
mama operates in two phases per source folder:
|
||||||
3. `mama-web` — browse, merge duplicates, filter, export, delete
|
|
||||||
|
1. **`mama-scan`** — walk filesystem, hash files, record observations in DB.
|
||||||
|
No copies, no moves. Safe to re-run.
|
||||||
|
2. **`mama-apply`** — materialize observations into the archive (CAS blobs +
|
||||||
|
hardlinked views). Idempotent.
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────┐
|
||||||
|
filesystem ─▶│ mama-scan │─▶ observations + blobs in DB
|
||||||
|
└─────────────────┘
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────────┐
|
||||||
|
│ mama-apply │─▶ CAS + views
|
||||||
|
└─────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
### Storage Layout
|
||||||
|
|
||||||
|
```
|
||||||
|
<archive_root>/
|
||||||
|
├── blobs/ Content-addressed storage: blobs/ab/cd/<full-hash>
|
||||||
|
│ - mode 444, identical hashes share one inode
|
||||||
|
├── views/ Hardlink trees scoped by source_kind:
|
||||||
|
│ views/<source_kind>/<basedir>/<relpath>/<filename>
|
||||||
|
│ - same inode as the corresponding blob (zero extra storage
|
||||||
|
│ within the same ZFS dataset)
|
||||||
|
└── previews/ (planned: derived thumbnails / low-res for browsing)
|
||||||
|
```
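The `blobs/ab/cd/<full-hash>` layout above implies a two-level fan-out on the leading hex characters of the digest. A minimal sketch of that path computation, assuming the shard directories are hex characters 0-1 and 2-3 of the hash; the helper name `cas_relpath` is hypothetical and not taken from the mama source:

```python
from pathlib import Path


def cas_relpath(hash_hex: str) -> Path:
    """Return the archive-relative CAS path for a 64-character hex digest."""
    if len(hash_hex) != 64 or any(c not in "0123456789abcdef" for c in hash_hex):
        raise ValueError(f"expected 64 lowercase hex characters, got {hash_hex!r}")
    # Assumption: the first two character pairs name the shard directories.
    return Path("blobs") / hash_hex[:2] / hash_hex[2:4] / hash_hex
```

Sharding this way keeps any single directory from accumulating every blob, which matters once the store holds a large number of files.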
|
||||||
|
|
||||||
|
### Database
|
||||||
|
|
||||||
|
**`blobs`** — one row per unique content (BLAKE3 hash):
|
||||||
|
|
||||||
|
| Column | Type | Purpose |
|
||||||
|
|---------------|-------------|------------------------------------------|
|
||||||
|
| hash | str(64) PK | BLAKE3 hex digest |
|
||||||
|
| size | bigint | content size in bytes |
|
||||||
|
| storage_path | text | location in CAS (set by `mama-apply`) |
|
||||||
|
| first_seen | timestamptz | when first scanned |
|
||||||
|
| mime | str(128)? | detected via libmagic |
|
||||||
|
| block_reason | str(32)? | NULL = active; planned: deleted/blocked |
|
||||||
|
|
||||||
|
**`observations`** — one row per file sighting:
|
||||||
|
|
||||||
|
| Column | Type | Purpose |
|
||||||
|
|-------------|-------------|------------------------------------------|
|
||||||
|
| id | int PK | |
|
||||||
|
| blob_hash | str(64) FK | links to `blobs.hash` |
|
||||||
|
| hostname | str(255) | machine where the file was seen |
|
||||||
|
| basedir | text | scan root path |
|
||||||
|
| relpath | text | directory below scan root |
|
||||||
|
| filename | text | |
|
||||||
|
| size | bigint | size as seen (also in blobs, denormalized) |
|
||||||
|
| mtime | timestamptz | file's modification time |
|
||||||
|
| ctime | timestamptz | file's change time |
|
||||||
|
| scan_time | timestamptz | last time this path was confirmed |
|
||||||
|
| source_kind | str(32) | syncthing / incoming / existing / import |
|
||||||
|
| status | str(32) | pending / assigned / ignored |
|
||||||
|
| meta | jsonb? | ExifTool / ID3 / sidecar metadata |
|
||||||
|
|
||||||
|
Indexes:
|
||||||
|
- `ix_observations_blob_hash` — for joins
|
||||||
|
- `ix_observations_path_mtime` — for rescan idempotency (hostname, basedir, relpath, filename, mtime, size)
|
||||||
|
|
||||||
|
### Observation Lifecycle
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
stateDiagram-v2
|
||||||
|
[*] --> pending: mama-scan (new file)
|
||||||
|
pending --> assigned: mama-apply
|
||||||
|
pending --> ignored: curation (planned)
|
||||||
|
assigned --> ignored: curation (planned)
|
||||||
|
ignored --> assigned: curation (planned)
|
||||||
|
```
|
||||||
|
|
||||||
|
`status` represents the **target state** (Soll-Zustand):
|
||||||
|
|
||||||
|
- `pending` — newly scanned, target not yet decided
|
||||||
|
- current: `mama-apply` auto-promotes to `assigned`
|
||||||
|
- planned: stays `pending` until reviewed via web UI or rules
|
||||||
|
- `assigned` — should be in the archive; `mama-apply` ensures the view exists
|
||||||
|
- `ignored` — should not be in the archive; `mama-apply` ensures no view (planned)
|
||||||
|
|
||||||
|
`mama-apply`'s job is to reconcile the filesystem with the target state.
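The state diagram above can be read as a small transition table. The following is only a sketch of that reading: the names `ALLOWED` and `can_transition` are hypothetical, and the README marks every transition except `pending -> assigned` as planned, so the real code may not enforce any of this yet.

```python
# Transitions copied from the state diagram; only pending -> assigned is
# described as implemented, the others are labeled "planned" in the README.
ALLOWED: dict[str, set[str]] = {
    "pending": {"assigned", "ignored"},
    "assigned": {"ignored"},
    "ignored": {"assigned"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the diagram contains an edge current -> target."""
    return target in ALLOWED.get(current, set())
```

Note that the diagram has no edge back to `pending`, so an observation would never return to the undecided state under this reading.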
|
||||||
|
|
||||||
|
### mama-scan in detail
|
||||||
|
|
||||||
|
For each file under the scan root:
|
||||||
|
|
||||||
|
**1. Cheap path check (no content I/O)**
|
||||||
|
|
||||||
|
Reads:
|
||||||
|
- `stat()` → `size`, `mtime`, `ctime`
|
||||||
|
- DB query for an observation matching
|
||||||
|
`(hostname, basedir, relpath, filename, mtime, size)`
|
||||||
|
|
||||||
|
If a match is found:
|
||||||
|
- update `scan_time` on that observation
|
||||||
|
- increment `unchanged` counter
|
||||||
|
- **skip everything else** (no hashing, no metadata extraction)
|
||||||
|
|
||||||
|
**2. Full processing (new or modified file)**
|
||||||
|
|
||||||
|
Reads:
|
||||||
|
- BLAKE3 over content → `hash`
|
||||||
|
- libmagic → `mime`
|
||||||
|
- ExifTool → `meta` JSON
|
||||||
|
|
||||||
|
Writes:
|
||||||
|
- new `Blob` row if `hash` not seen before (sets: `hash`, `size`, `mime`,
|
||||||
|
`first_seen`; leaves `storage_path` empty for `mama-apply` to fill)
|
||||||
|
- new `Observation` row (sets all fields, `status='pending'`)
|
||||||
|
|
||||||
|
Counters reported: `files | new obs | unchanged | new blobs | duplicates |
|
||||||
|
with metadata | errors`.
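The cheap path check in step 1 can be sketched without a database. This is an illustration only: a plain dict stands in for the `observations` table, `observation_key` and `classify` are hypothetical names, and `mtime` is kept as integer nanoseconds rather than the `timestamptz` the schema uses.

```python
import tempfile
from pathlib import Path

# Hypothetical stand-in for the observations table: key -> last scan_time.
Key = tuple[str, str, str, str, int, int]


def observation_key(hostname: str, root: Path, path: Path) -> Key:
    """Build the idempotency key from stat() alone (no content I/O)."""
    st = path.stat()
    rel_dir = str(path.parent.relative_to(root))
    if rel_dir == ".":
        rel_dir = ""
    return (hostname, str(root), rel_dir, path.name, st.st_mtime_ns, st.st_size)


def classify(seen: dict[Key, float], key: Key, now: float) -> str:
    """Return 'unchanged' for a known key; otherwise record it and return 'new'."""
    if key in seen:
        seen[key] = now  # refresh scan_time only, skip hashing and metadata
        return "unchanged"
    seen[key] = now  # the real scanner would hash and insert rows here
    return "new"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        root = Path(d).resolve()
        (root / "a.txt").write_bytes(b"hello")
        seen: dict[Key, float] = {}
        for scan_time in (1.0, 2.0):
            key = observation_key("host", root, root / "a.txt")
            print(classify(seen, key, scan_time))
```

Including `size` in the key means a file rewritten with a different length is treated as new even if its `mtime` was preserved, which is the strengthening the commit message describes.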
|
||||||
|
|
||||||
|
### mama-apply in detail
|
||||||
|
|
||||||
|
Processes observations in cursor-paginated batches, ordered by `id`.
|
||||||
|
|
||||||
|
For each observation:
|
||||||
|
|
||||||
|
1. If `blob.block_reason IS NOT NULL` → skip, count as `blocked`
|
||||||
|
2. Compute CAS target path: `<archive_root>/blobs/<2>/<2>/<full-hash>`
|
||||||
|
3. If CAS target doesn't exist:
|
||||||
|
- resolve source path: `basedir/relpath/filename`
|
||||||
|
- if source is missing → skip, count as `missing`
|
||||||
|
- try `os.link()` (instant, same dataset)
|
||||||
|
- fall back to `shutil.copy2()` (cross-dataset; POSIX limit, costs space)
|
||||||
|
- `chmod 444` on the blob
|
||||||
|
- set `blob.storage_path` to the CAS-relative path
|
||||||
|
4. Compute view path: `<archive_root>/views/<source_kind>/<basedir>/<relpath>/<filename>`
|
||||||
|
5. If view doesn't exist → `os.link()` from CAS blob to view path
|
||||||
|
6. Set `observation.status = 'assigned'`
|
||||||
|
|
||||||
|
The whole loop is idempotent — re-running `mama-apply` with no pending
|
||||||
|
observations does nothing.
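Step 3 of the loop above (hardlink first, copy as a fallback, then make the blob read-only) might look roughly like this. The function name `materialize` and its return values are assumptions for illustration, not the actual `mama-apply` code:

```python
import os
import shutil
import tempfile
from pathlib import Path


def materialize(source: Path, target: Path) -> str:
    """Place source at target via hardlink, falling back to a full copy."""
    if target.exists():
        return "exists"  # already materialized: re-running changes nothing
    target.parent.mkdir(parents=True, exist_ok=True)
    try:
        os.link(source, target)  # instant when both paths share a filesystem
        how = "linked"
    except OSError:
        shutil.copy2(source, target)  # e.g. cross-dataset; uses extra space
        how = "copied"
    # A hardlink shares its inode with the source, so when the link succeeded
    # this chmod also makes the original path read-only.
    os.chmod(target, 0o444)
    return how


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        src = Path(d) / "photo.jpg"
        src.write_bytes(b"payload")
        print(materialize(src, Path(d) / "blobs" / "ab" / "cd" / "blob"))
```

The early return on an existing target is what makes a second run a no-op for that blob.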
|
||||||
|
|
||||||
|
### Rescan safety
|
||||||
|
|
||||||
|
`mama-scan` can be re-run on the same path any number of times:
|
||||||
|
|
||||||
|
- unchanged files (matching `(path, size, mtime)`) → only `scan_time` updated,
|
||||||
|
no new observation, no hashing
|
||||||
|
- modified files → re-hashed, new observation row added (old one stays for history)
|
||||||
|
- new files → full processing
|
||||||
|
- removed files → observation stays in DB (planned: mark as gone)
|
||||||
|
|
||||||
|
This makes `mama-scan` cheap to schedule on a timer for the Syncthing folders.
|
||||||
|
|
||||||
|
## Components
|
||||||
|
|
||||||
|
- **`mama-scan`** — index files into DB (above)
|
||||||
|
- **`mama-apply`** — materialize archive (above)
|
||||||
|
- **`mama-dev`** — developer utilities (`reset`, `stats`)
|
||||||
|
- **`mama-web`** — planned: browse, merge duplicates, filter, export, set status
|
||||||
|
|
||||||
## Tech Stack
|
## Tech Stack
|
||||||
|
|
||||||
- Python 3.13, FastAPI, SQLAlchemy 2.x (async), Alembic
|
- Python 3.13, FastAPI, SQLAlchemy 2.x (async), Alembic
|
||||||
- PostgreSQL 16 (JSONB for embedded metadata)
|
- PostgreSQL 17 (JSONB for embedded metadata)
|
||||||
- Vue 3, Vite
|
- Vue 3, Vite
|
||||||
- ZFS (single archive dataset, snapshots, NFS export), Caddy
|
- ZFS (single archive dataset, snapshots, NFS export), Caddy
|
||||||
- ExifTool, BLAKE3, ffmpeg, Pillow
|
- ExifTool, BLAKE3, libmagic, ffmpeg, Pillow
|
||||||
- Docker Compose for companion viewers (Immich, Navidrome, Paperless-ngx)
|
- Docker Compose for companion viewers (Immich, Navidrome, Paperless-ngx)
|
||||||
|
|
||||||
## Disclaimer
|
## Disclaimer
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
"""add size to observations and to idempotency index
|
||||||
|
|
||||||
|
Revision ID: da77e90cfd45
|
||||||
|
Revises: 1f337e7155be
|
||||||
|
Create Date: 2026-05-26 08:50:38.257793
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = 'da77e90cfd45'
|
||||||
|
down_revision: Union[str, Sequence[str], None] = '1f337e7155be'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
"""Upgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column('observations', sa.Column('size', sa.BigInteger(), nullable=False))
|
||||||
|
op.drop_index(op.f('ix_observations_path_mtime'), table_name='observations')
|
||||||
|
op.create_index('ix_observations_path_mtime', 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime', 'size'], unique=False)
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
"""Downgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_index('ix_observations_path_mtime', table_name='observations')
|
||||||
|
op.create_index(op.f('ix_observations_path_mtime'), 'observations', ['hostname', 'basedir', 'relpath', 'filename', 'mtime'], unique=False)
|
||||||
|
op.drop_column('observations', 'size')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
@ -27,9 +27,7 @@ def scan(
|
||||||
path: Path = typer.Argument(..., exists=True, file_okay=False, readable=True),
|
path: Path = typer.Argument(..., exists=True, file_okay=False, readable=True),
|
||||||
config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
|
config: Path = typer.Option(Path("mama.toml"), "--config", "-c"),
|
||||||
source_kind: str = typer.Option(
|
source_kind: str = typer.Option(
|
||||||
"incoming",
|
"incoming", "--source-kind", "-s",
|
||||||
"--source-kind",
|
|
||||||
"-s",
|
|
||||||
help="Origin: syncthing | incoming | existing | import",
|
help="Origin: syncthing | incoming | existing | import",
|
||||||
),
|
),
|
||||||
batch_size: int = typer.Option(100, "--batch-size", "-b"),
|
batch_size: int = typer.Option(100, "--batch-size", "-b"),
|
||||||
|
|
@ -63,8 +61,9 @@ async def _run(
|
||||||
f"{counters['files']} files | "
|
f"{counters['files']} files | "
|
||||||
f"+{counters['new_observations']} new | "
|
f"+{counters['new_observations']} new | "
|
||||||
f"~{counters['unchanged']} unchanged | "
|
f"~{counters['unchanged']} unchanged | "
|
||||||
f"{counters['new_blobs']} new blobs | "
|
f"{counters['new_blobs']} blobs | "
|
||||||
f"{counters['duplicates']} dup | "
|
f"{counters['duplicates']} dup | "
|
||||||
|
f"{counters['with_metadata']} meta | "
|
||||||
f"{counters['errors']} err"
|
f"{counters['errors']} err"
|
||||||
)
|
)
|
||||||
if last_path is not None:
|
if last_path is not None:
|
||||||
|
|
@ -81,11 +80,12 @@ async def _run(
|
||||||
|
|
||||||
console.print(
|
console.print(
|
||||||
f"[green]Done.[/green] "
|
f"[green]Done.[/green] "
|
||||||
f"{counters['files']} files seen | "
|
f"{counters['files']} files | "
|
||||||
f"{counters['new_observations']} new observations | "
|
f"{counters['new_observations']} new obs | "
|
||||||
f"{counters['unchanged']} unchanged | "
|
f"{counters['unchanged']} unchanged | "
|
||||||
f"{counters['new_blobs']} new blobs | "
|
f"{counters['new_blobs']} new blobs | "
|
||||||
f"{counters['duplicates']} duplicates | "
|
f"{counters['duplicates']} duplicates | "
|
||||||
|
f"{counters['with_metadata']} with metadata | "
|
||||||
f"{counters['errors']} errors"
|
f"{counters['errors']} errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
65
src/mama/metadata.py
Normal file
|
|
@ -0,0 +1,65 @@
|
||||||
|
"""ExifTool-based metadata extraction.
|
||||||
|
|
||||||
|
We use a long-running ExifTool process (via pyexiftool's ExifToolHelper)
|
||||||
|
to avoid the Perl startup overhead per file. Filesystem-level fields
|
||||||
|
(File:*, ExifTool:*, SourceFile) are stripped because mama already
|
||||||
|
tracks those in the observations table.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from types import TracebackType
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from exiftool import ExifToolHelper
|
||||||
|
|
||||||
|
_SKIP_PREFIXES = ("File:", "ExifTool:")
|
||||||
|
_SKIP_KEYS = {"SourceFile"}
|
||||||
|
|
||||||
|
|
||||||
|
def filter_metadata(raw: dict[str, Any]) -> dict[str, Any] | None:
|
||||||
|
"""Strip filesystem/tool fields; keep only content-derived metadata."""
|
||||||
|
if not raw:
|
||||||
|
return None
|
||||||
|
cleaned = {
|
||||||
|
k: v
|
||||||
|
for k, v in raw.items()
|
||||||
|
if k not in _SKIP_KEYS
|
||||||
|
and not any(k.startswith(p) for p in _SKIP_PREFIXES)
|
||||||
|
}
|
||||||
|
return cleaned or None
|
||||||
|
|
||||||
|
|
||||||
|
class MetadataExtractor:
|
||||||
|
"""Context manager around a persistent ExifTool process."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._helper: ExifToolHelper | None = None
|
||||||
|
|
||||||
|
def __enter__(self) -> MetadataExtractor:
|
||||||
|
self._helper = ExifToolHelper()
|
||||||
|
self._helper.__enter__()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(
|
||||||
|
self,
|
||||||
|
exc_type: type[BaseException] | None,
|
||||||
|
exc_val: BaseException | None,
|
||||||
|
exc_tb: TracebackType | None,
|
||||||
|
) -> None:
|
||||||
|
if self._helper is not None:
|
||||||
|
self._helper.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
self._helper = None
|
||||||
|
|
||||||
|
def extract(self, path: Path) -> dict[str, Any] | None:
|
||||||
|
"""Extract metadata for one file. Returns filtered dict or None."""
|
||||||
|
if self._helper is None:
|
||||||
|
raise RuntimeError("MetadataExtractor must be used as a context manager")
|
||||||
|
try:
|
||||||
|
results = self._helper.get_metadata(str(path))
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
if not results:
|
||||||
|
return None
|
||||||
|
return filter_metadata(results[0])
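The filtering step in `metadata.py` above can be exercised without ExifTool installed. The sketch below repeats `filter_metadata` as it appears in the diff and feeds it a hand-written dict; the sample keys and values are made up for illustration and only assume that tags arrive with a `Group:Tag` prefix.

```python
from __future__ import annotations

from typing import Any

_SKIP_PREFIXES = ("File:", "ExifTool:")
_SKIP_KEYS = {"SourceFile"}


def filter_metadata(raw: dict[str, Any]) -> dict[str, Any] | None:
    """Strip filesystem/tool fields; keep only content-derived metadata."""
    if not raw:
        return None
    cleaned = {
        k: v
        for k, v in raw.items()
        if k not in _SKIP_KEYS
        and not any(k.startswith(p) for p in _SKIP_PREFIXES)
    }
    return cleaned or None


# Illustrative input only; not real ExifTool output.
sample = {
    "SourceFile": "/tmp/example.jpg",
    "File:FileSize": 12345,
    "ExifTool:ExifToolVersion": 12.7,
    "EXIF:Make": "ExampleCam",
}
print(filter_metadata(sample))
```

A file that yields only `File:*` or `ExifTool:*` tags ends up with `None`, so it does not count toward the `with_metadata` counter in the scanner.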
|
||||||
|
|
@ -1,10 +1,6 @@
|
||||||
"""SQLAlchemy ORM models for the mama archive.
|
"""SQLAlchemy ORM models for the mama archive.
|
||||||
|
|
||||||
Schema overview:
|
See README.md for the full workflow and field semantics.
|
||||||
|
|
||||||
- blobs: unique file contents in the CAS (one row per unique hash)
|
|
||||||
- observations: each file sighting on disk (many per blob), with embedded
|
|
||||||
metadata as JSONB
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -37,28 +33,7 @@ class Blob(Base):
|
||||||
|
|
||||||
|
|
||||||
class Observation(Base):
|
class Observation(Base):
|
||||||
"""A single sighting of a file on some host at some path.
|
"""A single sighting of a file on some host at some path."""
|
||||||
|
|
||||||
Many observations can point to the same blob (= the file content is
|
|
||||||
identical, but the path/hostname/timestamps differ). Per-observation
|
|
||||||
metadata extracted from the file (EXIF, ID3, sidecar, ...) lives in
|
|
||||||
the `meta` JSONB column.
|
|
||||||
|
|
||||||
scan_time semantics: time the file was LAST CONFIRMED at this path.
|
|
||||||
A rescan that finds the same (path, mtime) updates scan_time instead
|
|
||||||
of creating a duplicate observation.
|
|
||||||
|
|
||||||
Lifecycle is tracked via `status`:
|
|
||||||
pending - just scanned, not yet materialized into the archive
|
|
||||||
assigned - materialized: blob is in CAS, hardlink exists in view
|
|
||||||
ignored - intentionally not materialized (e.g. user dismissed it)
|
|
||||||
|
|
||||||
Origin is tracked via `source_kind` (immutable):
|
|
||||||
syncthing - files synced from a phone/device
|
|
||||||
incoming - dropped manually into an incoming folder
|
|
||||||
existing - found during the initial archive scan
|
|
||||||
import - imported from an external medium (DVD, USB, ...)
|
|
||||||
"""
|
|
||||||
|
|
||||||
__tablename__ = "observations"
|
__tablename__ = "observations"
|
||||||
|
|
||||||
|
|
@ -70,6 +45,7 @@ class Observation(Base):
|
||||||
basedir: Mapped[str] = mapped_column(Text, nullable=False)
|
basedir: Mapped[str] = mapped_column(Text, nullable=False)
|
||||||
relpath: Mapped[str] = mapped_column(Text, nullable=False)
|
relpath: Mapped[str] = mapped_column(Text, nullable=False)
|
||||||
filename: Mapped[str] = mapped_column(Text, nullable=False)
|
filename: Mapped[str] = mapped_column(Text, nullable=False)
|
||||||
|
size: Mapped[int] = mapped_column(BigInteger, nullable=False)
|
||||||
mtime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
mtime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||||
ctime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
ctime: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||||
scan_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
scan_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||||
|
|
@ -80,10 +56,8 @@ class Observation(Base):
|
||||||
blob: Mapped[Blob] = relationship(back_populates="observations")
|
blob: Mapped[Blob] = relationship(back_populates="observations")
|
||||||
|
|
||||||
__table_args__ = (
|
__table_args__ = (
|
||||||
# Fast lookup for rescan idempotency: did we already see this
|
|
||||||
# exact file (path + mtime) on this host?
|
|
||||||
Index(
|
Index(
|
||||||
"ix_observations_path_mtime",
|
"ix_observations_path_mtime",
|
||||||
"hostname", "basedir", "relpath", "filename", "mtime",
|
"hostname", "basedir", "relpath", "filename", "mtime", "size",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
"""File scanner: walks directories, hashes files, records observations.
|
"""File scanner: walks directories, hashes files, records observations.
|
||||||
|
|
||||||
The scanner is rescan-safe: if a file at a given (hostname, basedir,
|
Rescan-safe: if a file at a given path is found with the same size and
|
||||||
relpath, filename) is found with the same mtime as in a previous scan,
|
mtime as in a previous scan, only scan_time is updated and the file is
|
||||||
only the observation's scan_time is updated and the file is NOT hashed
|
not hashed again.
|
||||||
again. This makes rescans cheap.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -19,6 +18,7 @@ from sqlalchemy import select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||||
|
|
||||||
from mama.config import Settings
|
from mama.config import Settings
|
||||||
|
from mama.metadata import MetadataExtractor
|
||||||
from mama.models import Blob, Observation
|
from mama.models import Blob, Observation
|
||||||
|
|
||||||
EMPTY_HASH = blake3().hexdigest()
|
EMPTY_HASH = blake3().hexdigest()
|
||||||
|
|
@ -35,7 +35,7 @@ def iter_files(root: Path) -> Iterator[Path]:
|
||||||
|
|
||||||
|
|
||||||
def hash_file(path: Path, size: int) -> str:
|
def hash_file(path: Path, size: int) -> str:
|
||||||
"""BLAKE3 hash of a file. Uses mmap + multi-threading for large files."""
|
"""BLAKE3 hash of a file. mmap + multi-threaded for large files."""
|
||||||
if size == 0:
|
if size == 0:
|
||||||
return EMPTY_HASH
|
return EMPTY_HASH
|
||||||
h = blake3(max_threads=blake3.AUTO)
|
h = blake3(max_threads=blake3.AUTO)
|
||||||
|
|
@ -57,8 +57,9 @@ async def _find_existing_observation(
|
||||||
relpath: str,
|
relpath: str,
|
||||||
filename: str,
|
filename: str,
|
||||||
mtime: datetime,
|
mtime: datetime,
|
||||||
|
size: int,
|
||||||
) -> Observation | None:
|
) -> Observation | None:
|
||||||
"""Return an existing observation matching path + mtime, or None."""
|
"""Return an observation matching path + mtime + size, or None."""
|
||||||
stmt = (
|
stmt = (
|
||||||
select(Observation)
|
select(Observation)
|
||||||
.where(
|
.where(
|
||||||
|
|
@ -67,6 +68,7 @@ async def _find_existing_observation(
|
||||||
Observation.relpath == relpath,
|
Observation.relpath == relpath,
|
||||||
Observation.filename == filename,
|
Observation.filename == filename,
|
||||||
Observation.mtime == mtime,
|
Observation.mtime == mtime,
|
||||||
|
Observation.size == size,
|
||||||
)
|
)
|
||||||
.limit(1)
|
.limit(1)
|
||||||
)
|
)
|
||||||
|
|
@ -83,6 +85,7 @@ async def _process_one(
|
||||||
now: datetime,
|
now: datetime,
|
||||||
batch_blob_hashes: set[str],
|
batch_blob_hashes: set[str],
|
||||||
counters: dict[str, int],
|
counters: dict[str, int],
|
||||||
|
extractor: MetadataExtractor,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process a single file: idempotent rescan or full hash+insert."""
|
"""Process a single file: idempotent rescan or full hash+insert."""
|
||||||
stat = path.stat()
|
stat = path.stat()
|
||||||
|
|
@ -94,17 +97,16 @@ async def _process_one(
|
||||||
if rel_dir == ".":
|
if rel_dir == ".":
|
||||||
rel_dir = ""
|
rel_dir = ""
|
||||||
|
|
||||||
# Idempotency: same path + same mtime → same content (we trust this).
|
# Cheap idempotency: same path + mtime + size → trust unchanged.
|
||||||
# Just refresh scan_time, skip hashing entirely.
|
|
||||||
existing = await _find_existing_observation(
|
existing = await _find_existing_observation(
|
||||||
session, settings.hostname, str(root), rel_dir, path.name, mtime,
|
session, settings.hostname, str(root), rel_dir, path.name, mtime, size,
|
||||||
)
|
)
|
||||||
if existing is not None:
|
if existing is not None:
|
||||||
existing.scan_time = now
|
existing.scan_time = now
|
||||||
counters["unchanged"] += 1
|
counters["unchanged"] += 1
|
||||||
return
|
return
|
||||||
|
|
||||||
# New or modified file: hash and insert.
|
# New or changed file: hash, detect MIME, extract metadata.
|
||||||
hash_hex = hash_file(path, size)
|
hash_hex = hash_file(path, size)
|
||||||
|
|
||||||
if hash_hex not in batch_blob_hashes:
|
if hash_hex not in batch_blob_hashes:
|
||||||
|
|
@ -114,7 +116,7 @@ async def _process_one(
|
||||||
Blob(
|
Blob(
|
||||||
hash=hash_hex,
|
hash=hash_hex,
|
||||||
size=size,
|
size=size,
|
||||||
storage_path="", # populated later by mama-apply
|
storage_path="",
|
||||||
first_seen=now,
|
first_seen=now,
|
||||||
mime=detect_mime(path),
|
mime=detect_mime(path),
|
||||||
)
|
)
|
||||||
|
|
@ -126,6 +128,8 @@ async def _process_one(
|
||||||
else:
|
else:
|
||||||
counters["duplicates"] += 1
|
counters["duplicates"] += 1
|
||||||
|
|
||||||
|
meta = extractor.extract(path)
|
||||||
|
|
||||||
session.add(
|
session.add(
|
||||||
Observation(
|
Observation(
|
||||||
blob_hash=hash_hex,
|
blob_hash=hash_hex,
|
||||||
|
|
@ -133,15 +137,18 @@ async def _process_one(
|
||||||
basedir=str(root),
|
basedir=str(root),
|
||||||
relpath=rel_dir,
|
relpath=rel_dir,
|
||||||
filename=path.name,
|
filename=path.name,
|
||||||
|
size=size,
|
||||||
mtime=mtime,
|
mtime=mtime,
|
||||||
ctime=ctime,
|
ctime=ctime,
|
||||||
scan_time=now,
|
scan_time=now,
|
||||||
source_kind=source_kind,
|
source_kind=source_kind,
|
||||||
status="pending",
|
status="pending",
|
||||||
meta=None,
|
meta=meta,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
counters["new_observations"] += 1
|
counters["new_observations"] += 1
|
||||||
|
if meta is not None:
|
||||||
|
counters["with_metadata"] += 1
|
||||||
|
|
||||||
|
|
||||||
async def scan_directory(
|
async def scan_directory(
|
||||||
|
|
@ -160,6 +167,7 @@ async def scan_directory(
|
||||||
"new_blobs": 0,
|
"new_blobs": 0,
|
||||||
"unchanged": 0,
|
"unchanged": 0,
|
||||||
"duplicates": 0,
|
"duplicates": 0,
|
||||||
|
"with_metadata": 0,
|
||||||
"errors": 0,
|
"errors": 0,
|
||||||
}
|
}
|
||||||
root = root.resolve()
|
root = root.resolve()
|
||||||
|
|
@ -168,30 +176,31 @@ async def scan_directory(
|
||||||
files_iter = iter_files(root)
|
files_iter = iter_files(root)
|
||||||
batch: list[Path] = []
|
batch: list[Path] = []
|
||||||
|
|
||||||
while True:
|
with MetadataExtractor() as extractor:
|
||||||
batch.clear()
|
while True:
|
||||||
for _ in range(batch_size):
|
batch.clear()
|
||||||
try:
|
for _ in range(batch_size):
|
||||||
batch.append(next(files_iter))
|
|
||||||
except StopIteration:
|
|
||||||
break
|
|
||||||
if not batch:
|
|
||||||
break
|
|
||||||
|
|
||||||
async with sessions() as session:
|
|
||||||
seen_blobs: set[str] = set()
|
|
||||||
for path in batch:
|
|
||||||
counters["files"] += 1
|
|
||||||
try:
|
try:
|
||||||
await _process_one(
|
batch.append(next(files_iter))
|
||||||
session, path, root, settings, source_kind, now,
|
except StopIteration:
|
||||||
seen_blobs, counters,
|
break
|
||||||
)
|
if not batch:
|
||||||
except Exception:
|
break
|
||||||
counters["errors"] += 1
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
if progress_callback:
|
async with sessions() as session:
|
||||||
progress_callback(counters, batch[-1])
|
seen_blobs: set[str] = set()
|
||||||
|
for path in batch:
|
||||||
|
counters["files"] += 1
|
||||||
|
try:
|
||||||
|
await _process_one(
|
||||||
|
session, path, root, settings, source_kind, now,
|
||||||
|
seen_blobs, counters, extractor,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
counters["errors"] += 1
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
if progress_callback:
|
||||||
|
progress_callback(counters, batch[-1])
|
||||||
|
|
||||||
return counters
|
return counters
|
||||||
|
|
|
||||||