"""Lifecycle hooks, writer protocols, and operation context objects.
Hooks are structural protocols: a plugin object participates in a lifecycle
phase by implementing the corresponding method, such as
``before_validate_metadata(context)`` or ``extract_metadata(context)``. Hook
methods receive an :class:`OperationContext`, a mutable object that carries the
catalog root, operation id, source description, planned locators, user metadata,
derived metadata, rollback registrar, and accumulated hook warnings.
The design intentionally keeps hook classes lightweight. Plugin authors do not
subclass a base class; they implement one or more protocol methods, usually with
``context: OperationContext`` and ``-> None`` or ``-> MetadataDict | None`` type
hints. ``HookManager`` dispatches protocols in deterministic registration order,
merges returned derived metadata, records non-fatal after-commit failures as
warnings, and preserves the original exception when error or rollback hooks
fail.
Artifact writers use the same context model. Any object satisfying
:class:`ArtifactWriter` can materialise an :class:`ogcat.models.ArtifactLocator`
from an :class:`OperationSource` before the catalog record is committed.
"""
from __future__ import annotations
import warnings
from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol, runtime_checkable
from ogcat.models import ArtifactLocator, MetadataDict
from ogcat.storage import StoragePlan
from ogcat.transactions import RollbackAction
from ogcat.validation import ValidationReport
[docs]
@dataclass(slots=True)
class HookWarning:
"""A non-fatal hook warning.
Args:
hook_name: Name of the hook or plugin reporting the warning.
message: Human-readable warning message.
code: Stable machine-readable warning code.
"""
hook_name: str
message: str
code: str = "hook.warning"
class RollbackRegistrar(Protocol):
    """Callable protocol for handing rollback work to a transaction.

    Implementations accept either a ``RollbackAction`` or a plain
    zero-argument callable, plus an optional keyword-only description.
    """

    def __call__(
        self,
        action: RollbackAction | Callable[[], None],
        *,
        description: str | None = None,
    ) -> RollbackAction:
        """Register ``action`` as rollback work and return the registered action."""
[docs]
@dataclass(slots=True)
class OperationSource:
"""Description of the artifact source for a catalog operation.
Args:
kind: Short source kind, such as ``"local_file"`` or ``"external"``.
path: Optional local source path.
descriptor: Optional non-path source description or URI.
metadata: Source-specific JSON-compatible metadata.
payload: Optional in-memory Python object for writer helpers.
"""
kind: str
path: Path | None = None
descriptor: str | None = None
metadata: MetadataDict = field(default_factory=dict)
payload: object | None = None
class ArtifactWriter(Protocol):
    """Structural protocol for plugins that materialise artifact data.

    The writer runs before the catalog record is written.
    """

    def write(
        self,
        context: OperationContext,
        source: OperationSource,
        target: ArtifactLocator,
    ) -> None:
        """Materialise the data described by ``source`` at ``target``."""
[docs]
@dataclass(slots=True)
class OperationContext:
"""Mutable context passed to catalog lifecycle hooks.
Hooks exert their effect by mutating documented fields on this object,
raising an exception, or registering rollback work. `user_metadata` may be
mutated before validation. `planned_locators` may be changed during
`resolve_artifact_locator`; the first locator is treated as canonical.
`derived_metadata` may be updated during metadata extraction.
Args:
catalog_root: Root path of the catalog.
operation_id: Identifier shared with the transaction.
operation_type: Catalog operation name, such as ``"add_file"``.
record_type: Record type being created.
user_metadata: User-supplied metadata, mutable by hooks before validation.
derived_metadata: Derived metadata collected during the operation.
planned_locators: Locators planned or supplied for the operation.
register_rollback: Low-level rollback registrar. Hook authors should
normally call ``context.rollback(...)`` instead.
source: Artifact source description for this operation.
storage_mode: Optional storage mode, such as ``"copy"`` or ``"move"``.
storage_plan: Optional planned artifact storage decision.
original_path: Optional original path or URI.
original_filename: Optional original filename.
suffixes: Source suffixes associated with the artifact.
"""
catalog_root: Path
operation_id: str
operation_type: str
record_type: str
user_metadata: MetadataDict
derived_metadata: MetadataDict = field(default_factory=dict)
planned_locators: list[ArtifactLocator] = field(default_factory=list)
register_rollback: RollbackRegistrar | None = None
source: OperationSource = field(default_factory=lambda: OperationSource(kind="unknown"))
storage_mode: str | None = None
storage_plan: StoragePlan | None = None
original_path: str | Path | None = None
original_filename: str | None = None
suffixes: list[str] = field(default_factory=list)
warnings: list[HookWarning] = field(default_factory=list)
@property
def source_path(self) -> Path | None:
"""Optional local source path, kept for compatibility with existing hooks."""
return self.source.path
@source_path.setter
def source_path(self, value: Path | None) -> None:
"""Set the optional local source path on the operation source."""
self.source.path = value
@property
def source_descriptor(self) -> str | None:
"""Optional source description, kept for compatibility with existing hooks."""
return self.source.descriptor
@source_descriptor.setter
def source_descriptor(self, value: str | None) -> None:
"""Set the optional source description on the operation source."""
self.source.descriptor = value
[docs]
def add_warning(self, warning: HookWarning | str, *, hook_name: str = "hook") -> None:
"""Record a non-fatal warning for this operation."""
if isinstance(warning, HookWarning):
self.warnings.append(warning)
return
self.warnings.append(HookWarning(hook_name=hook_name, message=warning))
[docs]
def rollback(
self,
action: RollbackAction | Callable[[], None],
*,
description: str | None = None,
) -> RollbackAction:
"""Register a rollback action through the active catalog transaction."""
if self.register_rollback is None:
raise RuntimeError("No active rollback registration is available.")
return self.register_rollback(action, description=description)
@runtime_checkable
class ResolveArtifactLocatorHook(Protocol):
    """Protocol for hooks that run once an artifact locator has been proposed.

    An object satisfies this protocol by defining
    ``resolve_artifact_locator``; no base class is required.
    """

    def resolve_artifact_locator(self, context: OperationContext) -> None:
        """Inspect, replace, or extend the planned artifact locators."""
@runtime_checkable
class BeforeRecordWriteHook(Protocol):
    """Protocol for hooks that run ahead of the catalog record write.

    An object satisfies this protocol by defining ``before_record_write``.
    """

    def before_record_write(self, context: OperationContext) -> None:
        """Run before the catalog record is written."""
@runtime_checkable
class AfterRecordWriteHook(Protocol):
    """Protocol for hooks that run once the catalog record has been written.

    An object satisfies this protocol by defining ``after_record_write``.
    """

    def after_record_write(self, context: OperationContext) -> None:
        """Run after the catalog record is written."""
@runtime_checkable
class BeforeCommitHook(Protocol):
    """Protocol for hooks that run ahead of the catalog transaction commit.

    An object satisfies this protocol by defining ``before_commit``.
    """

    def before_commit(self, context: OperationContext) -> None:
        """Run before transaction commit."""
@runtime_checkable
class AfterCommitHook(Protocol):
    """Protocol for hooks that run once the catalog transaction has committed.

    An object satisfies this protocol by defining ``after_commit``.
    """

    def after_commit(self, context: OperationContext) -> None:
        """Run after transaction commit."""
@runtime_checkable
class RollbackHook(Protocol):
    """Protocol for hooks notified when a failed operation rolls back.

    An object satisfies this protocol by defining ``on_rollback``.
    """

    def on_rollback(self, context: OperationContext, error: BaseException) -> None:
        """Run after rollback has been requested for a failed operation.

        Args:
            context: Context of the failed operation.
            error: The exception that caused the operation to fail.
        """
@runtime_checkable
class ErrorHook(Protocol):
    """Protocol for hooks notified when an operation fails.

    An object satisfies this protocol by defining ``on_error``.
    """

    def on_error(self, context: OperationContext, error: BaseException) -> None:
        """Run when an operation fails.

        Args:
            context: Context of the failed operation.
            error: The exception that caused the operation to fail.
        """
class HookManager:
    """Run registered hook objects for each lifecycle phase.

    Hooks are kept in a list and visited in the order they were registered.
    A hook takes part in a phase only when it passes the ``isinstance`` check
    against that phase's runtime-checkable protocol.
    """

    def __init__(self, hooks: Iterable[object] = ()) -> None:
        """Copy ``hooks`` into the manager's own registration list."""
        self._hooks = list(hooks)

    @property
    def hooks(self) -> tuple[object, ...]:
        """Snapshot of the registered hooks as a tuple, in dispatch order."""
        return tuple(self._hooks)

    def register(self, hook: object) -> object:
        """Append ``hook`` to the dispatch list and hand it back unchanged.

        Returning the hook allows decorator-style usage.
        """
        self._hooks.append(hook)
        return hook

    def _supporting(self, protocol: type) -> Iterable[object]:
        """Lazily yield the registered hooks that satisfy ``protocol``, in order."""
        for candidate in self._hooks:
            if isinstance(candidate, protocol):
                yield candidate

    def resolve_artifact_locator(self, context: OperationContext) -> None:
        """Call ``resolve_artifact_locator`` on every hook that implements it."""
        for participant in self._supporting(ResolveArtifactLocatorHook):
            participant.resolve_artifact_locator(context)

    def before_record_write(self, context: OperationContext) -> None:
        """Call ``before_record_write`` on every hook that implements it."""
        for participant in self._supporting(BeforeRecordWriteHook):
            participant.before_record_write(context)

    def after_record_write(self, context: OperationContext) -> None:
        """Call ``after_record_write`` on every hook that implements it."""
        for participant in self._supporting(AfterRecordWriteHook):
            participant.after_record_write(context)

    def before_commit(self, context: OperationContext) -> None:
        """Call ``before_commit`` on every hook that implements it."""
        for participant in self._supporting(BeforeCommitHook):
            participant.before_commit(context)

    def after_commit(self, context: OperationContext) -> None:
        """Call ``after_commit`` hooks without failing committed work.

        An ``Exception`` raised by a hook is not propagated. It is recorded
        on ``context`` as a ``HookWarning`` with code
        ``"hook.after_commit_failed"`` and also emitted as a
        ``RuntimeWarning``; dispatch then continues with the next hook.
        """
        for participant in self._supporting(AfterCommitHook):
            try:
                participant.after_commit(context)
            except Exception as exc:
                failure = HookWarning(
                    hook_name=type(participant).__name__,
                    message=f"after_commit hook failed: {type(exc).__name__}: {exc}",
                    code="hook.after_commit_failed",
                )
                context.add_warning(failure)
                # Emitted from this frame so stacklevel=2 points at the caller
                # of after_commit.
                warnings.warn(
                    f"{failure.hook_name}: {failure.message}",
                    RuntimeWarning,
                    stacklevel=2,
                )

    def on_error(self, context: OperationContext, error: BaseException) -> None:
        """Call ``on_error`` hooks while keeping ``error`` as the failure.

        An ``Exception`` raised by a hook is attached to ``error`` as a note
        instead of being propagated; dispatch continues with the next hook.
        """
        for participant in self._supporting(ErrorHook):
            try:
                participant.on_error(context, error)
            except Exception as exc:
                error.add_note(f"error hook failed: {type(exc).__name__}: {exc}")

    def on_rollback(self, context: OperationContext, error: BaseException) -> None:
        """Call ``on_rollback`` hooks while keeping ``error`` as the failure.

        An ``Exception`` raised by a hook is attached to ``error`` as a note
        instead of being propagated; dispatch continues with the next hook.
        """
        for participant in self._supporting(RollbackHook):
            try:
                participant.on_rollback(context, error)
            except Exception as exc:
                error.add_note(f"rollback hook failed: {type(exc).__name__}: {exc}")
# Names exported by a star-import of this module.
# NOTE(review): ``AfterValidateMetadataHook``, ``BeforeValidateMetadataHook``
# and ``ExtractMetadataHook`` are listed below, but no definitions for them
# are visible in this part of the module. Confirm they are defined elsewhere;
# a star-import raises AttributeError for any ``__all__`` entry the module
# does not define.
__all__ = [
"AfterCommitHook",
"AfterValidateMetadataHook",
"AfterRecordWriteHook",
"ArtifactWriter",
"BeforeCommitHook",
"BeforeValidateMetadataHook",
"BeforeRecordWriteHook",
"ErrorHook",
"ExtractMetadataHook",
"HookManager",
"HookWarning",
"OperationContext",
"OperationSource",
"ResolveArtifactLocatorHook",
"RollbackHook",
"RollbackRegistrar",
]