glam/backend/sync/main.py
2025-12-14 17:09:55 +01:00

479 lines
15 KiB
Python

"""
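Database Sync API Server.

FastAPI backend for triggering and monitoring database sync operations.
All databases are synced from data/custodian/*.yaml (the single source of truth).

Endpoints:
    GET  /                            - Health check
    GET  /api/databases               - List available databases
    GET  /api/sync/status             - Get connection status for all databases
    GET  /api/sync/status/{database}  - Get connection status for one database
    GET  /api/sync/progress           - Get sync progress (if running)
    GET  /api/sync/last-result        - Get the result of the last sync operation
    POST /api/sync/all                - Sync all (or the requested) databases
    POST /api/sync/{database}         - Sync a specific database

Run with:
    uvicorn backend.sync.main:app --port 8766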
"""
import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
from enum import Enum
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# The repository root sits three directory levels above this file. It is
# prepended to sys.path (once) so the ``scripts.sync`` imports below resolve.
PROJECT_ROOT = Path(__file__).parent.parent.parent
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
from scripts.sync import SyncResult, SyncStatus, SyncProgress
from scripts.sync.ducklake_sync import DuckLakeSyncer
from scripts.sync.postgres_sync import PostgresSyncer
from scripts.sync.oxigraph_sync import OxigraphSyncer
from scripts.sync.qdrant_sync import QdrantSyncer
# ============================================================================
# Configuration
# ============================================================================
class Settings(BaseModel):
    """Configuration for the sync API, seeded from environment variables.

    The environment is read when this class body executes (at import time),
    so each field default reflects the environment as it was at that moment.
    """

    # SYNC_API_HOST: interface passed to uvicorn when run as a script.
    host: str = os.environ.get("SYNC_API_HOST", "0.0.0.0")
    # SYNC_API_PORT: TCP port passed to uvicorn when run as a script.
    port: int = int(os.environ.get("SYNC_API_PORT", "8766"))
    # GLAM_DATA_DIR: directory holding the custodian YAML files; defaults to
    # <project root>/data/custodian.
    data_dir: str = os.environ.get(
        "GLAM_DATA_DIR",
        str(PROJECT_ROOT / "data" / "custodian"),
    )
# Single module-wide settings instance.
settings = Settings()

# Path form of the configured data directory, built once and shared by the
# helpers and syncers in this module.
DATA_DIR = Path(settings.data_dir)
# ============================================================================
# Database Registry
# ============================================================================
# Maps each database name accepted by the API to the syncer class that
# handles it.
SYNCERS = {
    "ducklake": DuckLakeSyncer,
    "postgres": PostgresSyncer,
    "oxigraph": OxigraphSyncer,
    "qdrant": QdrantSyncer,
}

# Database names in registry order; used for validation and as the default
# sync target list.
AVAILABLE_DATABASES = list(SYNCERS)
# ============================================================================
# Pydantic Models
# ============================================================================
class DatabaseEnum(str, Enum):
    """Database names accepted as a ``{database}`` path parameter.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    ducklake = "ducklake"
    postgres = "postgres"
    oxigraph = "oxigraph"
    qdrant = "qdrant"
class SyncRequest(BaseModel):
    """Body of ``POST /api/sync/all``: which databases to sync and how."""

    # Names to sync; omitted or empty means every available database.
    databases: Optional[List[str]] = Field(
        default=None,
        description="List of databases to sync (default: all)",
    )
    # Forwarded unchanged to each syncer's ``sync(limit=...)``.
    limit: Optional[int] = Field(
        default=None,
        description="Limit number of records to sync",
    )
    # Forwarded unchanged to each syncer's ``sync(dry_run=...)``.
    dry_run: bool = Field(
        default=False,
        description="Preview changes without modifying databases",
    )
class SyncResultResponse(BaseModel):
    """Outcome of syncing one database.

    Built either from a syncer's result object or, when the sync raised,
    with ``status="failed"``, zeroed counters and the exception text in
    ``error_message``.
    """

    database: str
    # The syncer's status value, or "failed" when the sync raised.
    status: str
    records_processed: int
    records_succeeded: int
    records_failed: int
    duration_seconds: float
    error_message: Optional[str] = None
    # Free-form, syncer-specific extras; empty when none were reported.
    details: Dict[str, Any] = {}
class SyncAllResponse(BaseModel):
    """Aggregate outcome of one sync run across one or more databases."""

    # "success", "partial" or "failed", as computed by run_sync.
    overall_status: str
    dry_run: bool
    started_at: datetime
    completed_at: Optional[datetime] = None
    duration_seconds: float

    # Per-database tallies.
    total_databases: int
    databases_succeeded: int
    databases_failed: int

    # Record counters summed over every entry in ``results``.
    total_records_processed: int
    total_records_succeeded: int
    total_records_failed: int

    # Per-database results, keyed by database name.
    results: Dict[str, SyncResultResponse]
    # Databases that could not be reached, keyed by name, with the reason.
    connection_errors: Dict[str, str] = {}
class DatabaseStatusResponse(BaseModel):
    """Connection state of one database plus its status payload."""

    database: str
    connected: bool
    # The syncer's status dict when connected; otherwise {"error": <reason>}.
    status: Dict[str, Any]
class AllStatusResponse(BaseModel):
    """Connection state of every registered database, plus source-data info."""

    databases: List[DatabaseStatusResponse]
    # Number of *.yaml files found directly inside the data directory.
    yaml_file_count: int
    # The configured data directory, as a string.
    data_directory: str
class ProgressResponse(BaseModel):
    """Snapshot of the running sync, if any."""

    is_syncing: bool
    # Both fields below are None when no progress object is currently held.
    current_database: Optional[str] = None
    progress: Optional[Dict[str, Any]] = None
# ============================================================================
# Global State
# ============================================================================
# True from the moment run_sync starts until its ``finally`` clause runs.
_is_syncing: bool = False

# Progress object of the running sync; reset to None when the run ends.
_current_sync_progress: Optional[SyncProgress] = None

# Response of the most recently completed run; None until one has finished.
_last_sync_result: Optional[SyncAllResponse] = None
# ============================================================================
# Helper Functions
# ============================================================================
def count_yaml_files() -> int:
    """Return how many ``*.yaml`` files sit directly inside ``DATA_DIR``.

    Returns 0 when the directory does not exist. Subdirectories are not
    searched.
    """
    if DATA_DIR.exists():
        return sum(1 for _ in DATA_DIR.glob("*.yaml"))
    return 0
def progress_callback(progress: SyncProgress):
    """Store ``progress`` as the module-wide current sync progress.

    Handed to each syncer by ``run_sync`` so that ``/api/sync/progress`` can
    report the latest state.
    """
    global _current_sync_progress

    _current_sync_progress = progress
def check_database_connection(db_name: str) -> tuple[bool, Dict[str, Any]]:
    """Check whether a database is reachable and fetch its status.

    Args:
        db_name: Key into ``SYNCERS`` identifying the database.

    Returns:
        ``(True, status)`` with the syncer's status dict when the connection
        check succeeds; otherwise ``(False, {"error": <reason>})``. This
        function never raises: unknown names, failed checks and exceptions
        from the syncer (including its constructor) are all reported through
        the ``error`` entry.
    """
    syncer_class = SYNCERS.get(db_name)
    if syncer_class is None:
        return False, {"error": f"Unknown database: {db_name}"}

    try:
        # Construct inside the ``try`` so a syncer that fails during
        # initialisation is reported as a failed connection rather than
        # escaping to the caller as an exception.
        syncer = syncer_class(data_dir=DATA_DIR)
        if not syncer.check_connection():
            return False, {"error": "Connection failed"}
        return True, syncer.get_status()
    except Exception as e:
        return False, {"error": str(e)}
async def run_sync(
    databases: List[str],
    limit: Optional[int] = None,
    dry_run: bool = False,
) -> SyncAllResponse:
    """Run a sync for the given databases and return the aggregate result.

    Each database is first checked for connectivity; unreachable ones are
    recorded in ``connection_errors`` (with the reason reported by the check)
    and skipped. The remaining databases are synced one after another in a
    worker thread so the event loop stays responsive. A failure in one
    database is recorded as a ``"failed"`` result and does not stop the
    others.

    While running, the module-level ``_is_syncing`` flag is True and
    ``_current_sync_progress`` holds the current progress object; both are
    reset when the run ends, whether it succeeded or raised. On success the
    response is also stored in ``_last_sync_result``.

    Args:
        databases: Names of the databases to sync. Duplicates are ignored
            (first occurrence wins) so the totals are not inflated.
        limit: Forwarded to each syncer's ``sync``.
        dry_run: Forwarded to each syncer's ``sync``.

    Returns:
        The populated ``SyncAllResponse``.
    """
    global _is_syncing, _current_sync_progress, _last_sync_result

    _is_syncing = True
    _current_sync_progress = SyncProgress()
    started_at = datetime.now(timezone.utc)
    results: Dict[str, SyncResultResponse] = {}
    connection_errors: Dict[str, str] = {}

    # ``results`` is keyed by name, so a repeated name would otherwise be
    # synced twice and counted as an extra failed database.
    databases = list(dict.fromkeys(databases))

    try:
        # Check connections first. The check does blocking I/O, so it runs in
        # a worker thread instead of on the event loop.
        valid_databases = []
        for db_name in databases:
            connected, status = await asyncio.to_thread(
                check_database_connection, db_name
            )
            if connected:
                valid_databases.append(db_name)
            else:
                # Keep the specific reason rather than a generic message.
                connection_errors[db_name] = str(
                    status.get("error") or "Connection failed"
                )

        # Sync each reachable database in turn.
        for db_name in valid_databases:
            _current_sync_progress.current_database = db_name
            syncer_class = SYNCERS.get(db_name)
            if syncer_class is None:
                continue

            try:
                # Constructed inside the ``try`` so an initialisation error is
                # recorded against this database only.
                syncer = syncer_class(
                    data_dir=DATA_DIR,
                    progress_callback=progress_callback,
                )
                # Run the blocking sync in a worker thread.
                result = await asyncio.to_thread(
                    syncer.sync, limit=limit, dry_run=dry_run
                )
                results[db_name] = SyncResultResponse(
                    database=db_name,
                    status=result.status.value,
                    records_processed=result.records_processed,
                    records_succeeded=result.records_succeeded,
                    records_failed=result.records_failed,
                    duration_seconds=result.duration_seconds,
                    error_message=result.error_message,
                    details=result.details,
                )
            except Exception as e:
                results[db_name] = SyncResultResponse(
                    database=db_name,
                    status="failed",
                    records_processed=0,
                    records_succeeded=0,
                    records_failed=0,
                    duration_seconds=0.0,
                    error_message=str(e),
                )

        completed_at = datetime.now(timezone.utc)
        duration = (completed_at - started_at).total_seconds()

        # Calculate totals.
        outcomes = list(results.values())
        total_processed = sum(r.records_processed for r in outcomes)
        total_succeeded = sum(r.records_succeeded for r in outcomes)
        total_failed = sum(r.records_failed for r in outcomes)
        db_succeeded = sum(1 for r in outcomes if r.status == "success")
        # Anything that did not report "success" (including databases that
        # were never reached) counts as failed.
        db_failed = len(databases) - db_succeeded

        # "failed" is reserved for runs in which nothing was synced at all;
        # any other mix of errors is reported as "partial".
        overall_status = "success"
        if connection_errors or any(r.status == "failed" for r in outcomes):
            overall_status = "partial" if results else "failed"

        response = SyncAllResponse(
            overall_status=overall_status,
            dry_run=dry_run,
            started_at=started_at,
            completed_at=completed_at,
            duration_seconds=duration,
            total_databases=len(databases),
            databases_succeeded=db_succeeded,
            databases_failed=db_failed,
            total_records_processed=total_processed,
            total_records_succeeded=total_succeeded,
            total_records_failed=total_failed,
            results=results,
            connection_errors=connection_errors,
        )
        _last_sync_result = response
        return response
    finally:
        # Always release the "sync in progress" state, even on error.
        _is_syncing = False
        _current_sync_progress = None
# ============================================================================
# FastAPI App
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log a short banner on startup and a farewell line on shutdown."""
    # Startup: report where data is read from and what can be synced.
    print(f"Sync API starting - Data dir: {settings.data_dir}")
    print(f"Available databases: {AVAILABLE_DATABASES}")
    print(f"YAML files: {count_yaml_files()}")

    yield

    # Shutdown.
    print("Sync API shutting down")
app = FastAPI(
    title="GLAM Database Sync API",
    description="API for syncing heritage custodian data to all databases",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware.
# Allowed origins come from SYNC_API_CORS_ORIGINS (comma-separated list of
# origins). When unset or empty, every origin is allowed, as before.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("SYNC_API_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed requests must not be combined with a wildcard origin, so
    # credentials are only allowed once explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================================================
# Endpoints
# ============================================================================
@app.get("/", tags=["Health"])
async def root():
    """Health check: service name, registered databases and YAML file count."""
    payload = {
        "service": "GLAM Database Sync API",
        "status": "healthy",
        "databases": AVAILABLE_DATABASES,
        "yaml_files": count_yaml_files(),
    }
    return payload
@app.get("/api/sync/status", response_model=AllStatusResponse, tags=["Status"])
async def get_all_status():
    """Get connection status for all databases.

    Returns:
        An ``AllStatusResponse`` with one entry per registered database (in
        registry order), the YAML file count and the configured data
        directory.
    """
    statuses = []
    for db_name in AVAILABLE_DATABASES:
        # The check does blocking I/O; run it in a worker thread so other
        # requests (e.g. progress polling) are not stalled meanwhile.
        connected, status = await asyncio.to_thread(
            check_database_connection, db_name
        )
        statuses.append(
            DatabaseStatusResponse(
                database=db_name,
                connected=connected,
                status=status,
            )
        )

    return AllStatusResponse(
        databases=statuses,
        yaml_file_count=count_yaml_files(),
        data_directory=settings.data_dir,
    )
@app.get("/api/sync/status/{database}", response_model=DatabaseStatusResponse, tags=["Status"])
async def get_database_status(database: DatabaseEnum):
    """Get connection status for a specific database.

    Args:
        database: The database to check (validated against ``DatabaseEnum``).

    Returns:
        A ``DatabaseStatusResponse``; when the database is unreachable,
        ``connected`` is False and ``status`` carries the error.
    """
    # The check does blocking I/O; keep it off the event loop.
    connected, status = await asyncio.to_thread(
        check_database_connection, database.value
    )
    return DatabaseStatusResponse(
        database=database.value,
        connected=connected,
        status=status,
    )
@app.get("/api/sync/progress", response_model=ProgressResponse, tags=["Sync"])
async def get_sync_progress():
    """Report whether a sync is running and, if so, its latest progress."""
    # Read the global once so both fields describe the same progress object.
    snapshot = _current_sync_progress

    if snapshot:
        current_database = snapshot.current_database
        progress = snapshot.to_dict()
    else:
        current_database = None
        progress = None

    return ProgressResponse(
        is_syncing=_is_syncing,
        current_database=current_database,
        progress=progress,
    )
@app.get("/api/sync/last-result", tags=["Sync"])
async def get_last_result():
    """Return the stored result of the last sync, or a notice if none ran."""
    if _last_sync_result is not None:
        return _last_sync_result
    return {"message": "No sync has been run yet"}
@app.post("/api/sync/all", response_model=SyncAllResponse, tags=["Sync"])
async def sync_all_databases(request: SyncRequest, background_tasks: BackgroundTasks):
    """
    Sync databases from the YAML source files.

    Reads the custodian YAML files and updates the requested databases, or
    all configured ones (DuckLake, PostgreSQL, Oxigraph, Qdrant) when the
    request names none. The call returns only once the sync has finished.

    Responds with 409 if a sync is already running and with 400 if the
    request names a database that is not registered.
    """
    # NOTE: ``background_tasks`` is accepted but not used; the sync is awaited
    # inline.
    if _is_syncing:
        raise HTTPException(
            status_code=409,
            detail="Sync already in progress. Check /api/sync/progress for status."
        )

    # An omitted or empty list means "everything".
    databases = request.databases if request.databases else AVAILABLE_DATABASES

    # Reject names that are not in the registry.
    invalid = []
    for db in databases:
        if db not in AVAILABLE_DATABASES:
            invalid.append(db)
    if invalid:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid database names: {invalid}. Available: {AVAILABLE_DATABASES}"
        )

    return await run_sync(
        databases=databases,
        limit=request.limit,
        dry_run=request.dry_run,
    )
@app.post("/api/sync/{database}", response_model=SyncResultResponse, tags=["Sync"])
async def sync_single_database(
    database: DatabaseEnum,
    limit: Optional[int] = None,
    dry_run: bool = False,
):
    """Sync one database from the YAML source files.

    Responds with 409 if a sync is already running, 503 if the database
    could not be reached, and 500 if the run produced neither a result nor a
    connection error for it.
    """
    if _is_syncing:
        raise HTTPException(
            status_code=409,
            detail="Sync already in progress. Check /api/sync/progress for status."
        )

    name = database.value
    outcome = await run_sync(databases=[name], limit=limit, dry_run=dry_run)

    # Normal case: the run produced a result for this database.
    if name in outcome.results:
        return outcome.results[name]

    # The database was skipped because it could not be reached.
    if name in outcome.connection_errors:
        raise HTTPException(
            status_code=503,
            detail=f"Cannot connect to {database.value}: {outcome.connection_errors[database.value]}"
        )

    # Neither a result nor a connection error was recorded.
    raise HTTPException(
        status_code=500,
        detail=f"Unexpected error syncing {database.value}"
    )
@app.get("/api/databases", tags=["Info"])
async def list_databases():
    """Return the registered database names and how many there are."""
    names = AVAILABLE_DATABASES
    return {"databases": names, "count": len(names)}
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
    # Script entry point: serve the app with auto-reload on the configured
    # host and port. uvicorn is imported here so that importing this module
    # does not require it.
    import uvicorn

    uvicorn.run("main:app", host=settings.host, port=settings.port, reload=True)