#!/usr/bin/env python3
"""
Import person data from entity and staff files into PostgreSQL persons table.

This script reads:
1. Entity files from data/custodian/person/entity/ (primary - rich profile data)
2. Staff files from data/custodian/person/affiliated/parsed/ (secondary - custodian associations)

And imports them into the glam_geo.persons table.

Usage:
    python scripts/import_persons_to_postgres.py [--dry-run] [--remote]
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values
|
|
|
|
|
|
# Configuration
# Local data directories, relative to the repository root.
ENTITY_DIR = Path("data/custodian/person/entity")
PARSED_DIR = Path("data/custodian/person/affiliated/parsed")

# For remote execution
# Absolute server-side equivalents, selected via the --remote flag in main().
REMOTE_ENTITY_DIR = Path("/mnt/data/custodian/person/entity")
REMOTE_PARSED_DIR = Path("/mnt/data/custodian/person/affiliated/parsed")

# Connection settings, each overridable via a GEO_POSTGRES_* environment
# variable.
# NOTE(review): a real-looking credential default is checked into source here;
# consider requiring GEO_POSTGRES_PASSWORD to be set instead.
DB_CONFIG = {
    "host": os.getenv("GEO_POSTGRES_HOST", "localhost"),
    "port": int(os.getenv("GEO_POSTGRES_PORT", "5432")),
    "database": os.getenv("GEO_POSTGRES_DB", "glam_geo"),
    "user": os.getenv("GEO_POSTGRES_USER", "glam_api"),
    "password": os.getenv("GEO_POSTGRES_PASSWORD", "glam_secret_2025"),
}

# Cached custodian types from database
# Module-level lazy cache: lowercase custodian name -> type code.
# Filled once by load_custodian_types_from_db(); the loaded flag is set even
# on failure so a broken database connection is not retried per lookup.
_custodian_type_cache: Dict[str, str] = {}
_custodian_cache_loaded: bool = False
|
|
|
|
def load_custodian_types_from_db() -> Dict[str, str]:
    """
    Load custodian types from the database custodians table.

    This provides AUTHORITATIVE heritage types based on verified institutional data,
    not heuristics based on name patterns or headline keywords.

    The result is cached at module level. On any database error a warning is
    printed and the (possibly empty) cache is marked as loaded so the
    connection is not retried on every subsequent lookup.

    Returns a dict mapping lowercase custodian names to their type code (M, A, L, etc.)
    """
    global _custodian_type_cache, _custodian_cache_loaded

    if _custodian_cache_loaded:
        return _custodian_type_cache

    try:
        conn = psycopg2.connect(**DB_CONFIG)
        try:
            cur = conn.cursor()
            try:
                # Get all custodians with verified types
                cur.execute("""
                    SELECT DISTINCT LOWER(name), type
                    FROM custodians
                    WHERE type IS NOT NULL AND name IS NOT NULL
                """)
                for name_lower, type_code in cur.fetchall():
                    if name_lower and type_code:
                        _custodian_type_cache[name_lower] = type_code.strip()
            finally:
                cur.close()
        finally:
            # Always release the connection, even if the query fails
            # (previously it leaked on error because close() was only
            # reached on the success path).
            conn.close()

        print(f" Loaded {len(_custodian_type_cache)} custodian types from database")
        _custodian_cache_loaded = True

    except Exception as e:
        print(f" Warning: Could not load custodian types from database: {e}")
        print(f" Heritage types will be NULL for persons without verified custodian data")
        _custodian_cache_loaded = True  # Don't retry

    return _custodian_type_cache
|
|
|
|
|
|
def get_custodian_type_from_db(custodian_name: Optional[str]) -> Optional[str]:
    """
    Look up the authoritative heritage type for a custodian from the database.

    Matching is case-insensitive. An exact name hit wins; otherwise a
    containment match in either direction is accepted so that, for example,
    "Van Gogh Museum" still resolves against "Van Gogh Museum Amsterdam".

    Returns None when the custodian is unknown - the type is never guessed
    or inferred from name patterns. No data is better than wrong data.
    """
    if not custodian_name:
        return None

    known_types = load_custodian_types_from_db()
    needle = custodian_name.lower().strip()

    # Exact (lowercased) match wins outright.
    direct_hit = known_types.get(needle)
    if direct_hit is not None:
        return direct_hit

    # Fall back to substring containment in either direction to cover
    # common naming variations.
    for candidate, code in known_types.items():
        if needle in candidate or candidate in needle:
            return code

    return None
|
|
|
|
|
|
def extract_country_from_location(location: Optional[str]) -> Optional[str]:
    """Extract an ISO 3166-1 alpha-2 country code from a location string.

    Accepts either a free-form string or a dict with 'city'/'region'/'country'
    keys (flattened into a string first). Matching is case-insensitive and
    uses whole-word matches so that e.g. "uk" no longer falsely matches
    "Ukraine" and "india" no longer matches "Indiana". As a last resort an
    explicit "(XX)" uppercase country-code suffix is honored.

    Returns None when no country can be determined.
    """
    if not location:
        return None

    # Handle dict locations
    if isinstance(location, dict):
        location = f"{location.get('city', '')} {location.get('region', '')} {location.get('country', '')}"

    if not isinstance(location, str):
        return None

    location_lower = location.lower()

    # Country mappings: code -> lowercase keywords (countries, major cities,
    # provinces) that identify it.
    country_patterns = {
        "NL": ["netherlands", "nederland", "amsterdam", "rotterdam", "den haag", "utrecht",
               "noord-holland", "zuid-holland", "gelderland", "brabant", "limburg",
               "overijssel", "friesland", "groningen", "drenthe", "zeeland", "flevoland"],
        "BE": ["belgium", "belgië", "belgique", "brussels", "bruxelles", "flanders",
               "wallonia", "antwerp", "ghent", "bruges"],
        "ID": ["indonesia", "jakarta", "bandung", "surabaya", "aceh", "java", "sumatra"],
        "US": ["united states", "usa", "california", "new york", "washington", "texas"],
        "GB": ["united kingdom", "uk", "england", "london", "scotland", "wales"],
        "DE": ["germany", "deutschland", "berlin", "munich", "frankfurt"],
        "FR": ["france", "paris", "lyon", "marseille"],
        "IL": ["israel", "tel aviv", "jerusalem"],
        "MA": ["morocco", "rabat", "casablanca"],
        "PH": ["philippines", "manila"],
        "IN": ["india", "mumbai", "delhi", "bangalore"],
        "AU": ["australia", "sydney", "melbourne"],
        "CA": ["canada", "toronto", "vancouver"],
        "JP": ["japan", "tokyo", "osaka"],
        "SG": ["singapore"],
        "HK": ["hong kong"],
        "TW": ["taiwan", "taipei"],
        "KR": ["korea", "seoul"],
        "CN": ["china", "beijing", "shanghai"],
        "BR": ["brazil", "são paulo", "rio"],
        "MX": ["mexico", "mexico city"],
        "ZA": ["south africa", "cape town", "johannesburg"],
        "PS": ["palestine", "gaza", "west bank"],
    }

    for code, patterns in country_patterns.items():
        for pattern in patterns:
            # Whole-word match prevents false positives such as
            # "uk" inside "ukraine" or "india" inside "indiana".
            if re.search(r'\b' + re.escape(pattern) + r'\b', location_lower):
                return code

    # Check for (XX) country code pattern
    match = re.search(r'\(([A-Z]{2})\)', location)
    if match:
        return match.group(1)

    return None
|
|
|
|
|
|
def load_entity_files(entity_dir: Path) -> Dict[str, Dict[str, Any]]:
    """Load all entity JSON files, keyed by LinkedIn slug.

    Scans entity_dir for *.json files, skipping extraction logs, batch
    outputs and hidden files. Each entity is keyed by the last path segment
    of its LinkedIn profile URL (found in either extraction_metadata or
    profile_data). Files that cannot be parsed are reported and skipped.

    Returns:
        dict mapping slug -> {"file": <filename>, "data": <parsed JSON>}.
    """
    entities: Dict[str, Dict[str, Any]] = {}

    for json_file in entity_dir.glob("*.json"):
        # Skip non-entity files
        if json_file.name.startswith(("extraction_log", "batch_", ".")):
            continue

        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            # LinkedIn URL may live in either the metadata or profile section
            linkedin_url = data.get("extraction_metadata", {}).get("linkedin_url", "")
            if not linkedin_url:
                linkedin_url = data.get("profile_data", {}).get("linkedin_url", "")

            if linkedin_url:
                # Extract slug from URL (last path component; "in" means the
                # URL ended at ".../in/" with no actual slug)
                slug = linkedin_url.rstrip("/").split("/")[-1]
                if slug and slug != "in":
                    entities[slug] = {
                        "file": json_file.name,
                        "data": data
                    }
        except Exception as e:  # JSON decode errors, I/O errors, bad structure
            print(f" Warning: Error loading {json_file.name}: {e}")
            continue

    return entities
|
|
|
|
|
|
def load_staff_files(parsed_dir: Path) -> List[Dict[str, Any]]:
    """Load all parsed staff JSON files.

    Scans parsed_dir for *_staff_*.json files (excluding batch-result and
    analysis files) and flattens every staff entry into a normalized person
    dict carrying its custodian association. Entries that duplicate the
    custodian itself, are anonymous, or lack a staff_id are dropped.
    Unreadable files are reported and skipped.

    Returns:
        list of person dicts (staff_id, name, headline, location,
        custodian_slug/name, linkedin_url/slug, heritage fields).
    """
    all_staff: List[Dict[str, Any]] = []

    for json_file in parsed_dir.glob("*_staff_*.json"):
        # Skip batch result files and analysis files
        if any(skip in json_file.name for skip in ["batch_results", "cross_custodian", "missing_entity"]):
            continue

        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            custodian_meta = data.get("custodian_metadata", {})
            custodian_name = custodian_meta.get("custodian_name", "")
            custodian_slug = custodian_meta.get("custodian_slug", "")
            location = custodian_meta.get("location", {})
            city = location.get("city", "")
            region = location.get("region", "")

            for staff in data.get("staff", []):
                # Skip organization entries (e.g., company itself listed as staff)
                if staff.get("name") == custodian_name:
                    continue

                # Skip anonymous names
                if staff.get("name_type") == "anonymous":
                    continue

                # Get LinkedIn slug, deriving it from the profile URL when absent
                linkedin_url = staff.get("linkedin_profile_url", "")
                linkedin_slug = staff.get("linkedin_slug", "")
                if not linkedin_slug and linkedin_url:
                    linkedin_slug = linkedin_url.rstrip("/").split("/")[-1]

                person = {
                    "staff_id": staff.get("staff_id"),
                    "name": staff.get("name"),
                    "headline": staff.get("headline", ""),
                    # Staff location is the custodian's city/region (profiles
                    # themselves carry no location here)
                    "location": f"{city}, {region}" if city and region else (city or region or None),
                    "custodian_slug": custodian_slug,
                    "custodian_name": custodian_name,
                    "linkedin_url": linkedin_url,
                    "linkedin_slug": linkedin_slug,
                    "heritage_relevant": staff.get("heritage_relevant", False),
                    "heritage_type": staff.get("heritage_type"),
                }

                # A staff_id is required downstream (it keys the persons table)
                if person["staff_id"]:
                    all_staff.append(person)

        except Exception as e:  # JSON decode errors, I/O errors, bad structure
            print(f" Warning: Error loading {json_file.name}: {e}")
            continue

    return all_staff
|
|
|
|
|
|
def merge_data(entities: Dict[str, Dict], staff_list: List[Dict]) -> List[Dict]:
    """Merge entity data with staff data into unique person records.

    Entity files (rich LinkedIn profiles) take precedence; staff entries are
    joined onto them by LinkedIn slug to supply custodian associations, and
    staff entries with no matching entity are appended as thinner records.
    The result is deduplicated by staff_id.

    Args:
        entities: slug -> {"file", "data"} as produced by load_entity_files.
        staff_list: flat person dicts as produced by load_staff_files.

    Returns:
        list of person dicts ready for import_to_postgres.
    """
    persons = {}

    # First, index staff by LinkedIn slug (one person may appear under
    # several custodians)
    staff_by_slug = {}
    for staff in staff_list:
        slug = staff.get("linkedin_slug")
        if slug:
            if slug not in staff_by_slug:
                staff_by_slug[slug] = []
            staff_by_slug[slug].append(staff)

    # Process entities first (they have richer data)
    for slug, entity_info in entities.items():
        data = entity_info["data"]
        meta = data.get("extraction_metadata", {})
        profile = data.get("profile_data", {})

        # Get name - entities without a name are unusable
        name = profile.get("name") or profile.get("full_name")
        if not name:
            continue

        # Get location
        location = profile.get("location")
        # Handle dict locations
        if isinstance(location, dict):
            location = f"{location.get('city', '')} {location.get('region', '')} {location.get('country', '')}".strip()
        if not location:
            # Try to build from city/region (each may itself be a dict)
            city = profile.get("city", "")
            region = profile.get("region", "")
            if isinstance(city, dict):
                city = city.get("name", "")
            if isinstance(region, dict):
                region = region.get("name", "")
            location = f"{city}, {region}" if city and region else (city or region or None)

        # Get country code
        country_code = extract_country_from_location(location)

        # Get headline
        headline = profile.get("headline") or profile.get("current_position")
        if isinstance(headline, dict):
            headline = headline.get("title", "")

        # Get staff associations from staff_by_slug
        staff_entries = staff_by_slug.get(slug, [])
        custodian_slug = None
        custodian_name = None
        heritage_relevant = False
        heritage_types = []
        staff_id = meta.get("staff_id")

        if staff_entries:
            # Use first staff entry for custodian info
            first_staff = staff_entries[0]
            custodian_slug = first_staff.get("custodian_slug")
            custodian_name = first_staff.get("custodian_name")
            staff_id = staff_id or first_staff.get("staff_id")

            # Aggregate heritage info from all entries
            for se in staff_entries:
                if se.get("heritage_relevant"):
                    heritage_relevant = True
                ht = se.get("heritage_type")
                if ht and ht not in heritage_types:
                    heritage_types.append(ht)

        # Look up authoritative heritage_type from custodians database table.
        # This ensures Van Gogh Museum staff are tagged as 'M' not 'E' or 'A'.
        # We do NOT use heuristics - only verified institutional data.
        custodian_type = get_custodian_type_from_db(custodian_name)
        if custodian_type:
            heritage_relevant = True
            if heritage_types and heritage_types[0] != custodian_type:
                # Only keep the custodian type - headline-based types were often wrong
                heritage_types = [custodian_type]
            elif not heritage_types:
                heritage_types = [custodian_type]

        # Generate staff_id if missing (stable slug built from the name)
        if not staff_id:
            name_slug = re.sub(r'[^a-z0-9]+', '_', name.lower()).strip('_')
            staff_id = f"entity_{name_slug}_{slug}"

        # Get profile image: prefer the raw Exa search result, fall back to
        # the parsed profile field
        profile_image_url = None
        if "exa_raw_response" in data:
            results = data["exa_raw_response"].get("results", [])
            if results:
                profile_image_url = results[0].get("image")
        if not profile_image_url:
            profile_image_url = profile.get("profile_image_url")

        # Coerce to string and truncate to the column width.
        # (Simplified from an if/else whose two branches were identical.)
        location_str = str(location)[:200] if location else None

        # Extract rich profile data from entity files
        about = profile.get("about")
        if about and isinstance(about, str):
            about = about[:5000]  # Truncate very long about sections

        experience = profile.get("experience")
        if experience and isinstance(experience, list):
            # Stored as a JSON string in the database
            experience = json.dumps(experience)
        else:
            experience = None

        education = profile.get("education")
        if education and isinstance(education, list):
            education = json.dumps(education)
        else:
            education = None

        skills = profile.get("skills")
        if skills and isinstance(skills, list):
            # Ensure all items are strings
            skills = [str(s) for s in skills if s]
        else:
            skills = None

        languages = profile.get("languages")
        if languages and isinstance(languages, list):
            # Languages can be strings or dicts - convert dicts to JSON strings
            processed_languages = []
            for l in languages:
                if l:
                    if isinstance(l, dict):
                        processed_languages.append(json.dumps(l))
                    else:
                        processed_languages.append(str(l))
            languages = processed_languages if processed_languages else None
        else:
            languages = None

        connections = profile.get("connections")
        if connections and isinstance(connections, str):
            connections = connections[:200]
        elif connections:
            connections = str(connections)[:200]
        else:
            connections = None

        # Extraction metadata
        extraction_date = meta.get("extraction_date")
        extraction_method = meta.get("extraction_method")
        source_file = entity_info.get("file") or meta.get("source_file")

        persons[staff_id] = {
            "staff_id": staff_id,
            "name": name,
            "headline": headline[:500] if headline and isinstance(headline, str) else None,
            "location": location_str,
            "country_code": country_code,
            "custodian_slug": custodian_slug,
            "custodian_name": custodian_name,
            "linkedin_url": meta.get("linkedin_url") or profile.get("linkedin_url"),
            "profile_image_url": profile_image_url,
            "heritage_relevant": heritage_relevant,
            "heritage_types": heritage_types if heritage_types else None,
            # New rich profile fields
            "about": about,
            "experience": experience,
            "education": education,
            "skills": skills,
            "languages": languages,
            "connections": connections,
            "extraction_date": extraction_date,
            "extraction_method": extraction_method,
            "source_file": source_file,
        }

    # Then add staff entries that don't have entity files
    for staff in staff_list:
        slug = staff.get("linkedin_slug")
        staff_id = staff.get("staff_id")

        # Skip if we already have this person from entity
        if slug and slug in entities:
            continue

        # Skip if we already have this staff_id
        if staff_id in persons:
            continue

        if not staff_id:
            continue

        # Get country code
        country_code = extract_country_from_location(staff.get("location"))

        heritage_types = []
        if staff.get("heritage_type"):
            heritage_types.append(staff["heritage_type"])

        persons[staff_id] = {
            "staff_id": staff_id,
            "name": staff.get("name"),
            "headline": staff.get("headline", "")[:500] if staff.get("headline") else None,
            "location": staff.get("location"),
            "country_code": country_code,
            "custodian_slug": staff.get("custodian_slug"),
            "custodian_name": staff.get("custodian_name"),
            "linkedin_url": staff.get("linkedin_url"),
            "profile_image_url": None,
            "heritage_relevant": staff.get("heritage_relevant", False),
            "heritage_types": heritage_types if heritage_types else None,
            # Staff-only entries don't have rich profile data
            "about": None,
            "experience": None,
            "education": None,
            "skills": None,
            "languages": None,
            "connections": None,
            "extraction_date": None,
            "extraction_method": None,
            "source_file": None,
        }

    return list(persons.values())
|
|
|
|
|
|
def import_to_postgres(persons: List[Dict], dry_run: bool = False) -> int:
    """Import person data into PostgreSQL.

    In dry-run mode, prints a sample of up to 10 persons and returns the
    would-be import count without touching the database. Otherwise the
    persons table is truncated and all records are bulk-inserted in one
    transaction; the row count after commit is returned. On any error the
    transaction is rolled back and the exception re-raised.

    Args:
        persons: person dicts as produced by merge_data (keys must match
            the column list below).
        dry_run: when True, report only - no database writes.

    Raises:
        Exception: whatever psycopg2 raises, after rollback.
    """
    if dry_run:
        print(f"\n[DRY RUN] Would import {len(persons)} persons")
        # Show sample
        for person in persons[:10]:
            img = "📸" if person.get("profile_image_url") else " "
            hr = "🏛️" if person.get("heritage_relevant") else " "
            print(f" {img}{hr} {person['name'][:40]:<40} | {(person.get('custodian_name') or 'No custodian')[:35]}")
        return len(persons)

    # Connect to database
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()

    try:
        # Clear existing data. The TRUNCATE and the insert below share one
        # transaction, so a failure rolls both back together.
        cur.execute("TRUNCATE TABLE persons")

        # Prepare data for batch insert
        columns = [
            "staff_id", "name", "headline", "location", "country_code",
            "custodian_slug", "custodian_name", "linkedin_url",
            "profile_image_url", "heritage_relevant", "heritage_types",
            # New rich profile columns
            "about", "experience", "education", "skills", "languages",
            "connections", "extraction_date", "extraction_method", "source_file"
        ]

        # Tuple order must match the columns list above.
        values = [
            (
                p["staff_id"],
                p["name"],
                p["headline"],
                p["location"],
                p["country_code"],
                p["custodian_slug"],
                p["custodian_name"],
                p["linkedin_url"],
                p["profile_image_url"],
                p["heritage_relevant"],
                p["heritage_types"],
                # New rich profile values
                p.get("about"),
                p.get("experience"),
                p.get("education"),
                p.get("skills"),
                p.get("languages"),
                p.get("connections"),
                p.get("extraction_date"),
                p.get("extraction_method"),
                p.get("source_file"),
            )
            for p in persons
        ]

        # Batch insert (upsert keyed on staff_id).
        # NOTE(review): after the TRUNCATE above, the ON CONFLICT branch can
        # only matter for duplicate staff_ids within this batch; merge_data
        # deduplicates by staff_id, so it should not normally fire.
        insert_query = f"""
            INSERT INTO persons ({', '.join(columns)})
            VALUES %s
            ON CONFLICT (staff_id) DO UPDATE SET
                name = EXCLUDED.name,
                headline = EXCLUDED.headline,
                location = EXCLUDED.location,
                country_code = EXCLUDED.country_code,
                custodian_slug = EXCLUDED.custodian_slug,
                custodian_name = EXCLUDED.custodian_name,
                linkedin_url = EXCLUDED.linkedin_url,
                profile_image_url = EXCLUDED.profile_image_url,
                heritage_relevant = EXCLUDED.heritage_relevant,
                heritage_types = EXCLUDED.heritage_types,
                about = EXCLUDED.about,
                experience = EXCLUDED.experience,
                education = EXCLUDED.education,
                skills = EXCLUDED.skills,
                languages = EXCLUDED.languages,
                connections = EXCLUDED.connections,
                extraction_date = EXCLUDED.extraction_date,
                extraction_method = EXCLUDED.extraction_method,
                source_file = EXCLUDED.source_file,
                updated_at = CURRENT_TIMESTAMP
        """

        # execute_values expands VALUES %s into batches of 1000 rows.
        execute_values(cur, insert_query, values, page_size=1000)

        conn.commit()

        # Get final counts for the summary report
        cur.execute("SELECT COUNT(*) FROM persons")
        total = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE heritage_relevant = true")
        heritage_relevant = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE profile_image_url IS NOT NULL")
        with_image = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE about IS NOT NULL")
        with_about = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE experience IS NOT NULL")
        with_experience = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE education IS NOT NULL")
        with_education = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM persons WHERE skills IS NOT NULL")
        with_skills = cur.fetchone()[0]

        print(f"\n✓ Imported {total} persons")
        print(f" - Heritage-relevant: {heritage_relevant}")
        print(f" - With profile image: {with_image}")
        print(f" - With about section: {with_about}")
        print(f" - With experience: {with_experience}")
        print(f" - With education: {with_education}")
        print(f" - With skills: {with_skills}")

        return total

    except Exception as e:
        # Undo the TRUNCATE + insert as a unit, then surface the error.
        conn.rollback()
        print(f"Error importing data: {e}")
        raise
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
def main():
    """Command-line entry point: load, merge, summarize and import persons."""
    parser = argparse.ArgumentParser(description="Import persons to PostgreSQL")
    parser.add_argument("--dry-run", action="store_true", help="Don't actually import")
    parser.add_argument("--remote", action="store_true", help="Use remote paths (for server execution)")
    args = parser.parse_args()

    # Pick local or server-side data directories
    if args.remote:
        entity_dir, parsed_dir = REMOTE_ENTITY_DIR, REMOTE_PARSED_DIR
    else:
        entity_dir, parsed_dir = ENTITY_DIR, PARSED_DIR

    print(f"Loading entity files from {entity_dir}...")
    entities = load_entity_files(entity_dir)
    print(f" Loaded {len(entities)} entity profiles")

    print(f"\nLoading staff files from {parsed_dir}...")
    staff = load_staff_files(parsed_dir)
    print(f" Loaded {len(staff)} staff entries")

    print("\nMerging data...")
    persons = merge_data(entities, staff)
    print(f" Total unique persons: {len(persons)}")

    # Summary statistics over the merged records
    def count_where(field):
        """Count persons with a truthy value for the given field."""
        return sum(1 for p in persons if p[field])

    heritage_relevant = count_where("heritage_relevant")
    with_linkedin = count_where("linkedin_url")
    with_image = count_where("profile_image_url")
    with_custodian = count_where("custodian_name")

    # Tally persons per country code (missing codes grouped under "Unknown")
    by_country = {}
    for p in persons:
        cc = p["country_code"] or "Unknown"
        by_country[cc] = by_country.get(cc, 0) + 1

    print(f"\nStats:")
    print(f" Heritage-relevant: {heritage_relevant}")
    print(f" With LinkedIn URL: {with_linkedin}")
    print(f" With profile image: {with_image}")
    print(f" With custodian link: {with_custodian}")
    print(f" By country (top 10):")
    top_countries = sorted(by_country.items(), key=lambda item: -item[1])[:10]
    for cc, count in top_countries:
        print(f" {cc}: {count}")

    import_to_postgres(persons, dry_run=args.dry_run)
|
|
|
|
|
|
# Run the importer only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|