#!/usr/bin/env python3
"""
Unified Database Sync Orchestrator

Syncs all databases from data/custodian/*.yaml (single source of truth).

Usage:
    # Sync all databases
    python scripts/sync_all_databases.py

    # Dry run with limit
    python scripts/sync_all_databases.py --dry-run --limit 100

    # Sync specific databases only
    python scripts/sync_all_databases.py --databases ducklake,postgres

    # Check status only (no sync)
    python scripts/sync_all_databases.py --status-only

Exit codes:
    0  every requested database synced successfully
    1  invalid arguments, or only some databases synced successfully
    2  no database synced successfully
"""

import argparse
import logging
import sys
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field

# Import sync modules
from scripts.sync import SyncResult, SyncStatus
from scripts.sync.ducklake_sync import DuckLakeSyncer
from scripts.sync.postgres_sync import PostgresSyncer
from scripts.sync.postgres_person_sync import PostgresPersonSyncer
from scripts.sync.oxigraph_sync import OxigraphSyncer
from scripts.sync.qdrant_sync import QdrantSyncer
from scripts.sync.qdrant_person_sync import QdrantPersonSyncer

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

# Available syncers, keyed by the name accepted on the command line.
SYNCERS = {
    'ducklake': DuckLakeSyncer,
    'postgres': PostgresSyncer,
    'postgres_persons': PostgresPersonSyncer,
    'oxigraph': OxigraphSyncer,
    'qdrant': QdrantSyncer,
    'qdrant_persons': QdrantPersonSyncer,
}

# Databases synced when --databases is not given.
DEFAULT_DATABASES = [
    'ducklake',
    'postgres',
    'postgres_persons',
    'oxigraph',
    'qdrant',
    'qdrant_persons',
]


@dataclass
class OrchestratorResult:
    """Result from orchestrating multiple database syncs."""

    # When the orchestration run started.
    start_time: datetime
    # When the run finished; None while it is still in progress.
    end_time: Optional[datetime] = None
    # Per-database sync outcome, keyed by database name.
    results: Dict[str, SyncResult] = field(default_factory=dict)
    # Databases skipped before syncing, keyed by name, with the reason.
    connection_errors: Dict[str, str] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        """Elapsed seconds between start and end, or 0.0 if not finished."""
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    @property
    def all_succeeded(self) -> bool:
        """True only if at least one sync ran and every sync succeeded.

        An empty result set is not a success: ``all()`` over nothing is
        vacuously True, which would report success for a run that synced
        no database at all.
        """
        if self.connection_errors or not self.results:
            return False
        return all(r.status == SyncStatus.SUCCESS for r in self.results.values())

    @property
    def any_succeeded(self) -> bool:
        """True if at least one database sync finished with SUCCESS."""
        return any(r.status == SyncStatus.SUCCESS for r in self.results.values())

    @property
    def total_processed(self) -> int:
        """Sum of ``records_processed`` over all sync results."""
        return sum(r.records_processed for r in self.results.values())

    @property
    def total_succeeded(self) -> int:
        """Sum of ``records_succeeded`` over all sync results."""
        return sum(r.records_succeeded for r in self.results.values())

    @property
    def total_failed(self) -> int:
        """Sum of ``records_failed`` over all sync results."""
        return sum(r.records_failed for r in self.results.values())


def print_separator(char: str = '=', width: int = 70) -> None:
    """Print a horizontal rule made of ``char`` repeated ``width`` times."""
    print(char * width)


def print_header(title: str) -> None:
    """Print ``title`` framed by separator lines above and below."""
    print_separator()
    print(f" {title}")
    print_separator()


def check_connections(databases: List[str]) -> Dict[str, bool]:
    """Check connections to all specified databases.

    Args:
        databases: Database names to check (keys of ``SYNCERS``).

    Returns:
        Mapping of database name to whether its connection check passed.
        Unknown names and checks that raise are reported as False.
    """
    results = {}
    for db_name in databases:
        syncer_class = SYNCERS.get(db_name)
        if not syncer_class:
            logger.warning("Unknown database: %s", db_name)
            results[db_name] = False
            continue

        try:
            # Construct inside the try: a syncer whose constructor raises
            # must only fail this database, not abort the whole check.
            connected = syncer_class().check_connection()
        except Exception as e:
            results[db_name] = False
            logger.error("Connection check %s: ERROR - %s", db_name, e)
            logger.debug("Connection check %s traceback", db_name, exc_info=True)
        else:
            results[db_name] = connected
            logger.info(
                "Connection check %s: %s", db_name, "OK" if connected else "FAILED"
            )

    return results


def get_status(databases: List[str]) -> Dict[str, dict]:
    """Get status from all specified databases.

    Args:
        databases: Database names to query (keys of ``SYNCERS``).

    Returns:
        Mapping of database name to the dict returned by the syncer's
        ``get_status()``, or ``{'error': <message>}`` if it raised.
        Unknown names are omitted from the result.
    """
    statuses = {}
    for db_name in databases:
        syncer_class = SYNCERS.get(db_name)
        if not syncer_class:
            logger.warning("Unknown database: %s", db_name)
            continue

        try:
            # Constructor failures are reported like any other status error.
            statuses[db_name] = syncer_class().get_status()
        except Exception as e:
            statuses[db_name] = {'error': str(e)}
            logger.debug("Status check %s traceback", db_name, exc_info=True)

    return statuses


def sync_all(
    databases: List[str],
    limit: Optional[int] = None,
    dry_run: bool = False,
    skip_connection_check: bool = False
) -> OrchestratorResult:
    """
    Sync all specified databases from YAML source files.

    A failure in one database (including a syncer that cannot be
    constructed) is recorded in the result and does not stop the others.

    Args:
        databases: List of database names to sync
        limit: Optional limit on records to process
        dry_run: If True, don't actually modify databases
        skip_connection_check: If True, skip initial connection check

    Returns:
        OrchestratorResult with results for each database
    """
    result = OrchestratorResult(start_time=datetime.now())

    # Check connections first (unless skipped)
    if not skip_connection_check:
        logger.info("Checking database connections...")
        connections = check_connections(databases)

        for db_name, connected in connections.items():
            if not connected:
                result.connection_errors[db_name] = "Connection failed"
                logger.error("Skipping %s - connection failed", db_name)

        # Filter to only connected databases
        databases = [db for db in databases if connections.get(db, False)]

        if not databases:
            logger.error("No databases available to sync")
            result.end_time = datetime.now()
            return result

    # Sync each database
    for db_name in databases:
        logger.info("\n%s", '=' * 60)
        logger.info("Syncing %s...", db_name)
        logger.info("%s", '=' * 60)

        syncer_class = SYNCERS.get(db_name)
        if not syncer_class:
            # Only reachable when the connection check was skipped. Record
            # the failure instead of dropping the name silently, so the run
            # cannot report success for a database it never synced.
            logger.error("%s sync failed: unknown database", db_name)
            result.results[db_name] = SyncResult(
                database=db_name,
                status=SyncStatus.FAILED,
                error_message=f"Unknown database: {db_name}"
            )
            continue

        try:
            # Construct inside the try so a constructor error fails only
            # this database.
            sync_result = syncer_class().sync(limit=limit, dry_run=dry_run)
        except Exception as e:
            logger.error("%s sync failed: %s", db_name, e)
            logger.debug("%s sync traceback", db_name, exc_info=True)
            result.results[db_name] = SyncResult(
                database=db_name,
                status=SyncStatus.FAILED,
                error_message=str(e)
            )
            continue

        result.results[db_name] = sync_result
        status_str = "SUCCESS" if sync_result.status == SyncStatus.SUCCESS else "FAILED"
        logger.info(
            "%s sync: %s - %s/%s records",
            db_name,
            status_str,
            sync_result.records_succeeded,
            sync_result.records_processed,
        )

    result.end_time = datetime.now()
    return result


def print_status_report(statuses: Dict[str, dict]) -> None:
    """Print status report for all databases.

    Args:
        statuses: Mapping as returned by ``get_status``; an entry holding
            an ``'error'`` key is printed as an error line only.
    """
    print_header("DATABASE STATUS REPORT")

    for db_name, status in statuses.items():
        print(f"\n{db_name.upper()}")
        print("-" * 40)

        if 'error' in status:
            print(f" Error: {status['error']}")
            continue

        for key, value in status.items():
            print(f" {key}: {value}")

    print()


def print_sync_report(result: OrchestratorResult, dry_run: bool = False) -> None:
    """Print comprehensive sync report.

    Args:
        result: Outcome of ``sync_all``.
        dry_run: If True, the report header is marked as a dry run.
    """
    mode = "[DRY RUN] " if dry_run else ""
    print_header(f"{mode}SYNC REPORT")

    # Connection errors
    if result.connection_errors:
        print("\nConnection Errors:")
        for db_name, error in result.connection_errors.items():
            print(f" {db_name}: {error}")

    # Individual results
    print("\nDatabase Results:")
    print("-" * 60)

    for db_name, sync_result in result.results.items():
        status_icon = "OK" if sync_result.status == SyncStatus.SUCCESS else "FAIL"
        print(f"\n [{status_icon}] {db_name.upper()}")
        print(f" Status: {sync_result.status.value}")
        print(f" Processed: {sync_result.records_processed}")
        print(f" Succeeded: {sync_result.records_succeeded}")
        print(f" Failed: {sync_result.records_failed}")
        print(f" Duration: {sync_result.duration_seconds:.2f}s")

        if sync_result.details:
            for key, value in sync_result.details.items():
                print(f" {key}: {value}")

        if sync_result.error_message:
            print(f" Error: {sync_result.error_message}")

    # Summary. Anything that is not SUCCESS counts as failed, matching the
    # per-database OK/FAIL marker above, so the two counts always add up to
    # the total.
    successful = sum(
        1 for r in result.results.values() if r.status == SyncStatus.SUCCESS
    )
    failed = len(result.results) - successful + len(result.connection_errors)

    print_separator("-")
    print("\nSUMMARY")
    print(f" Total Databases: {len(result.results) + len(result.connection_errors)}")
    print(f" Successful: {successful}")
    print(f" Failed: {failed}")
    print(f" Total Records Processed: {result.total_processed}")
    print(f" Total Records Succeeded: {result.total_succeeded}")
    print(f" Total Records Failed: {result.total_failed}")
    print(f" Total Duration: {result.duration_seconds:.2f}s")

    # PARTIAL requires at least one real success; a run in which every sync
    # failed is FAILED.
    if result.all_succeeded:
        overall_status = "SUCCESS"
    elif result.any_succeeded:
        overall_status = "PARTIAL"
    else:
        overall_status = "FAILED"
    print(f"\n Overall Status: {overall_status}")
    print_separator()


def main() -> None:
    """Parse command-line arguments and run a status check or a full sync.

    Exits with 0 when every database synced successfully, 1 on invalid
    arguments or when only some databases succeeded, and 2 when none did.
    The status-only mode returns normally without calling ``sys.exit``.
    """
    default_dbs = ','.join(DEFAULT_DATABASES)

    parser = argparse.ArgumentParser(
        description='Unified Database Sync Orchestrator',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Examples:
  # Sync all databases
  python scripts/sync_all_databases.py

  # Dry run with limit
  python scripts/sync_all_databases.py --dry-run --limit 100

  # Sync specific databases
  python scripts/sync_all_databases.py --databases ducklake,postgres

  # Check status only
  python scripts/sync_all_databases.py --status-only

Available databases:
  {', '.join(SYNCERS)}
"""
    )

    parser.add_argument(
        '--databases', '-d',
        type=str,
        default=default_dbs,
        help=f'Comma-separated list of databases to sync (default: {default_dbs})'
    )
    parser.add_argument(
        '--limit', '-l',
        type=int,
        default=None,
        help='Limit number of records to process (default: all)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview changes without modifying databases'
    )
    parser.add_argument(
        '--status-only',
        action='store_true',
        help='Only check status, do not sync'
    )
    parser.add_argument(
        '--skip-connection-check',
        action='store_true',
        help='Skip initial connection check'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose output'
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Parse database list: ignore empty entries (e.g. a trailing comma) and
    # drop duplicates while keeping the requested order, so no database is
    # synced twice in one run.
    requested = (name.strip().lower() for name in args.databases.split(','))
    databases = list(dict.fromkeys(name for name in requested if name))

    if not databases:
        logger.error("No databases specified")
        logger.error("Available: %s", list(SYNCERS.keys()))
        sys.exit(1)

    # Validate database names
    invalid_dbs = [db for db in databases if db not in SYNCERS]
    if invalid_dbs:
        logger.error("Invalid database names: %s", invalid_dbs)
        logger.error("Available: %s", list(SYNCERS.keys()))
        sys.exit(1)

    print_header("GLAM Database Sync Orchestrator")
    print(f" Databases: {', '.join(databases)}")
    # Compare against None so an explicit limit of 0 is not shown as 'all'.
    print(f" Limit: {args.limit if args.limit is not None else 'all'}")
    print(f" Dry Run: {args.dry_run}")
    print(f" Mode: {'Status Check' if args.status_only else 'Full Sync'}")
    print()

    if args.status_only:
        # Just check status
        statuses = get_status(databases)
        print_status_report(statuses)
        return

    # Run sync
    result = sync_all(
        databases=databases,
        limit=args.limit,
        dry_run=args.dry_run,
        skip_connection_check=args.skip_connection_check
    )

    # Print report
    print_sync_report(result, dry_run=args.dry_run)

    # Exit with appropriate code
    if result.all_succeeded:
        sys.exit(0)
    if result.any_succeeded:
        sys.exit(1)  # Partial success
    sys.exit(2)  # Complete failure


if __name__ == '__main__':
    main()