#!/usr/bin/env python3
"""
PostgreSQL Sync Module - Sync custodian YAML files to PostgreSQL.

This module syncs all custodian YAML files to PostgreSQL database(s).
It wraps the existing backend/postgres/load_custodian_data.py functionality.

Usage:
    python -m scripts.sync.postgres_sync [--dry-run] [--limit N] [--databases DB1,DB2]
"""

import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Add project root to path for imports
PROJECT_ROOT = Path(__file__).parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Use try/except for imports that may not resolve in all environments
try:
    from scripts.sync import BaseSyncer, SyncResult, SyncStatus
except ImportError:
    # Fallback for when running as module
    from . import BaseSyncer, SyncResult, SyncStatus

# Configuration
# NOTE(review): DATABASE_URL is not referenced in this module; presumably kept
# for callers that import it — confirm before removing.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://glam_api:glam_api_password@localhost:5432/glam"
)

# Multi-database configuration.  glam_geo falls back to the glam_heritage
# (POSTGRES_*) settings when no GEO_-prefixed variables are set.
DATABASES = {
    'glam_heritage': {
        'host': os.getenv("POSTGRES_HOST", "localhost"),
        'port': int(os.getenv("POSTGRES_PORT", "5432")),
        'database': 'glam_heritage',
        'user': os.getenv("POSTGRES_USER", "glam_api"),
        'password': os.getenv("POSTGRES_PASSWORD", "glam_api_password"),
    },
    'glam_geo': {
        'host': os.getenv("GEO_POSTGRES_HOST", os.getenv("POSTGRES_HOST", "localhost")),
        'port': int(os.getenv("GEO_POSTGRES_PORT", os.getenv("POSTGRES_PORT", "5432"))),
        'database': 'glam_geo',
        'user': os.getenv("GEO_POSTGRES_USER", os.getenv("POSTGRES_USER", "glam_api")),
        'password': os.getenv("GEO_POSTGRES_PASSWORD", os.getenv("POSTGRES_PASSWORD", "glam_api_password")),
    },
}


class PostgresSyncer(BaseSyncer):
    """Sync custodian YAML files to PostgreSQL."""

    database_name = "postgres"

    def __init__(self, databases: list[str] | None = None, **kwargs):
        """Initialize the syncer.

        Args:
            databases: Names of target databases (keys of DATABASES).
                Defaults to ['glam_heritage'].
            **kwargs: Forwarded to BaseSyncer.
        """
        super().__init__(**kwargs)
        self.databases = databases or ['glam_heritage']
        self._conn = None

    def check_connection(self) -> bool:
        """Check if PostgreSQL is available.

        Attempts a short-lived connection to every configured target
        database; unconfigured names are skipped silently.

        Returns:
            True if all configured target databases accept a connection.
        """
        try:
            import psycopg2

            for db_name in self.databases:
                if db_name not in DATABASES:
                    continue
                config = DATABASES[db_name]
                try:
                    conn = psycopg2.connect(
                        host=config['host'],
                        port=config['port'],
                        database=config['database'],
                        user=config['user'],
                        password=config['password'],
                        connect_timeout=5,
                    )
                    conn.close()
                except Exception as e:
                    self.logger.error(f"Cannot connect to {db_name}: {e}")
                    return False
            return True
        except ImportError:
            self.logger.error("psycopg2 not installed. Run: pip install psycopg2-binary")
            return False
        except Exception as e:
            self.logger.error(f"Connection check failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get PostgreSQL status.

        Returns:
            A dict with a "databases" key mapping each target database
            name to its health status and (when readable) the current
            row count of the custodians table.
        """
        try:
            import psycopg2

            status = {"databases": {}}
            for db_name in self.databases:
                if db_name not in DATABASES:
                    status["databases"][db_name] = {"status": "unknown", "error": "Not configured"}
                    continue
                config = DATABASES[db_name]
                try:
                    conn = psycopg2.connect(
                        host=config['host'],
                        port=config['port'],
                        database=config['database'],
                        user=config['user'],
                        password=config['password'],
                        connect_timeout=5,
                    )
                    # Check if custodians table exists and get count
                    try:
                        cursor = conn.cursor()
                        try:
                            cursor.execute("SELECT COUNT(*) FROM custodians")
                            result = cursor.fetchone()
                            count = result[0] if result else 0
                        finally:
                            # Ensure the cursor is released even if the
                            # query fails (e.g. table does not exist).
                            cursor.close()
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": count,
                        }
                    except Exception:
                        # Connection works but the table is unreadable;
                        # treat as healthy with no data.
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": 0,
                            "note": "Table may not exist",
                        }
                    conn.close()
                except Exception as e:
                    status["databases"][db_name] = {"status": "unavailable", "error": str(e)}
            return status
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to PostgreSQL using existing loader.

        Args:
            limit: Optional cap on the number of YAML files processed.
            dry_run: When True, report what would be synced without
                touching any database.

        Returns:
            A SyncResult describing the outcome across all target
            databases.
        """
        result = SyncResult(
            database="postgres",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection (skipped for dry runs, which touch nothing)
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = "Cannot connect to PostgreSQL"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "postgres"
        result.records_processed = len(yaml_files)

        if dry_run:
            self.logger.info(f"[DRY RUN] Would sync {len(yaml_files)} files to PostgreSQL")
            result.status = SyncStatus.SUCCESS
            result.records_succeeded = len(yaml_files)
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            result.details["databases"] = self.databases
            return result

        # Use the existing loader
        try:
            # Import the loader module.  Guard the path insertion so repeated
            # sync() calls do not keep prepending duplicates to sys.path
            # (matches the PROJECT_ROOT guard at module top).
            loader_path = str(PROJECT_ROOT / "backend" / "postgres")
            if loader_path not in sys.path:
                sys.path.insert(0, loader_path)

            from load_custodian_data import load_data_to_database, parse_custodian_file, DATABASES as LOADER_DATABASES

            async def _sync():
                # Load into each target database sequentially, collecting
                # per-database results (or the error that stopped it).
                results = {}
                for db_name in self.databases:
                    if db_name not in LOADER_DATABASES:
                        self.logger.warning(f"Database {db_name} not configured in loader")
                        continue

                    self.progress.current_database = db_name
                    self._report_progress()

                    try:
                        db_result = await load_data_to_database(
                            db_name=db_name,
                            db_config=LOADER_DATABASES[db_name],
                            yaml_files=yaml_files,
                            drop_existing=True,  # Replace mode
                        )
                        results[db_name] = db_result
                    except Exception as e:
                        self.logger.error(f"Error loading to {db_name}: {e}")
                        results[db_name] = {"error": str(e)}
                return results

            sync_results = asyncio.run(_sync())

            # Aggregate results
            total_processed = 0
            total_errors = 0
            for db_name, db_result in sync_results.items():
                if "error" in db_result:
                    total_errors += 1
                else:
                    total_processed += db_result.get("processed", 0)

            # Average over the databases actually attempted, not over all
            # configured ones — names skipped as unconfigured in the loader
            # must not deflate the per-database success count.
            attempted = len(sync_results)
            result.records_succeeded = total_processed // attempted if attempted else 0
            result.records_failed = total_errors
            result.details["database_results"] = sync_results
            result.status = SyncStatus.SUCCESS if total_errors == 0 else SyncStatus.PARTIAL

        except ImportError as e:
            self.logger.error(f"Cannot import loader: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = f"Import error: {e}"
        except Exception as e:
            self.logger.error(f"Sync failed: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = str(e)

        result.end_time = datetime.now(timezone.utc)
        return result


def main():
    """CLI entry point: parse arguments, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to PostgreSQL")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument(
        "--databases",
        type=str,
        default="glam_heritage",
        help="Comma-separated list of databases (default: glam_heritage)"
    )
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    databases = [db.strip() for db in args.databases.split(",")]
    syncer = PostgresSyncer(databases=databases)

    print("=" * 60)
    print("PostgreSQL Sync")
    print("=" * 60)

    if not args.dry_run:
        print(f"Checking connection to databases: {', '.join(databases)}...")
        status = syncer.get_status()
        for db_name, db_status in status.get("databases", {}).items():
            print(f"  {db_name}: {db_status.get('status', 'unknown')}")
            if db_status.get("custodians_count"):
                print(f"    Current custodians: {db_status['custodians_count']}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f"  Processed: {result.records_processed}")
    print(f"  Succeeded: {result.records_succeeded}")
    print(f"  Failed: {result.records_failed}")
    print(f"  Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f"  Error: {result.error_message}")
    print("=" * 60)


if __name__ == "__main__":
    main()