#!/usr/bin/env python3
"""
PostgreSQL Sync Module - Sync custodian YAML files to PostgreSQL.

This module syncs all custodian YAML files to PostgreSQL database(s).
It wraps the existing backend/postgres/load_custodian_data.py functionality.

Usage:
    python -m scripts.sync.postgres_sync [--dry-run] [--limit N] [--databases DB1,DB2]
"""
|
|
|
|
import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Add project root to path for imports (this file lives at <root>/scripts/sync/).
PROJECT_ROOT = Path(__file__).parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Use try/except for imports that may not resolve in all environments.
try:
    from scripts.sync import BaseSyncer, SyncResult, SyncStatus
except ImportError:
    # Fallback for when running as a module inside the package.
    from . import BaseSyncer, SyncResult, SyncStatus

# Single-database connection string.
# NOTE(review): nothing in this module reads DATABASE_URL — it appears to be
# kept for callers that import it from here; the syncer itself uses the
# multi-database DATABASES mapping below. Confirm before removing.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://glam_api:glam_api_password@localhost:5432/glam"
)
|
|
|
|
# Multi-database configuration
|
|
# Multi-database configuration.
# Each entry holds psycopg2 connection parameters; the geo database falls
# back to the primary POSTGRES_* settings when its GEO_* overrides are unset.
DATABASES = {
    'glam_heritage': dict(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", "5432")),
        database='glam_heritage',
        user=os.getenv("POSTGRES_USER", "glam_api"),
        password=os.getenv("POSTGRES_PASSWORD", "glam_api_password"),
    ),
    'glam_geo': dict(
        host=os.getenv("GEO_POSTGRES_HOST", os.getenv("POSTGRES_HOST", "localhost")),
        port=int(os.getenv("GEO_POSTGRES_PORT", os.getenv("POSTGRES_PORT", "5432"))),
        database='glam_geo',
        user=os.getenv("GEO_POSTGRES_USER", os.getenv("POSTGRES_USER", "glam_api")),
        password=os.getenv("GEO_POSTGRES_PASSWORD", os.getenv("POSTGRES_PASSWORD", "glam_api_password")),
    ),
}
|
|
|
|
|
|
class PostgresSyncer(BaseSyncer):
    """Sync custodian YAML files to PostgreSQL.

    Delegates the actual loading to backend/postgres/load_custodian_data.py
    and reports progress/results through the BaseSyncer interface.
    """

    database_name = "postgres"

    def __init__(self, databases: list[str] | None = None, **kwargs):
        """Initialize the syncer.

        Args:
            databases: Names of target databases (keys of DATABASES).
                Defaults to ['glam_heritage'].
            **kwargs: Passed through to BaseSyncer.
        """
        super().__init__(**kwargs)
        self.databases = databases or ['glam_heritage']
        self._conn = None  # reserved for a cached connection; currently unused

    def _open_connection(self, db_name: str):
        """Open a short-lived psycopg2 connection to a configured database.

        Shared by check_connection() and get_status() so the connection
        parameters live in exactly one place. Raises whatever psycopg2
        raises on failure (including ImportError if psycopg2 is absent);
        callers decide how to report it.
        """
        import psycopg2

        config = DATABASES[db_name]
        return psycopg2.connect(
            host=config['host'],
            port=config['port'],
            database=config['database'],
            user=config['user'],
            password=config['password'],
            connect_timeout=5,
        )

    def check_connection(self) -> bool:
        """Check if PostgreSQL is available.

        Returns True only if every configured target database accepts a
        connection; unknown database names are silently skipped (sync()
        warns about them later).
        """
        try:
            for db_name in self.databases:
                if db_name not in DATABASES:
                    continue
                try:
                    conn = self._open_connection(db_name)
                    conn.close()
                except Exception as e:
                    self.logger.error(f"Cannot connect to {db_name}: {e}")
                    return False
            return True
        except ImportError:
            self.logger.error("psycopg2 not installed. Run: pip install psycopg2-binary")
            return False
        except Exception as e:
            self.logger.error(f"Connection check failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get PostgreSQL status.

        Returns a dict with per-database health and custodian row counts,
        or {"status": "error", ...} if psycopg2 itself is unavailable.
        """
        try:
            import psycopg2  # noqa: F401 — fail fast with a module-level error

            status = {"databases": {}}
            for db_name in self.databases:
                if db_name not in DATABASES:
                    status["databases"][db_name] = {"status": "unknown", "error": "Not configured"}
                    continue

                try:
                    conn = self._open_connection(db_name)
                    # Check if custodians table exists and get count.
                    try:
                        cursor = conn.cursor()
                        cursor.execute("SELECT COUNT(*) FROM custodians")
                        result = cursor.fetchone()
                        count = result[0] if result else 0
                        cursor.close()
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": count,
                        }
                    except Exception:
                        # Connection works but the table may not be loaded yet.
                        status["databases"][db_name] = {
                            "status": "healthy",
                            "custodians_count": 0,
                            "note": "Table may not exist",
                        }
                    conn.close()
                except Exception as e:
                    status["databases"][db_name] = {"status": "unavailable", "error": str(e)}

            return status
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to PostgreSQL using the existing loader.

        Args:
            limit: If given, only the first N YAML files are synced.
            dry_run: Enumerate files but do not touch the databases.

        Returns:
            A SyncResult describing the outcome across all target databases.
        """
        result = SyncResult(
            database="postgres",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Fail early if any target database is unreachable (skipped on dry runs).
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = "Cannot connect to PostgreSQL"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files, honouring the optional limit.
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "postgres"
        result.records_processed = len(yaml_files)

        if dry_run:
            self.logger.info(f"[DRY RUN] Would sync {len(yaml_files)} files to PostgreSQL")
            result.status = SyncStatus.SUCCESS
            result.records_succeeded = len(yaml_files)
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            result.details["databases"] = self.databases
            return result

        # Use the existing loader in backend/postgres/load_custodian_data.py.
        try:
            # Guard the insert so repeated sync() calls don't grow sys.path.
            loader_dir = str(PROJECT_ROOT / "backend" / "postgres")
            if loader_dir not in sys.path:
                sys.path.insert(0, loader_dir)
            from load_custodian_data import load_data_to_database, DATABASES as LOADER_DATABASES

            async def _sync():
                results = {}
                for db_name in self.databases:
                    if db_name not in LOADER_DATABASES:
                        self.logger.warning(f"Database {db_name} not configured in loader")
                        continue

                    self.progress.current_database = db_name
                    self._report_progress()

                    try:
                        db_result = await load_data_to_database(
                            db_name=db_name,
                            db_config=LOADER_DATABASES[db_name],
                            yaml_files=yaml_files,
                            drop_existing=True,  # Replace mode
                        )
                        results[db_name] = db_result
                    except Exception as e:
                        self.logger.error(f"Error loading to {db_name}: {e}")
                        results[db_name] = {"error": str(e)}

                return results

            sync_results = asyncio.run(_sync())

            # Aggregate per-database results into the SyncResult.
            total_processed = 0
            total_errors = 0
            for db_name, db_result in sync_results.items():
                if "error" in db_result:
                    total_errors += 1
                else:
                    total_processed += db_result.get("processed", 0)

            # Average across databases so the count reflects files, not files*dbs.
            result.records_succeeded = total_processed // len(self.databases) if self.databases else 0
            result.records_failed = total_errors
            result.details["database_results"] = sync_results
            result.status = SyncStatus.SUCCESS if total_errors == 0 else SyncStatus.PARTIAL

        except ImportError as e:
            self.logger.error(f"Cannot import loader: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = f"Import error: {e}"
        except Exception as e:
            self.logger.error(f"Sync failed: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = str(e)

        result.end_time = datetime.now(timezone.utc)
        return result
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to PostgreSQL")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument(
        "--databases",
        type=str,
        default="glam_heritage",
        help="Comma-separated list of databases (default: glam_heritage)",
    )
    args = parser.parse_args()

    import logging

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    databases = [db.strip() for db in args.databases.split(",")]
    syncer = PostgresSyncer(databases=databases)

    banner = "=" * 60
    print(banner)
    print("PostgreSQL Sync")
    print(banner)

    # A real run gets a pre-flight status report; dry runs skip it.
    if not args.dry_run:
        print(f"Checking connection to databases: {', '.join(databases)}...")
        status = syncer.get_status()
        for db_name, db_status in status.get("databases", {}).items():
            print(f" {db_name}: {db_status.get('status', 'unknown')}")
            if db_status.get("custodians_count"):
                print(f" Current custodians: {db_status['custodians_count']}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    print("\n" + banner)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print(banner)


if __name__ == "__main__":
    main()
|