#!/usr/bin/env python3
|
|
"""
|
|
PostgreSQL Person Sync Module - Sync person/staff JSON files to PostgreSQL.
|
|
|
|
This module syncs all person data (staff lists and entity profiles) to PostgreSQL
|
|
for querying heritage institution personnel.
|
|
|
|
Usage:
|
|
python -m scripts.sync.postgres_person_sync [--dry-run] [--limit N]
|
|
|
|
Data Sources:
|
|
- data/custodian/person/entity/*.json - Individual person profile files
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# Add project root to path
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
# Use try/except for imports that may not resolve in all environments
|
|
try:
|
|
from scripts.sync import BaseSyncer, SyncResult, SyncStatus
|
|
except ImportError:
|
|
from . import BaseSyncer, SyncResult, SyncStatus
|
|
|
|
# Configuration
# Connection settings come from the environment, with local-development
# defaults (presumably matching docker-compose — TODO confirm).
DATABASE_CONFIG = {
    'host': os.getenv("POSTGRES_HOST", "localhost"),
    'port': int(os.getenv("POSTGRES_PORT", "5432")),
    'database': os.getenv("POSTGRES_DB", "glam_heritage"),
    'user': os.getenv("POSTGRES_USER", "glam_api"),
    'password': os.getenv("POSTGRES_PASSWORD", "glam_api_password"),
}

# Number of upserts between transaction commits during sync.
BATCH_SIZE = 100

# Person data directory: one JSON file per person entity profile.
PERSON_DATA_DIR = PROJECT_ROOT / "data" / "custodian" / "person" / "entity"

# SQL statements
# Schema: one row per staff profile keyed by staff_id; list-valued
# attributes are stored as JSONB, with GIN indexes for JSONB containment
# and full-text search over name/headline/custodian.
CREATE_PERSONS_TABLE = """
CREATE TABLE IF NOT EXISTS persons (
    id SERIAL PRIMARY KEY,
    staff_id VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(500) NOT NULL,
    headline TEXT,
    location VARCHAR(500),
    country_code VARCHAR(10),
    custodian_slug VARCHAR(255),
    custodian_name VARCHAR(500),
    linkedin_url VARCHAR(1000),
    profile_image_url TEXT,
    heritage_relevant BOOLEAN DEFAULT TRUE,
    heritage_types JSONB DEFAULT '[]'::jsonb,
    experience JSONB DEFAULT '[]'::jsonb,
    education JSONB DEFAULT '[]'::jsonb,
    skills JSONB DEFAULT '[]'::jsonb,
    languages JSONB DEFAULT '[]'::jsonb,
    about TEXT,
    connections VARCHAR(255),
    extraction_date TIMESTAMP WITH TIME ZONE,
    extraction_method VARCHAR(100),
    source_file VARCHAR(1000),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_persons_custodian_slug ON persons(custodian_slug);
CREATE INDEX IF NOT EXISTS idx_persons_heritage_relevant ON persons(heritage_relevant);
CREATE INDEX IF NOT EXISTS idx_persons_name ON persons(name);
CREATE INDEX IF NOT EXISTS idx_persons_country_code ON persons(country_code);
CREATE INDEX IF NOT EXISTS idx_persons_heritage_types ON persons USING GIN (heritage_types);

-- Full text search index
CREATE INDEX IF NOT EXISTS idx_persons_search ON persons USING GIN (
    to_tsvector('english', COALESCE(name, '') || ' ' || COALESCE(headline, '') || ' ' || COALESCE(custodian_name, ''))
);
"""

# Idempotent upsert: insert a person, or refresh every mutable column
# (and bump updated_at) when the staff_id already exists.
UPSERT_PERSON = """
INSERT INTO persons (
    staff_id, name, headline, location, country_code,
    custodian_slug, custodian_name, linkedin_url, profile_image_url,
    heritage_relevant, heritage_types, experience, education,
    skills, languages, about, connections,
    extraction_date, extraction_method, source_file
) VALUES (
    %s, %s, %s, %s, %s,
    %s, %s, %s, %s,
    %s, %s, %s, %s,
    %s, %s, %s, %s,
    %s, %s, %s
)
ON CONFLICT (staff_id) DO UPDATE SET
    name = EXCLUDED.name,
    headline = EXCLUDED.headline,
    location = EXCLUDED.location,
    country_code = EXCLUDED.country_code,
    custodian_slug = EXCLUDED.custodian_slug,
    custodian_name = EXCLUDED.custodian_name,
    linkedin_url = EXCLUDED.linkedin_url,
    profile_image_url = EXCLUDED.profile_image_url,
    heritage_relevant = EXCLUDED.heritage_relevant,
    heritage_types = EXCLUDED.heritage_types,
    experience = EXCLUDED.experience,
    education = EXCLUDED.education,
    skills = EXCLUDED.skills,
    languages = EXCLUDED.languages,
    about = EXCLUDED.about,
    connections = EXCLUDED.connections,
    extraction_date = EXCLUDED.extraction_date,
    extraction_method = EXCLUDED.extraction_method,
    source_file = EXCLUDED.source_file,
    updated_at = NOW()
"""
|
|
|
|
|
|
def extract_country_code(location: str) -> Optional[str]:
    """Extract an ISO 3166-1 alpha-2 country code from a location string.

    Handles strings like 'Amsterdam, North Holland, Netherlands (NL)':
    a two-letter uppercase code in parentheses takes precedence; otherwise
    a small table of well-known country names is consulted. Returns None
    when nothing matches or the input is empty.
    """
    if not location:
        return None

    import re

    # Explicit "(XX)" suffix wins outright.
    paren_code = re.search(r'\(([A-Z]{2})\)', location)
    if paren_code is not None:
        return paren_code.group(1)

    # Fall back to case-insensitive substring matches on country names.
    known_countries = {
        'netherlands': 'NL',
        'united states': 'US',
        'united kingdom': 'GB',
        'germany': 'DE',
        'france': 'FR',
        'belgium': 'BE',
        'spain': 'ES',
        'italy': 'IT',
    }
    haystack = location.lower()
    return next(
        (code for name, code in known_countries.items() if name in haystack),
        None,
    )
|
|
|
|
|
|
def parse_person_file(file_path: Path) -> Optional[dict]:
    """Parse a person JSON file into a flat dict of ``persons`` columns.

    List-valued fields (heritage_types, experience, education, skills,
    languages) are pre-serialized with ``json.dumps`` for the JSONB
    columns. Returns None when the file is unreadable, is not valid
    JSON, or contains no usable person name.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return None

    extraction_meta = data.get('extraction_metadata', {})
    source_staff = data.get('source_staff_info', {})
    profile_data = data.get('profile_data', {})
    heritage_rel = data.get('heritage_relevance', {})

    # staff_id is required; fall back to the filename prefix when missing.
    staff_id = extraction_meta.get('staff_id')
    if not staff_id:
        staff_id = file_path.stem.split('_')[0]

    # A name is required; without one the record is unusable.
    name = profile_data.get('name') or source_staff.get('name')
    if not name:
        return None

    # Extract location and derive a country code from it.
    location = profile_data.get('location', '')
    country_code = extract_country_code(location)

    # Parse the ISO-8601 extraction date, tolerating a trailing 'Z'.
    extraction_date = None
    date_str = extraction_meta.get('extraction_date')
    if date_str:
        try:
            extraction_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            extraction_date = None

    # Build custodian slug from the custodian name.
    custodian_name = source_staff.get('custodian', '')
    custodian_slug = custodian_name.lower().replace(' ', '-') if custodian_name else None

    # Record the source path relative to the project root when possible.
    # person_dir is configurable, so the file may legitimately live
    # outside the repo tree; fall back to the absolute path rather than
    # letting relative_to() raise ValueError.
    try:
        source_file = str(file_path.relative_to(PROJECT_ROOT))
    except ValueError:
        source_file = str(file_path)

    return {
        'staff_id': staff_id,
        'name': name,
        'headline': profile_data.get('headline') or source_staff.get('headline'),
        'location': location,
        'country_code': country_code,
        'custodian_slug': custodian_slug,
        'custodian_name': custodian_name,
        'linkedin_url': profile_data.get('linkedin_url') or extraction_meta.get('linkedin_url'),
        'profile_image_url': profile_data.get('profile_image_url'),
        'heritage_relevant': heritage_rel.get('is_heritage_relevant', True),
        'heritage_types': json.dumps(heritage_rel.get('heritage_types', [])),
        'experience': json.dumps(profile_data.get('experience', [])),
        'education': json.dumps(profile_data.get('education', [])),
        'skills': json.dumps(profile_data.get('skills', [])),
        'languages': json.dumps(profile_data.get('languages', [])),
        'about': profile_data.get('about'),
        'connections': profile_data.get('connections'),
        'extraction_date': extraction_date,
        'extraction_method': extraction_meta.get('extraction_method'),
        'source_file': source_file,
    }
|
|
|
|
|
|
class PostgresPersonSyncer(BaseSyncer):
    """Sync person/staff JSON files to PostgreSQL database."""

    database_name = "postgres_persons"

    def __init__(
        self,
        db_config: dict | None = None,
        person_dir: Path = PERSON_DATA_DIR,
        batch_size: int = BATCH_SIZE,
        **kwargs
    ):
        """Create a syncer.

        Args:
            db_config: psycopg2 connection settings; defaults to
                DATABASE_CONFIG (environment-driven).
            person_dir: directory containing per-person JSON files.
            batch_size: number of upserts between commits.
            **kwargs: forwarded to BaseSyncer.
        """
        super().__init__(**kwargs)
        self.db_config = db_config or DATABASE_CONFIG
        self.person_dir = person_dir
        self.batch_size = batch_size
        self._conn = None  # lazily-opened, cached psycopg2 connection

    def _connect_kwargs(self, **overrides) -> dict:
        """Build keyword arguments for psycopg2.connect from db_config.

        Centralizes the connection parameters that were previously
        duplicated across _get_connection/check_connection/get_status.
        Extra keyword arguments (e.g. connect_timeout) are merged on top.
        """
        kwargs = {
            'host': self.db_config['host'],
            'port': self.db_config['port'],
            'database': self.db_config['database'],
            'user': self.db_config['user'],
            'password': self.db_config['password'],
        }
        kwargs.update(overrides)
        return kwargs

    def _get_connection(self):
        """Get (and cache) a database connection."""
        if self._conn is None:
            import psycopg2
            self._conn = psycopg2.connect(**self._connect_kwargs())
        return self._conn

    def check_connection(self) -> bool:
        """Check if PostgreSQL is available (short connect timeout)."""
        try:
            import psycopg2
            conn = psycopg2.connect(**self._connect_kwargs(connect_timeout=5))
            conn.close()
            return True
        except ImportError:
            self.logger.error("psycopg2 not installed. Run: pip install psycopg2-binary")
            return False
        except Exception as e:
            self.logger.error(f"Connection check failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get PostgreSQL persons table status (row count, heritage types).

        Returns a dict with "status" plus either counts or an "error"
        message; a missing table is reported as healthy with a note.
        """
        try:
            import psycopg2
            conn = psycopg2.connect(**self._connect_kwargs(connect_timeout=5))

            cursor = conn.cursor()

            # Check if table exists and get count
            try:
                cursor.execute("SELECT COUNT(*) FROM persons")
                count = cursor.fetchone()[0]

                # Get heritage types distribution
                cursor.execute("""
                SELECT heritage_types, COUNT(*)
                FROM persons
                GROUP BY heritage_types
                ORDER BY COUNT(*) DESC
                LIMIT 10
                """)
                type_dist = cursor.fetchall()

                status = {
                    "status": "healthy",
                    "persons_count": count,
                    # heritage_types is JSONB; psycopg2 decodes JSON arrays
                    # to Python lists, which are unhashable — stringify the
                    # keys instead of dict(type_dist), which raised TypeError
                    # and collapsed the whole status into "error".
                    "heritage_types": {str(types): cnt for types, cnt in type_dist},
                }
            except psycopg2.errors.UndefinedTable:
                status = {
                    "status": "healthy",
                    "persons_count": 0,
                    "note": "Table does not exist yet",
                }

            cursor.close()
            conn.close()
            return status

        except Exception as e:
            return {"status": "error", "error": str(e)}

    def _ensure_table(self, conn):
        """Ensure the persons table (and its indexes) exist."""
        cursor = conn.cursor()
        cursor.execute(CREATE_PERSONS_TABLE)
        conn.commit()
        cursor.close()

    def _list_person_files(self) -> list[Path]:
        """List all person JSON files, sorted for deterministic order."""
        if not self.person_dir.exists():
            self.logger.error(f"Person directory not found: {self.person_dir}")
            return []
        return sorted(self.person_dir.glob("*.json"))

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all person JSON files to PostgreSQL.

        Args:
            limit: process at most this many files (None/0 = all).
            dry_run: report what would be synced without touching the DB.

        Returns:
            SyncResult with per-file success/failure counts.
        """
        result = SyncResult(
            database="postgres_persons",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = "Cannot connect to PostgreSQL"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Find all person files
        person_files = self._list_person_files()
        if limit:
            person_files = person_files[:limit]

        self.progress.total_files = len(person_files)
        self.progress.current_database = "postgres_persons"
        result.records_processed = len(person_files)

        if not person_files:
            self.logger.warning("No person files found to sync")
            result.status = SyncStatus.SUCCESS
            result.end_time = datetime.now(timezone.utc)
            return result

        self.logger.info(f"Found {len(person_files)} person files to sync")

        if dry_run:
            self.logger.info(f"[DRY RUN] Would sync {len(person_files)} person files to PostgreSQL")
            result.status = SyncStatus.SUCCESS
            result.records_succeeded = len(person_files)
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            return result

        try:
            conn = self._get_connection()

            # Ensure table exists
            self._ensure_table(conn)

            cursor = conn.cursor()
            succeeded = 0
            failed = 0

            for i, file_path in enumerate(person_files):
                self.progress.processed_files = i + 1
                self.progress.current_file = file_path.name
                self._report_progress()

                try:
                    person_data = parse_person_file(file_path)
                    if not person_data:
                        self.logger.warning(f"Could not parse: {file_path.name}")
                        failed += 1
                        continue

                    # Run each upsert inside a savepoint: a failed statement
                    # aborts the whole PostgreSQL transaction, which would
                    # make every later execute fail and lose the rest of the
                    # batch at commit time. Rolling back to the savepoint
                    # keeps the transaction usable.
                    cursor.execute("SAVEPOINT person_upsert")
                    try:
                        cursor.execute(UPSERT_PERSON, (
                            person_data['staff_id'],
                            person_data['name'],
                            person_data['headline'],
                            person_data['location'],
                            person_data['country_code'],
                            person_data['custodian_slug'],
                            person_data['custodian_name'],
                            person_data['linkedin_url'],
                            person_data['profile_image_url'],
                            person_data['heritage_relevant'],
                            person_data['heritage_types'],
                            person_data['experience'],
                            person_data['education'],
                            person_data['skills'],
                            person_data['languages'],
                            person_data['about'],
                            person_data['connections'],
                            person_data['extraction_date'],
                            person_data['extraction_method'],
                            person_data['source_file'],
                        ))
                        cursor.execute("RELEASE SAVEPOINT person_upsert")
                    except Exception:
                        cursor.execute("ROLLBACK TO SAVEPOINT person_upsert")
                        raise
                    succeeded += 1

                    # Commit in batches
                    if (i + 1) % self.batch_size == 0:
                        conn.commit()
                        self.logger.info(f"Committed batch: {i + 1}/{len(person_files)}")

                except Exception as e:
                    self.logger.error(f"Error processing {file_path.name}: {e}")
                    self.progress.errors.append(f"{file_path.name}: {e}")
                    failed += 1

            # Final commit
            conn.commit()
            cursor.close()

            result.records_succeeded = succeeded
            result.records_failed = failed
            result.status = SyncStatus.SUCCESS if failed == 0 else SyncStatus.PARTIAL

            self.logger.info(f"Sync complete: {succeeded} succeeded, {failed} failed")

        except Exception as e:
            self.logger.error(f"Sync failed: {e}")
            result.status = SyncStatus.FAILED
            result.error_message = str(e)
        finally:
            if self._conn:
                self._conn.close()
                self._conn = None

        result.end_time = datetime.now(timezone.utc)
        return result
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, run the person sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync person JSON files to PostgreSQL")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = PostgresPersonSyncer()

    banner = "=" * 60
    print(banner)
    print("PostgreSQL Person Sync")
    print(banner)

    # Probe the database up front unless this is a dry run.
    if not args.dry_run:
        print("Checking connection...")
        status = syncer.get_status()
        print(f" Status: {status.get('status', 'unknown')}")
        if status.get('persons_count'):
            print(f" Current persons: {status['persons_count']}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    # Summary footer.
    print("\n" + banner)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print(banner)


if __name__ == "__main__":
    main()
|