# Source code for ogcat.hooks

"""Lifecycle hooks, writer protocols, and operation context objects.

Hooks are structural protocols: a plugin object participates in a lifecycle
phase by implementing the corresponding method, such as
``before_validate_metadata(context)`` or ``extract_metadata(context)``. Hook
methods receive an :class:`OperationContext`, a mutable object that carries the
catalog root, operation id, source description, planned locators, user metadata,
derived metadata, rollback registrar, and accumulated hook warnings.

The design intentionally keeps hook classes lightweight. Plugin authors do not
subclass a base class; they implement one or more protocol methods, usually with
``context: OperationContext`` and ``-> None`` or ``-> MetadataDict | None`` type
hints. ``HookManager`` dispatches protocols in deterministic registration order,
merges returned derived metadata, records non-fatal after-commit failures as
warnings, and preserves the original exception when error or rollback hooks
fail.

Artifact writers use the same context model. Any object satisfying
:class:`ArtifactWriter` can materialise an :class:`ogcat.models.ArtifactLocator`
from an :class:`OperationSource` before the catalog record is committed.
"""

from __future__ import annotations

import warnings
from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol, runtime_checkable

from ogcat.models import ArtifactLocator, MetadataDict
from ogcat.storage import StoragePlan
from ogcat.transactions import RollbackAction
from ogcat.validation import ValidationReport


[docs] @dataclass(slots=True) class HookWarning: """A non-fatal hook warning. Args: hook_name: Name of the hook or plugin reporting the warning. message: Human-readable warning message. code: Stable machine-readable warning code. """ hook_name: str message: str code: str = "hook.warning"
[docs] def to_metadata(self) -> MetadataDict: """Convert the warning to JSON-compatible metadata.""" return { "hook_name": self.hook_name, "message": self.message, "code": self.code, }
class RollbackRegistrar(Protocol):
    """Callable signature used to register rollback work with a transaction."""

    def __call__(
        self,
        action: RollbackAction | Callable[[], None],
        *,
        description: str | None = None,
    ) -> RollbackAction:
        """Register a rollback action.

        Args:
            action: A ``RollbackAction`` or a zero-argument callable.
            description: Optional keyword-only label for the action.

        Returns:
            A ``RollbackAction``.
        """
        ...
[docs] @dataclass(slots=True) class OperationSource: """Description of the artifact source for a catalog operation. Args: kind: Short source kind, such as ``"local_file"`` or ``"external"``. path: Optional local source path. descriptor: Optional non-path source description or URI. metadata: Source-specific JSON-compatible metadata. payload: Optional in-memory Python object for writer helpers. """ kind: str path: Path | None = None descriptor: str | None = None metadata: MetadataDict = field(default_factory=dict) payload: object | None = None
class ArtifactWriter(Protocol):
    """Plugin-facing writer that materialises artifact data before record write."""

    def write(
        self,
        context: OperationContext,
        source: OperationSource,
        target: ArtifactLocator,
    ) -> None:
        """Write artifact data from ``source`` to ``target``.

        Args:
            context: Operation context for the current catalog operation.
            source: Description of the artifact source.
            target: Locator the artifact data is written to.
        """
        ...
[docs] @dataclass(slots=True) class OperationContext: """Mutable context passed to catalog lifecycle hooks. Hooks exert their effect by mutating documented fields on this object, raising an exception, or registering rollback work. `user_metadata` may be mutated before validation. `planned_locators` may be changed during `resolve_artifact_locator`; the first locator is treated as canonical. `derived_metadata` may be updated during metadata extraction. Args: catalog_root: Root path of the catalog. operation_id: Identifier shared with the transaction. operation_type: Catalog operation name, such as ``"add_file"``. record_type: Record type being created. user_metadata: User-supplied metadata, mutable by hooks before validation. derived_metadata: Derived metadata collected during the operation. planned_locators: Locators planned or supplied for the operation. register_rollback: Low-level rollback registrar. Hook authors should normally call ``context.rollback(...)`` instead. source: Artifact source description for this operation. storage_mode: Optional storage mode, such as ``"copy"`` or ``"move"``. storage_plan: Optional planned artifact storage decision. original_path: Optional original path or URI. original_filename: Optional original filename. suffixes: Source suffixes associated with the artifact. 
""" catalog_root: Path operation_id: str operation_type: str record_type: str user_metadata: MetadataDict derived_metadata: MetadataDict = field(default_factory=dict) planned_locators: list[ArtifactLocator] = field(default_factory=list) register_rollback: RollbackRegistrar | None = None source: OperationSource = field(default_factory=lambda: OperationSource(kind="unknown")) storage_mode: str | None = None storage_plan: StoragePlan | None = None original_path: str | Path | None = None original_filename: str | None = None suffixes: list[str] = field(default_factory=list) warnings: list[HookWarning] = field(default_factory=list) @property def source_path(self) -> Path | None: """Optional local source path, kept for compatibility with existing hooks.""" return self.source.path @source_path.setter def source_path(self, value: Path | None) -> None: """Set the optional local source path on the operation source.""" self.source.path = value @property def source_descriptor(self) -> str | None: """Optional source description, kept for compatibility with existing hooks.""" return self.source.descriptor @source_descriptor.setter def source_descriptor(self, value: str | None) -> None: """Set the optional source description on the operation source.""" self.source.descriptor = value
[docs] def add_warning(self, warning: HookWarning | str, *, hook_name: str = "hook") -> None: """Record a non-fatal warning for this operation.""" if isinstance(warning, HookWarning): self.warnings.append(warning) return self.warnings.append(HookWarning(hook_name=hook_name, message=warning))
[docs] def rollback( self, action: RollbackAction | Callable[[], None], *, description: str | None = None, ) -> RollbackAction: """Register a rollback action through the active catalog transaction.""" if self.register_rollback is None: raise RuntimeError("No active rollback registration is available.") return self.register_rollback(action, description=description)
@runtime_checkable
class BeforeValidateMetadataHook(Protocol):
    """Protocol for hooks called before schema metadata validation."""

    def before_validate_metadata(self, context: OperationContext) -> None:
        """Inspect or mutate metadata on ``context`` before validation."""
        ...
@runtime_checkable
class AfterValidateMetadataHook(Protocol):
    """Protocol for hooks called after schema metadata validation."""

    def after_validate_metadata(
        self,
        context: OperationContext,
        report: ValidationReport,
    ) -> None:
        """Inspect the validation ``report`` produced for ``context``."""
        ...
@runtime_checkable
class ResolveArtifactLocatorHook(Protocol):
    """Protocol for hooks called after an artifact locator has been proposed."""

    def resolve_artifact_locator(self, context: OperationContext) -> None:
        """Inspect, replace, or extend the planned artifact locators."""
        ...
@runtime_checkable
class BeforeRecordWriteHook(Protocol):
    """Protocol for hooks called before writing the catalog record."""

    def before_record_write(self, context: OperationContext) -> None:
        """Run just before the catalog record is written."""
        ...
@runtime_checkable
class AfterRecordWriteHook(Protocol):
    """Protocol for hooks called after writing the catalog record."""

    def after_record_write(self, context: OperationContext) -> None:
        """Run once the catalog record has been written."""
        ...
@runtime_checkable
class ExtractMetadataHook(Protocol):
    """Protocol for hooks called during derived metadata extraction."""

    def extract_metadata(self, context: OperationContext) -> MetadataDict | None:
        """Return derived metadata to merge into ``context``, or ``None``."""
        ...
@runtime_checkable
class BeforeCommitHook(Protocol):
    """Protocol for hooks called before committing the catalog transaction."""

    def before_commit(self, context: OperationContext) -> None:
        """Run just before the transaction commit."""
        ...
@runtime_checkable
class AfterCommitHook(Protocol):
    """Protocol for hooks called after committing the catalog transaction."""

    def after_commit(self, context: OperationContext) -> None:
        """Run once the transaction has been committed."""
        ...
@runtime_checkable
class RollbackHook(Protocol):
    """Protocol for hooks called when an operation fails and rolls back."""

    def on_rollback(self, context: OperationContext, error: BaseException) -> None:
        """Run after rollback has been requested for a failed operation.

        Args:
            context: Operation context of the failed operation.
            error: The exception that caused the operation to fail.
        """
        ...
@runtime_checkable
class ErrorHook(Protocol):
    """Protocol for hooks called when an operation fails."""

    def on_error(self, context: OperationContext, error: BaseException) -> None:
        """Run when an operation fails.

        Args:
            context: Operation context of the failed operation.
            error: The exception that caused the operation to fail.
        """
        ...
class HookManager:
    """Dispatch registered catalog hooks in registration order.

    Hooks are plain objects. For each lifecycle phase the manager walks the
    registered hooks in order and calls only those that satisfy the matching
    runtime-checkable protocol.
    """

    def __init__(self, hooks: Iterable[object] = ()) -> None:
        """Copy ``hooks`` into an internal list, keeping their order."""
        self._hooks = list(hooks)

    @property
    def hooks(self) -> tuple[object, ...]:
        """Registered hooks in dispatch order, as an immutable snapshot."""
        return tuple(self._hooks)

    def register(self, hook: object) -> object:
        """Append ``hook`` and return it, so this can be used as a decorator."""
        self._hooks.append(hook)
        return hook

    def before_validate_metadata(self, context: OperationContext) -> None:
        """Call every ``before_validate_metadata`` hook with ``context``."""
        for hook in self._hooks:
            if not isinstance(hook, BeforeValidateMetadataHook):
                continue
            hook.before_validate_metadata(context)

    def after_validate_metadata(
        self,
        context: OperationContext,
        report: ValidationReport,
    ) -> None:
        """Call every ``after_validate_metadata`` hook with ``context`` and ``report``."""
        for hook in self._hooks:
            if not isinstance(hook, AfterValidateMetadataHook):
                continue
            hook.after_validate_metadata(context, report)

    def resolve_artifact_locator(self, context: OperationContext) -> None:
        """Call every ``resolve_artifact_locator`` hook with ``context``."""
        for hook in self._hooks:
            if not isinstance(hook, ResolveArtifactLocatorHook):
                continue
            hook.resolve_artifact_locator(context)

    def before_record_write(self, context: OperationContext) -> None:
        """Call every ``before_record_write`` hook with ``context``."""
        for hook in self._hooks:
            if not isinstance(hook, BeforeRecordWriteHook):
                continue
            hook.before_record_write(context)

    def after_record_write(self, context: OperationContext) -> None:
        """Call every ``after_record_write`` hook with ``context``."""
        for hook in self._hooks:
            if not isinstance(hook, AfterRecordWriteHook):
                continue
            hook.after_record_write(context)

    def extract_metadata(self, context: OperationContext) -> None:
        """Run metadata extraction hooks and merge what they return.

        Each non-``None`` result is merged into ``context.derived_metadata``
        with ``dict.update``, so later hooks overwrite keys set by earlier ones.
        """
        for hook in self._hooks:
            if not isinstance(hook, ExtractMetadataHook):
                continue
            derived = hook.extract_metadata(context)
            if derived is None:
                continue
            context.derived_metadata.update(derived)

    def before_commit(self, context: OperationContext) -> None:
        """Call every ``before_commit`` hook with ``context``."""
        for hook in self._hooks:
            if not isinstance(hook, BeforeCommitHook):
                continue
            hook.before_commit(context)

    def after_commit(self, context: OperationContext) -> None:
        """Call ``after_commit`` hooks without failing committed work.

        An ``Exception`` raised by a hook is not propagated. It is recorded on
        ``context`` as a ``HookWarning`` and also emitted as a
        ``RuntimeWarning``; dispatch then continues with the next hook.
        """
        for hook in self._hooks:
            if not isinstance(hook, AfterCommitHook):
                continue
            try:
                hook.after_commit(context)
            except Exception as exc:
                failure = HookWarning(
                    hook_name=type(hook).__name__,
                    message=f"after_commit hook failed: {type(exc).__name__}: {exc}",
                    code="hook.after_commit_failed",
                )
                context.add_warning(failure)
                # stacklevel=2 attributes the warning to the caller of
                # ``after_commit`` rather than to this method.
                warnings.warn(
                    f"{failure.hook_name}: {failure.message}",
                    RuntimeWarning,
                    stacklevel=2,
                )

    def on_error(self, context: OperationContext, error: BaseException) -> None:
        """Call error hooks while preserving the original operation failure.

        An ``Exception`` raised by a hook is attached to ``error`` as a note
        instead of being raised, and dispatch continues with the next hook.
        """
        for hook in self._hooks:
            if not isinstance(hook, ErrorHook):
                continue
            try:
                hook.on_error(context, error)
            except Exception as exc:
                error.add_note(f"error hook failed: {type(exc).__name__}: {exc}")

    def on_rollback(self, context: OperationContext, error: BaseException) -> None:
        """Call rollback hooks while preserving the original operation failure.

        An ``Exception`` raised by a hook is attached to ``error`` as a note
        instead of being raised, and dispatch continues with the next hook.
        """
        for hook in self._hooks:
            if not isinstance(hook, RollbackHook):
                continue
            try:
                hook.on_rollback(context, error)
            except Exception as exc:
                error.add_note(f"rollback hook failed: {type(exc).__name__}: {exc}")
# Public API of this module; these are the names ``from ogcat.hooks import *`` exports.
__all__ = [
    "AfterCommitHook",
    "AfterValidateMetadataHook",
    "AfterRecordWriteHook",
    "ArtifactWriter",
    "BeforeCommitHook",
    "BeforeValidateMetadataHook",
    "BeforeRecordWriteHook",
    "ErrorHook",
    "ExtractMetadataHook",
    "HookManager",
    "HookWarning",
    "OperationContext",
    "OperationSource",
    "ResolveArtifactLocatorHook",
    "RollbackHook",
    "RollbackRegistrar",
]