glam/scripts/import_persons_to_postgres.py
2025-12-14 17:09:55 +01:00

670 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Import person data from entity and staff files into PostgreSQL persons table.
This script reads:
1. Entity files from data/custodian/person/entity/ (primary - rich profile data)
2. Staff files from data/custodian/person/affiliated/parsed/ (secondary - custodian associations)
And imports them into the glam_geo.persons table.
Usage:
python scripts/import_persons_to_postgres.py [--dry-run] [--remote]
"""
import json
import os
import sys
import argparse
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
import psycopg2
from psycopg2.extras import execute_values
# Configuration
# Primary source: rich entity profile JSON files
ENTITY_DIR = Path("data/custodian/person/entity")
# Secondary source: parsed staff lists with custodian associations
PARSED_DIR = Path("data/custodian/person/affiliated/parsed")
# For remote execution
REMOTE_ENTITY_DIR = Path("/mnt/data/custodian/person/entity")
REMOTE_PARSED_DIR = Path("/mnt/data/custodian/person/affiliated/parsed")
# Connection settings for the glam_geo PostgreSQL database; every field can be
# overridden via environment variables.
# NOTE(review): hard-coded fallback password in source - consider requiring
# GEO_POSTGRES_PASSWORD to be set instead of shipping a default.
DB_CONFIG = {
    "host": os.getenv("GEO_POSTGRES_HOST", "localhost"),
    "port": int(os.getenv("GEO_POSTGRES_PORT", "5432")),
    "database": os.getenv("GEO_POSTGRES_DB", "glam_geo"),
    "user": os.getenv("GEO_POSTGRES_USER", "glam_api"),
    "password": os.getenv("GEO_POSTGRES_PASSWORD", "glam_secret_2025"),
}
# Cached custodian types from database
# Maps lowercase custodian name -> type code (e.g. 'M' for Museum)
_custodian_type_cache: Dict[str, str] = {}
# Set after the first load attempt (success OR failure) so we never retry
_custodian_cache_loaded: bool = False
def load_custodian_types_from_db() -> Dict[str, str]:
    """
    Load custodian types from the database custodians table.

    This provides AUTHORITATIVE heritage types based on verified institutional
    data, not heuristics based on name patterns or headline keywords.

    The result is cached at module level; on failure a warning is printed and
    the (empty) cache is still marked loaded so we do not retry on every call.

    Returns:
        Dict mapping lowercase custodian names to their type code (M, A, L, etc.).
    """
    global _custodian_type_cache, _custodian_cache_loaded
    if _custodian_cache_loaded:
        return _custodian_type_cache
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        try:
            # Fix: previously the cursor/connection leaked if the query raised
            # before the explicit close() calls; close them deterministically.
            with conn.cursor() as cur:
                # Get all custodians with verified types
                cur.execute("""
                    SELECT DISTINCT LOWER(name), type
                    FROM custodians
                    WHERE type IS NOT NULL AND name IS NOT NULL
                """)
                for name_lower, type_code in cur.fetchall():
                    if name_lower and type_code:
                        _custodian_type_cache[name_lower] = type_code.strip()
        finally:
            conn.close()
        print(f" Loaded {len(_custodian_type_cache)} custodian types from database")
        _custodian_cache_loaded = True
    except Exception as e:
        print(f" Warning: Could not load custodian types from database: {e}")
        print(f" Heritage types will be NULL for persons without verified custodian data")
        _custodian_cache_loaded = True  # Don't retry
    return _custodian_type_cache
def get_custodian_type_from_db(custodian_name: Optional[str]) -> Optional[str]:
    """Return the verified heritage type code for a custodian, or None.

    Looks the name up in the cached custodians table (see
    load_custodian_types_from_db): first an exact lowercase match, then a
    bidirectional containment match to absorb naming variations such as
    "Van Gogh Museum" vs "Van Gogh Museum Amsterdam".

    Unknown custodians yield None - we never guess a type from name patterns,
    because no data is better than wrong data.
    """
    if not custodian_name:
        return None
    type_by_name = load_custodian_types_from_db()
    needle = custodian_name.lower().strip()
    # Exact (lowercase) match wins
    if needle in type_by_name:
        return type_by_name[needle]
    # Otherwise accept a loose substring match in either direction
    for known_name, type_code in type_by_name.items():
        if needle in known_name or known_name in needle:
            return type_code
    # Not in the database: report nothing rather than something wrong
    return None
def extract_country_from_location(location: Optional[str]) -> Optional[str]:
    """Extract an ISO 3166-1 alpha-2 country code from a location string.

    Accepts a free-text location string, or a dict with city/region/country
    keys (which is flattened first). Matching is keyword-based and
    deliberately conservative: returns None when nothing matches.

    Fix: keywords are matched on word boundaries instead of raw substrings.
    The old substring test produced false positives such as
    "Jerusalem" -> "US" (contains "usa"), "Ukraine" -> "GB" (contains "uk"),
    and "Indiana" -> "IN" (contains "india").
    """
    if not location:
        return None
    # Handle dict locations by flattening to a single string
    if isinstance(location, dict):
        location = f"{location.get('city', '')} {location.get('region', '')} {location.get('country', '')}"
    if not isinstance(location, str):
        return None
    location_lower = location.lower()
    # Country keyword mappings; first match (in insertion order) wins
    country_patterns = {
        "NL": ["netherlands", "nederland", "amsterdam", "rotterdam", "den haag", "utrecht",
               "noord-holland", "zuid-holland", "gelderland", "brabant", "limburg",
               "overijssel", "friesland", "groningen", "drenthe", "zeeland", "flevoland"],
        "BE": ["belgium", "belgië", "belgique", "brussels", "bruxelles", "flanders",
               "wallonia", "antwerp", "ghent", "bruges"],
        "ID": ["indonesia", "jakarta", "bandung", "surabaya", "aceh", "java", "sumatra"],
        "US": ["united states", "usa", "california", "new york", "washington", "texas"],
        "GB": ["united kingdom", "uk", "england", "london", "scotland", "wales"],
        "DE": ["germany", "deutschland", "berlin", "munich", "frankfurt"],
        "FR": ["france", "paris", "lyon", "marseille"],
        "IL": ["israel", "tel aviv", "jerusalem"],
        "MA": ["morocco", "rabat", "casablanca"],
        "PH": ["philippines", "manila"],
        "IN": ["india", "mumbai", "delhi", "bangalore"],
        "AU": ["australia", "sydney", "melbourne"],
        "CA": ["canada", "toronto", "vancouver"],
        "JP": ["japan", "tokyo", "osaka"],
        "SG": ["singapore"],
        "HK": ["hong kong"],
        "TW": ["taiwan", "taipei"],
        "KR": ["korea", "seoul"],
        "CN": ["china", "beijing", "shanghai"],
        "BR": ["brazil", "são paulo", "rio"],
        "MX": ["mexico", "mexico city"],
        "ZA": ["south africa", "cape town", "johannesburg"],
        "PS": ["palestine", "gaza", "west bank"],
    }
    for code, patterns in country_patterns.items():
        for pattern in patterns:
            # \b anchors prevent substring false positives (e.g. "usa" inside
            # "jerusalem"); re.escape keeps literals like "noord-holland" safe.
            if re.search(r"\b" + re.escape(pattern) + r"\b", location_lower):
                return code
    # Fall back to an explicit "(XX)" country-code marker in the raw string
    match = re.search(r'\(([A-Z]{2})\)', location)
    if match:
        return match.group(1)
    return None
def load_entity_files(entity_dir: Path) -> Dict[str, Dict[str, Any]]:
    """Load all entity JSON files, keyed by LinkedIn slug.

    Skips helper files (extraction logs, batch outputs, dotfiles). Files that
    cannot be parsed are skipped with a warning; files with no usable LinkedIn
    URL are silently ignored.

    Args:
        entity_dir: directory containing "*.json" entity profile files.

    Returns:
        Dict mapping LinkedIn slug -> {"file": filename, "data": parsed JSON}.
    """
    entities: Dict[str, Dict[str, Any]] = {}
    for json_file in entity_dir.glob("*.json"):
        # Skip non-entity files
        if json_file.name.startswith(("extraction_log", "batch_", ".")):
            continue
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Extract LinkedIn slug from extraction metadata, falling back to
            # the profile data section
            linkedin_url = data.get("extraction_metadata", {}).get("linkedin_url", "")
            if not linkedin_url:
                linkedin_url = data.get("profile_data", {}).get("linkedin_url", "")
            if linkedin_url:
                # Slug is the last path component of the URL (".../in/<slug>/")
                slug = linkedin_url.rstrip("/").split("/")[-1]
                # Guard against URLs that end at "/in/" with no actual slug
                if slug and slug != "in":
                    entities[slug] = {
                        "file": json_file.name,
                        "data": data
                    }
        except Exception as e:  # fix: (json.JSONDecodeError, Exception) was redundant
            print(f" Warning: Error loading {json_file.name}: {e}")
            continue
    return entities
def load_staff_files(parsed_dir: Path) -> List[Dict[str, Any]]:
    """Load all parsed staff JSON files into flat person records.

    Reads every "*_staff_*.json" file (skipping batch/analysis outputs) and
    flattens each staff entry together with its custodian metadata. Entries
    that name the organization itself, are anonymous, or lack a staff_id are
    dropped. Unparseable files are skipped with a warning.

    Args:
        parsed_dir: directory containing parsed staff JSON files.

    Returns:
        List of person dicts carrying custodian association fields.
    """
    all_staff: List[Dict[str, Any]] = []
    for json_file in parsed_dir.glob("*_staff_*.json"):
        # Skip batch result files and analysis files
        if any(skip in json_file.name for skip in ["batch_results", "cross_custodian", "missing_entity"]):
            continue
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            custodian_meta = data.get("custodian_metadata", {})
            custodian_name = custodian_meta.get("custodian_name", "")
            custodian_slug = custodian_meta.get("custodian_slug", "")
            location = custodian_meta.get("location", {})
            city = location.get("city", "")
            region = location.get("region", "")
            for staff in data.get("staff", []):
                # Skip organization entries (e.g., company itself listed as staff)
                if staff.get("name") == custodian_name:
                    continue
                # Skip anonymous names
                if staff.get("name_type") == "anonymous":
                    continue
                # Prefer an explicit slug; otherwise derive it from the URL
                linkedin_url = staff.get("linkedin_profile_url", "")
                linkedin_slug = staff.get("linkedin_slug", "")
                if not linkedin_slug and linkedin_url:
                    linkedin_slug = linkedin_url.rstrip("/").split("/")[-1]
                person = {
                    "staff_id": staff.get("staff_id"),
                    "name": staff.get("name"),
                    "headline": staff.get("headline", ""),
                    "location": f"{city}, {region}" if city and region else (city or region or None),
                    "custodian_slug": custodian_slug,
                    "custodian_name": custodian_name,
                    "linkedin_url": linkedin_url,
                    "linkedin_slug": linkedin_slug,
                    "heritage_relevant": staff.get("heritage_relevant", False),
                    "heritage_type": staff.get("heritage_type"),
                }
                # Records without a staff_id cannot be keyed downstream - drop them
                if person["staff_id"]:
                    all_staff.append(person)
        except Exception as e:  # fix: (json.JSONDecodeError, Exception) was redundant
            print(f" Warning: Error loading {json_file.name}: {e}")
            continue
    return all_staff
def merge_data(entities: Dict[str, Dict], staff_list: List[Dict]) -> List[Dict]:
    """Merge entity data with staff data.

    Entity files are the primary source (rich profile data); staff files
    contribute custodian associations. Records are keyed by staff_id:
    entity-derived records are built first, then staff-only entries that
    have no matching entity file (by slug) or staff_id are appended.

    Args:
        entities: LinkedIn slug -> {"file": name, "data": parsed JSON}
                  (as produced by load_entity_files).
        staff_list: flat staff records (as produced by load_staff_files).

    Returns:
        List of person dicts whose keys match the persons table columns.
    """
    persons = {}
    # First, index staff by LinkedIn slug
    staff_by_slug = {}
    for staff in staff_list:
        slug = staff.get("linkedin_slug")
        if slug:
            if slug not in staff_by_slug:
                staff_by_slug[slug] = []
            staff_by_slug[slug].append(staff)
    # Process entities first (they have richer data)
    for slug, entity_info in entities.items():
        data = entity_info["data"]
        meta = data.get("extraction_metadata", {})
        profile = data.get("profile_data", {})
        # Get name
        name = profile.get("name") or profile.get("full_name")
        if not name:
            # A record without a name is unusable downstream - skip it
            continue
        # Get location
        location = profile.get("location")
        # Handle dict locations
        if isinstance(location, dict):
            location = f"{location.get('city', '')} {location.get('region', '')} {location.get('country', '')}".strip()
        if not location:
            # Try to build from city/region
            city = profile.get("city", "")
            region = profile.get("region", "")
            # city/region can themselves be dicts with a "name" key
            if isinstance(city, dict):
                city = city.get("name", "")
            if isinstance(region, dict):
                region = region.get("name", "")
            location = f"{city}, {region}" if city and region else (city or region or None)
        # Get country code
        country_code = extract_country_from_location(location)
        # Get headline
        headline = profile.get("headline") or profile.get("current_position")
        if isinstance(headline, dict):
            headline = headline.get("title", "")
        # Get staff associations from staff_by_slug
        staff_entries = staff_by_slug.get(slug, [])
        custodian_slug = None
        custodian_name = None
        heritage_relevant = False
        heritage_types = []
        staff_id = meta.get("staff_id")
        if staff_entries:
            # Use first staff entry for custodian info
            first_staff = staff_entries[0]
            custodian_slug = first_staff.get("custodian_slug")
            custodian_name = first_staff.get("custodian_name")
            staff_id = staff_id or first_staff.get("staff_id")
            # Aggregate heritage info from all entries
            for se in staff_entries:
                if se.get("heritage_relevant"):
                    heritage_relevant = True
                ht = se.get("heritage_type")
                if ht and ht not in heritage_types:
                    heritage_types.append(ht)
        # Look up authoritative heritage_type from custodians database table
        # This ensures Van Gogh Museum staff are tagged as 'M' not 'E' or 'A'
        # We do NOT use heuristics - only verified institutional data
        custodian_type = get_custodian_type_from_db(custodian_name)
        if custodian_type:
            heritage_relevant = True
            # Replace first heritage_type with custodian-inferred type, or insert at front
            if heritage_types and heritage_types[0] != custodian_type:
                # Only keep the custodian type - headline-based types were often wrong
                heritage_types = [custodian_type]
            elif not heritage_types:
                heritage_types = [custodian_type]
        # Generate staff_id if missing (stable slug derived from the name + LinkedIn slug)
        if not staff_id:
            name_slug = re.sub(r'[^a-z0-9]+', '_', name.lower()).strip('_')
            staff_id = f"entity_{name_slug}_{slug}"
        # Get profile image: prefer the image from the raw search response,
        # fall back to the parsed profile field
        profile_image_url = None
        if "exa_raw_response" in data:
            results = data["exa_raw_response"].get("results", [])
            if results:
                profile_image_url = results[0].get("image")
        if not profile_image_url:
            profile_image_url = profile.get("profile_image_url")
        # Ensure location is a string before truncating
        # NOTE(review): both branches evaluate to str(location)[:200]; this is
        # equivalent to `str(location)[:200] if location else None` - kept as-is
        location_str = str(location)[:200] if location and isinstance(location, str) else (str(location)[:200] if location else None)
        # Extract rich profile data from entity files
        about = profile.get("about")
        if about and isinstance(about, str):
            about = about[:5000]  # Truncate very long about sections
        experience = profile.get("experience")
        if experience and isinstance(experience, list):
            # Ensure it's JSON-serializable (stored as a JSON string)
            experience = json.dumps(experience)
        else:
            experience = None
        education = profile.get("education")
        if education and isinstance(education, list):
            education = json.dumps(education)
        else:
            education = None
        skills = profile.get("skills")
        if skills and isinstance(skills, list):
            # Ensure all items are strings
            skills = [str(s) for s in skills if s]
        else:
            skills = None
        languages = profile.get("languages")
        if languages and isinstance(languages, list):
            # Languages can be strings or dicts - convert dicts to JSON strings
            processed_languages = []
            for l in languages:
                if l:
                    if isinstance(l, dict):
                        processed_languages.append(json.dumps(l))
                    else:
                        processed_languages.append(str(l))
            languages = processed_languages if processed_languages else None
        else:
            languages = None
        connections = profile.get("connections")
        if connections and isinstance(connections, str):
            connections = connections[:200]
        elif connections:
            connections = str(connections)[:200]
        else:
            connections = None
        # Extraction metadata
        extraction_date = meta.get("extraction_date")
        extraction_method = meta.get("extraction_method")
        source_file = entity_info.get("file") or meta.get("source_file")
        persons[staff_id] = {
            "staff_id": staff_id,
            "name": name,
            "headline": headline[:500] if headline and isinstance(headline, str) else None,
            "location": location_str,
            "country_code": country_code,
            "custodian_slug": custodian_slug,
            "custodian_name": custodian_name,
            "linkedin_url": meta.get("linkedin_url") or profile.get("linkedin_url"),
            "profile_image_url": profile_image_url,
            "heritage_relevant": heritage_relevant,
            "heritage_types": heritage_types if heritage_types else None,
            # New rich profile fields
            "about": about,
            "experience": experience,
            "education": education,
            "skills": skills,
            "languages": languages,
            "connections": connections,
            "extraction_date": extraction_date,
            "extraction_method": extraction_method,
            "source_file": source_file,
        }
    # Then add staff entries that don't have entity files
    for staff in staff_list:
        slug = staff.get("linkedin_slug")
        staff_id = staff.get("staff_id")
        # Skip if we already have this person from entity
        if slug and slug in entities:
            continue
        # Skip if we already have this staff_id
        if staff_id in persons:
            continue
        if not staff_id:
            continue
        # Get country code
        country_code = extract_country_from_location(staff.get("location"))
        heritage_types = []
        if staff.get("heritage_type"):
            heritage_types.append(staff["heritage_type"])
        persons[staff_id] = {
            "staff_id": staff_id,
            "name": staff.get("name"),
            "headline": staff.get("headline", "")[:500] if staff.get("headline") else None,
            "location": staff.get("location"),
            "country_code": country_code,
            "custodian_slug": staff.get("custodian_slug"),
            "custodian_name": staff.get("custodian_name"),
            "linkedin_url": staff.get("linkedin_url"),
            "profile_image_url": None,
            "heritage_relevant": staff.get("heritage_relevant", False),
            "heritage_types": heritage_types if heritage_types else None,
            # Staff-only entries don't have rich profile data
            "about": None,
            "experience": None,
            "education": None,
            "skills": None,
            "languages": None,
            "connections": None,
            "extraction_date": None,
            "extraction_method": None,
            "source_file": None,
        }
    return list(persons.values())
def import_to_postgres(persons: List[Dict], dry_run: bool = False) -> int:
    """Import person data into PostgreSQL.

    Replaces the entire persons table: TRUNCATE followed by a batched insert.
    The ON CONFLICT upsert clause is a safety net should staff_ids collide
    within the batch.

    Args:
        persons: merged person dicts (see merge_data) matching the table columns.
        dry_run: if True, print a sample of up to 10 records and return without
            touching the database.

    Returns:
        Row count of the persons table after import (len(persons) on dry run).

    Raises:
        Re-raises any database error after rolling back the transaction.
    """
    if dry_run:
        print(f"\n[DRY RUN] Would import {len(persons)} persons")
        # Show sample
        for person in persons[:10]:
            img = "📸" if person.get("profile_image_url") else " "
            hr = "🏛️" if person.get("heritage_relevant") else " "
            print(f" {img}{hr} {person['name'][:40]:<40} | {(person.get('custodian_name') or 'No custodian')[:35]}")
        return len(persons)
    # Connect to database
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    try:
        # Clear existing data (full reload semantics)
        cur.execute("TRUNCATE TABLE persons")
        # Prepare data for batch insert
        columns = [
            "staff_id", "name", "headline", "location", "country_code",
            "custodian_slug", "custodian_name", "linkedin_url",
            "profile_image_url", "heritage_relevant", "heritage_types",
            # New rich profile columns
            "about", "experience", "education", "skills", "languages",
            "connections", "extraction_date", "extraction_method", "source_file"
        ]
        # Tuple order must stay in sync with `columns` above
        values = [
            (
                p["staff_id"],
                p["name"],
                p["headline"],
                p["location"],
                p["country_code"],
                p["custodian_slug"],
                p["custodian_name"],
                p["linkedin_url"],
                p["profile_image_url"],
                p["heritage_relevant"],
                p["heritage_types"],
                # New rich profile values
                p.get("about"),
                p.get("experience"),
                p.get("education"),
                p.get("skills"),
                p.get("languages"),
                p.get("connections"),
                p.get("extraction_date"),
                p.get("extraction_method"),
                p.get("source_file"),
            )
            for p in persons
        ]
        # Batch insert
        # NOTE: the f-string only interpolates the hard-coded column list above
        # (never user input), so this is not an injection risk; row values are
        # passed separately through execute_values.
        insert_query = f"""
            INSERT INTO persons ({', '.join(columns)})
            VALUES %s
            ON CONFLICT (staff_id) DO UPDATE SET
                name = EXCLUDED.name,
                headline = EXCLUDED.headline,
                location = EXCLUDED.location,
                country_code = EXCLUDED.country_code,
                custodian_slug = EXCLUDED.custodian_slug,
                custodian_name = EXCLUDED.custodian_name,
                linkedin_url = EXCLUDED.linkedin_url,
                profile_image_url = EXCLUDED.profile_image_url,
                heritage_relevant = EXCLUDED.heritage_relevant,
                heritage_types = EXCLUDED.heritage_types,
                about = EXCLUDED.about,
                experience = EXCLUDED.experience,
                education = EXCLUDED.education,
                skills = EXCLUDED.skills,
                languages = EXCLUDED.languages,
                connections = EXCLUDED.connections,
                extraction_date = EXCLUDED.extraction_date,
                extraction_method = EXCLUDED.extraction_method,
                source_file = EXCLUDED.source_file,
                updated_at = CURRENT_TIMESTAMP
        """
        execute_values(cur, insert_query, values, page_size=1000)
        conn.commit()
        # Get final counts for the summary report
        cur.execute("SELECT COUNT(*) FROM persons")
        total = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE heritage_relevant = true")
        heritage_relevant = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE profile_image_url IS NOT NULL")
        with_image = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE about IS NOT NULL")
        with_about = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE experience IS NOT NULL")
        with_experience = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE education IS NOT NULL")
        with_education = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM persons WHERE skills IS NOT NULL")
        with_skills = cur.fetchone()[0]
        print(f"\n✓ Imported {total} persons")
        print(f" - Heritage-relevant: {heritage_relevant}")
        print(f" - With profile image: {with_image}")
        print(f" - With about section: {with_about}")
        print(f" - With experience: {with_experience}")
        print(f" - With education: {with_education}")
        print(f" - With skills: {with_skills}")
        return total
    except Exception as e:
        conn.rollback()
        print(f"Error importing data: {e}")
        raise
    finally:
        cur.close()
        conn.close()
def main():
    """Command-line entry point: load, merge, report stats, and import."""
    arg_parser = argparse.ArgumentParser(description="Import persons to PostgreSQL")
    arg_parser.add_argument("--dry-run", action="store_true", help="Don't actually import")
    arg_parser.add_argument("--remote", action="store_true", help="Use remote paths (for server execution)")
    opts = arg_parser.parse_args()
    # Pick local or server-side data directories
    entity_dir = REMOTE_ENTITY_DIR if opts.remote else ENTITY_DIR
    parsed_dir = REMOTE_PARSED_DIR if opts.remote else PARSED_DIR
    print(f"Loading entity files from {entity_dir}...")
    entity_profiles = load_entity_files(entity_dir)
    print(f" Loaded {len(entity_profiles)} entity profiles")
    print(f"\nLoading staff files from {parsed_dir}...")
    staff_records = load_staff_files(parsed_dir)
    print(f" Loaded {len(staff_records)} staff entries")
    print("\nMerging data...")
    merged = merge_data(entity_profiles, staff_records)
    print(f" Total unique persons: {len(merged)}")
    # Tally summary statistics in a single pass over the merged records
    heritage_count = 0
    linkedin_count = 0
    image_count = 0
    custodian_count = 0
    country_tally = {}
    for record in merged:
        if record["heritage_relevant"]:
            heritage_count += 1
        if record["linkedin_url"]:
            linkedin_count += 1
        if record["profile_image_url"]:
            image_count += 1
        if record["custodian_name"]:
            custodian_count += 1
        code = record["country_code"] or "Unknown"
        country_tally[code] = country_tally.get(code, 0) + 1
    print(f"\nStats:")
    print(f" Heritage-relevant: {heritage_count}")
    print(f" With LinkedIn URL: {linkedin_count}")
    print(f" With profile image: {image_count}")
    print(f" With custodian link: {custodian_count}")
    print(f" By country (top 10):")
    # sorted(..., reverse=True) is stable, so ties keep their original order
    for code, count in sorted(country_tally.items(), key=lambda item: item[1], reverse=True)[:10]:
        print(f" {code}: {count}")
    import_to_postgres(merged, dry_run=opts.dry_run)


if __name__ == "__main__":
    main()