glam/scripts/sync/postgres_sync.py
2025-12-14 17:09:55 +01:00

282 lines
11 KiB
Python

#!/usr/bin/env python3
"""
PostgreSQL Sync Module - Sync custodian YAML files to PostgreSQL.
This module syncs all custodian YAML files to PostgreSQL database(s).
It wraps the existing backend/postgres/load_custodian_data.py functionality.
Usage:
python -m scripts.sync.postgres_sync [--dry-run] [--limit N] [--databases DB1,DB2]
"""
import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Add project root to path for imports so `scripts.sync` resolves when this
# file is run directly as a script (not via `python -m`).
PROJECT_ROOT = Path(__file__).parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
# Use try/except for imports that may not resolve in all environments:
# absolute import works for script execution, relative import works when
# this module is loaded as part of the `scripts.sync` package.
try:
    from scripts.sync import BaseSyncer, SyncResult, SyncStatus
except ImportError:
    # Fallback for when running as module
    from . import BaseSyncer, SyncResult, SyncStatus
# --- Configuration -----------------------------------------------------------
# Single-DSN connection string for callers that want one URL.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://glam_api:glam_api_password@localhost:5432/glam"
)


def _db_config(database: str, prefix: str = "") -> dict:
    """Build one connection-config entry from environment variables.

    When ``prefix`` is given (e.g. ``"GEO_"``), the prefixed variable wins,
    falling back to the unprefixed ``POSTGRES_*`` value, then the default.
    """
    def _get(suffix: str, default: str) -> str:
        base = os.getenv("POSTGRES_" + suffix, default)
        if prefix:
            return os.getenv(prefix + "POSTGRES_" + suffix, base)
        return base

    return {
        "host": _get("HOST", "localhost"),
        "port": int(_get("PORT", "5432")),
        "database": database,
        "user": _get("USER", "glam_api"),
        "password": _get("PASSWORD", "glam_api_password"),
    }


# Multi-database configuration: the geo database honours GEO_POSTGRES_*
# overrides and otherwise shares the primary POSTGRES_* settings.
DATABASES = {
    "glam_heritage": _db_config("glam_heritage"),
    "glam_geo": _db_config("glam_geo", prefix="GEO_"),
}
class PostgresSyncer(BaseSyncer):
    """Sync custodian YAML files to PostgreSQL.

    Wraps backend/postgres/load_custodian_data.py so the generic sync CLI
    can push every custodian YAML file into one or more of the databases
    configured in ``DATABASES``.
    """

    database_name = "postgres"

    def __init__(self, databases: list[str] | None = None, **kwargs):
        """
        Args:
            databases: Target database names (keys of ``DATABASES``).
                Defaults to ``['glam_heritage']``.
            **kwargs: Forwarded to BaseSyncer.
        """
        super().__init__(**kwargs)
        self.databases = databases or ['glam_heritage']
        self._conn = None  # reserved for a cached connection; not used yet

    @staticmethod
    def _connect(config: dict):
        """Open a psycopg2 connection from a ``DATABASES`` config entry."""
        import psycopg2
        return psycopg2.connect(
            host=config['host'],
            port=config['port'],
            database=config['database'],
            user=config['user'],
            password=config['password'],
            connect_timeout=5,
        )

    def check_connection(self) -> bool:
        """Return True if every configured target database is reachable.

        Unconfigured database names are skipped here (sync() warns about
        them); the first unreachable database fails the whole check.
        """
        try:
            import psycopg2  # noqa: F401 -- fail fast with a clear message
        except ImportError:
            self.logger.error("psycopg2 not installed. Run: pip install psycopg2-binary")
            return False
        try:
            for db_name in self.databases:
                if db_name not in DATABASES:
                    continue
                try:
                    self._connect(DATABASES[db_name]).close()
                except Exception as e:
                    self.logger.error(f"Cannot connect to {db_name}: {e}")
                    return False
            return True
        except Exception as e:
            self.logger.error(f"Connection check failed: {e}")
            return False

    def get_status(self) -> dict:
        """Report reachability and custodian row count per target database.

        Returns:
            ``{"databases": {name: {...}}}`` where each entry has a
            ``status`` of "healthy" / "unavailable" / "unknown", or
            ``{"status": "error", "error": ...}`` if the check itself failed.
        """
        try:
            import psycopg2  # noqa: F401
            status = {"databases": {}}
            for db_name in self.databases:
                if db_name not in DATABASES:
                    status["databases"][db_name] = {"status": "unknown", "error": "Not configured"}
                    continue
                try:
                    conn = self._connect(DATABASES[db_name])
                    # Check if custodians table exists and get count
                    try:
                        cursor = conn.cursor()
                        cursor.execute("SELECT COUNT(*) FROM custodians")
                        row = cursor.fetchone()
                        cursor.close()
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": row[0] if row else 0,
                        }
                    except Exception:
                        # Connection works but the schema isn't loaded yet.
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": 0,
                            "note": "Table may not exist",
                        }
                    conn.close()
                except Exception as e:
                    status["databases"][db_name] = {"status": "unavailable", "error": str(e)}
            return status
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to PostgreSQL using the existing loader.

        Args:
            limit: Process at most this many YAML files (None = all;
                an explicit 0 means "no files").
            dry_run: Plan only; do not touch the databases.

        Returns:
            SyncResult, with per-database outcomes in
            ``details["database_results"]``.
        """
        result = SyncResult(
            database="postgres",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )
        # Fail fast when any target is unreachable (skipped on dry runs).
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = "Cannot connect to PostgreSQL"
            result.end_time = datetime.now(timezone.utc)
            return result
        # Load YAML files
        yaml_files = self._list_yaml_files()
        # `is not None`: a bare `if limit:` would silently treat --limit 0
        # as "no limit" instead of "zero files".
        if limit is not None:
            yaml_files = yaml_files[:limit]
        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "postgres"
        result.records_processed = len(yaml_files)
        if dry_run:
            self.logger.info(f"[DRY RUN] Would sync {len(yaml_files)} files to PostgreSQL")
            result.status = SyncStatus.SUCCESS
            result.records_succeeded = len(yaml_files)
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            result.details["databases"] = self.databases
            return result
        # Use the existing loader
        try:
            # Make the loader importable; guard so repeated sync() calls
            # don't keep growing sys.path.
            loader_dir = str(PROJECT_ROOT / "backend" / "postgres")
            if loader_dir not in sys.path:
                sys.path.insert(0, loader_dir)
            from load_custodian_data import load_data_to_database, DATABASES as LOADER_DATABASES

            async def _sync():
                """Load the files into each target database in turn."""
                results = {}
                for db_name in self.databases:
                    if db_name not in LOADER_DATABASES:
                        self.logger.warning(f"Database {db_name} not configured in loader")
                        continue
                    self.progress.current_database = db_name
                    self._report_progress()
                    try:
                        db_result = await load_data_to_database(
                            db_name=db_name,
                            db_config=LOADER_DATABASES[db_name],
                            yaml_files=yaml_files,
                            drop_existing=True,  # Replace mode
                        )
                        results[db_name] = db_result
                    except Exception as e:
                        self.logger.error(f"Error loading to {db_name}: {e}")
                        results[db_name] = {"error": str(e)}
                return results

            sync_results = asyncio.run(_sync())
            # Aggregate results
            total_processed = 0
            total_errors = 0
            for db_result in sync_results.values():
                if "error" in db_result:
                    total_errors += 1
                else:
                    total_processed += db_result.get("processed", 0)
            # Average per-file count over the databases that actually
            # succeeded; dividing by all targets would undercount whenever
            # one database failed.
            ok_dbs = len(sync_results) - total_errors
            result.records_succeeded = total_processed // ok_dbs if ok_dbs else 0
            result.records_failed = total_errors
            result.details["database_results"] = sync_results
            result.status = SyncStatus.SUCCESS if total_errors == 0 else SyncStatus.PARTIAL
        except ImportError as e:
            self.logger.error(f"Cannot import loader: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = f"Import error: {e}"
        except Exception as e:
            self.logger.error(f"Sync failed: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = str(e)
        result.end_time = datetime.now(timezone.utc)
        return result
def main():
    """CLI entry point: parse arguments, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to PostgreSQL")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument(
        "--databases",
        type=str,
        default="glam_heritage",
        help="Comma-separated list of databases (default: glam_heritage)"
    )
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Filter out blanks so "--databases a,,b" or a trailing comma doesn't
    # hand the syncer an empty-string database name.
    databases = [db.strip() for db in args.databases.split(",") if db.strip()]
    syncer = PostgresSyncer(databases=databases)

    print("=" * 60)
    print("PostgreSQL Sync")
    print("=" * 60)

    # Pre-flight status report (skipped for dry runs, which never connect).
    if not args.dry_run:
        print(f"Checking connection to databases: {', '.join(databases)}...")
        status = syncer.get_status()
        for db_name, db_status in status.get("databases", {}).items():
            print(f" {db_name}: {db_status.get('status', 'unknown')}")
            if db_status.get("custodians_count"):
                print(f" Current custodians: {db_status['custodians_count']}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print("=" * 60)


if __name__ == "__main__":
    main()