glam/scripts/sync/__init__.py
2025-12-14 17:09:55 +01:00

158 lines
4.8 KiB
Python

"""
Database Sync Package - Unified sync from YAML source of truth to all databases.
This package provides modular sync functionality for syncing custodian YAML files
to all database backends: DuckLake, PostgreSQL, Oxigraph, and Qdrant.
Usage:
    # Sync all databases
    python -m scripts.sync.all

    # Sync a specific database
    python -m scripts.sync.ducklake
    python -m scripts.sync.postgres
    python -m scripts.sync.oxigraph
    python -m scripts.sync.qdrant
Architecture (from AGENTS.md Rule 22):

    data/custodian/*.yaml  <- SINGLE SOURCE OF TRUTH
                |
     +----------+----------+----------+
     |          |          |          |
     v          v          v          v
 DuckLake  PostgreSQL  Oxigraph    Qdrant
                |
       REST API responses  <- DERIVED
                |
       Frontend display    <- DERIVED
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Optional
import logging
# Package-level logger. BaseSyncer derives per-database child loggers
# from this same module name ("<module>.<database_name>").
logger = logging.getLogger(name=__name__)
class SyncStatus(Enum):
    """Lifecycle state of a single database sync run.

    The lowercase string values are what ``SyncResult.to_dict`` emits
    under its ``"status"`` key.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    # Run completed, but some records failed.
    PARTIAL = "partial"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class SyncResult:
    """Outcome of one database sync run.

    Carries record counters, optional start/end timestamps, an optional
    error message and a free-form ``details`` mapping. ``to_dict`` renders
    everything (plus the derived duration) as a plain dict intended for
    JSON serialization.
    """

    database: str
    status: SyncStatus
    records_processed: int = 0
    records_succeeded: int = 0
    records_failed: int = 0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    error_message: Optional[str] = None
    # default_factory gives every instance its own dict (no shared default).
    details: dict = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        """Seconds from ``start_time`` to ``end_time``; 0.0 unless both are set."""
        started, finished = self.start_time, self.end_time
        if not (started and finished):
            return 0.0
        return (finished - started).total_seconds()

    def to_dict(self) -> dict:
        """Return a plain dict with timestamps rendered via ``isoformat()``."""
        begin = self.start_time
        end = self.end_time
        return {
            "database": self.database,
            "status": self.status.value,
            "records_processed": self.records_processed,
            "records_succeeded": self.records_succeeded,
            "records_failed": self.records_failed,
            "start_time": begin.isoformat() if begin else None,
            "end_time": end.isoformat() if end else None,
            "duration_seconds": self.duration_seconds,
            "error_message": self.error_message,
            "details": self.details,
        }
@dataclass
class SyncProgress:
    """Progress tracking for sync operations.

    Attributes:
        total_files: Number of files expected in the run (0 means unknown/none).
        processed_files: Number of files handled so far.
        current_file: File currently being processed (defaults to "").
        current_database: Database currently being synced (defaults to "").
        errors: Accumulated error entries; their element type is up to callers.
    """

    total_files: int = 0
    processed_files: int = 0
    current_file: str = ""
    current_database: str = ""
    errors: list = field(default_factory=list)

    @property
    def percentage(self) -> float:
        """Processed files as a percentage of ``total_files``; 0.0 when total is 0."""
        if self.total_files == 0:
            # Avoid ZeroDivisionError before the total is known.
            return 0.0
        return (self.processed_files / self.total_files) * 100

    def to_dict(self, max_errors: Optional[int] = 10) -> dict:
        """Return a plain-dict snapshot of the progress.

        Args:
            max_errors: How many of the most recent ``errors`` to include.
                Defaults to 10 (the historical behaviour). ``None`` includes
                all errors; zero or a negative value includes none.

        Returns:
            A new dict; ``percentage`` is rounded to one decimal place and
            ``errors`` is a fresh list (mutating it does not affect ``self``).
        """
        if max_errors is None:
            recent_errors = list(self.errors)
        elif max_errors <= 0:
            # errors[-0:] would return the WHOLE list, so handle this explicitly.
            recent_errors = []
        else:
            recent_errors = self.errors[-max_errors:]
        return {
            "total_files": self.total_files,
            "processed_files": self.processed_files,
            "current_file": self.current_file,
            "current_database": self.current_database,
            "percentage": round(self.percentage, 1),
            "errors": recent_errors,
        }
class BaseSyncer:
    """Base class for database syncers.

    Subclasses set ``database_name`` and override ``check_connection`` and
    ``sync``; the base implementations raise ``NotImplementedError``.
    Shared helpers report progress and list the source YAML files.
    """

    database_name: str = "unknown"

    def __init__(
        self,
        data_dir: Path | str | None = None,
        progress_callback: Optional[Callable[[SyncProgress], None]] = None,
    ):
        """Initialise shared syncer state.

        Args:
            data_dir: Directory containing the custodian ``*.yaml`` files.
                A ``str`` (or other path-like) is converted to ``Path``.
                When omitted or empty, ``DEFAULT_CUSTODIAN_DIR`` is used.
            progress_callback: Optional callable that receives this syncer's
                ``SyncProgress`` object whenever ``_report_progress`` runs.
        """
        # Reuse the module-level default instead of recomputing the same path;
        # converting with Path() keeps str arguments from failing later on
        # .exists()/.glob().
        self.data_dir = Path(data_dir) if data_dir else DEFAULT_CUSTODIAN_DIR
        self.progress_callback = progress_callback
        self.progress = SyncProgress()
        # Child logger named "<module>.<database_name>" so each backend's
        # messages can be filtered separately.
        self.logger = logging.getLogger(f"{__name__}.{self.database_name}")

    def check_connection(self) -> bool:
        """Return whether the target database is available. Subclasses must override.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement check_connection()"
        )

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Run the sync operation. Subclasses must override.

        Args:
            limit: Interpretation is left to subclasses (presumably a cap on
                the number of files processed; confirm in implementations).
            dry_run: Interpretation is left to subclasses (presumably "do not
                write to the database"; confirm in implementations).

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement sync()")

    def _report_progress(self) -> None:
        """Pass the live ``self.progress`` object to the callback, if one is set."""
        if self.progress_callback is not None:
            self.progress_callback(self.progress)

    def _list_yaml_files(self) -> list[Path]:
        """Return entries matching ``*.yaml`` directly inside ``data_dir``, sorted.

        Logs an error and returns an empty list when ``data_dir`` does not
        exist or is not a directory.
        """
        if not self.data_dir.exists():
            self.logger.error("Data directory not found: %s", self.data_dir)
            return []
        if not self.data_dir.is_dir():
            self.logger.error("Data path is not a directory: %s", self.data_dir)
            return []
        return sorted(self.data_dir.glob("*.yaml"))
# Default location of the custodian YAML files (the single source of truth):
# three directories up from this file (the directory containing ``scripts/``),
# then ``data/custodian``.
DEFAULT_CUSTODIAN_DIR = Path(__file__).parent.parent.parent / "data" / "custodian"
# Public API re-exported by ``from scripts.sync import *``.
__all__ = [
    # Status / result / progress value types
    "SyncStatus",
    "SyncResult",
    "SyncProgress",
    # Syncer base class and its default data location
    "BaseSyncer",
    "DEFAULT_CUSTODIAN_DIR",
]