glam/scripts/migrate_entity_to_ppid.py

191 lines
6.6 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Migrate entity profiles from data/custodian/person/entity/ to data/person/
This script:
1. Reads entity profiles that are NOT already in data/person/
2. Generates PPID based on profile data
3. Creates proper PPID file in data/person/
4. Links via LinkedIn slug to prevent duplicates
Usage:
python scripts/migrate_entity_to_ppid.py --dry-run # Preview only
python scripts/migrate_entity_to_ppid.py # Execute migration
"""
import json
import argparse
import re
from pathlib import Path
from urllib.parse import unquote
from datetime import datetime, timezone
from collections import defaultdict
def extract_linkedin_slug(url):
    """Return the lowercased LinkedIn profile slug from *url*, or None.

    Percent-encoded slugs are decoded; query strings and fragments are
    stripped. Returns None for falsy input or URLs that are not
    ``linkedin.com/in/`` profile links.
    """
    if not url or 'linkedin.com/in/' not in url:
        return None
    slug = url.split('linkedin.com/in/')[-1]
    # Drop query string and fragment BEFORE trimming the trailing slash,
    # so ".../slug/?trk=x" yields "slug" rather than "slug/" (the
    # original order left a trailing slash that defeated deduplication).
    slug = slug.split('?')[0].split('#')[0].rstrip('/')
    return unquote(slug).lower()
def normalize_name_for_ppid(name):
    """Convert a person name to the PPID token format ``FIRST-LAST``.

    Strips common honorifics/degree suffixes, decomposes diacritics
    (NFKD) and keeps only ASCII letters, upper-cased, joined by ``-``.
    Returns ``"UNKNOWN"`` when nothing usable remains.
    """
    import unicodedata

    if not name:
        return "UNKNOWN"
    # Remove titles and degree suffixes, with their optional trailing dot.
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?', '',
                  name, flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def _ascii_upper(part):
        # Decompose accented characters, drop the combining marks, then
        # keep letters only and upper-case the result.
        decomposed = unicodedata.normalize('NFKD', part)
        stripped = ''.join(c for c in decomposed if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z]', '', stripped).upper()

    # Compute each token exactly once (the original called the normalizer
    # twice per part: once in the filter and once for the kept value).
    tokens = [t for t in (_ascii_upper(p) for p in parts) if t]
    return '-'.join(tokens) if tokens else "UNKNOWN"
def generate_ppid(profile_data, name):
    """Build a placeholder PPID string for *name*.

    The birth/death location and date segments are fixed ``XX-XX-XXX`` /
    ``XXXX`` placeholders for now; *profile_data* is accepted so those
    segments can be enriched later without changing the signature.
    """
    placeholder_segments = ("XX-XX-XXX", "XXXX", "XX-XX-XXX", "XXXX")
    name_token = normalize_name_for_ppid(name)
    return "ID_" + "_".join(placeholder_segments + (name_token,))
def _load_json(path):
    """Best-effort JSON read; returns None if the file is unreadable or invalid."""
    try:
        with open(path, encoding='utf-8') as fh:
            return json.load(fh)
    except (OSError, ValueError):
        # Skip unreadable/corrupt files silently: this mirrors the
        # original best-effort scan, but no longer swallows every
        # exception (KeyboardInterrupt, etc.) with a bare except.
        return None


def _profile_slug(data):
    """Return the LinkedIn slug for a loaded profile dict, or None."""
    if not data or 'profile_data' not in data:
        return None
    url = data['profile_data'].get('linkedin_url')
    return extract_linkedin_slug(url) if url else None


def _build_ppid_profile(name, data, entity_file, output_file):
    """Assemble the PPID-format profile dict for one migrated entity."""
    return {
        "ppid": output_file.stem,
        "ppid_type": "ID",
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": normalize_name_for_ppid(name).split('-')
        },
        "name": name,
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown"
        },
        "is_living": True,
        "heritage_relevance": data.get('heritage_relevance', {
            "is_heritage_relevant": False,
            "heritage_types": [],
            "rationale": None
        }),
        "affiliations": data.get('affiliations', []),
        "profile_data": data.get('profile_data', {}),
        "web_claims": data.get('web_claims', []),
        "extraction_metadata": {
            "original_entity_file": entity_file.name,
            "migrated_at": datetime.now(timezone.utc).isoformat(),
            "migration_script": "migrate_entity_to_ppid.py"
        }
    }


def main():
    """CLI entry point: migrate entity profiles into PPID-named files.

    Scans data/custodian/person/entity/ for profiles whose LinkedIn slug
    is not already present in data/person/, then writes one PPID-named
    JSON file per migratable profile (or previews with --dry-run).
    """
    parser = argparse.ArgumentParser(description='Migrate entity profiles to PPID format')
    parser.add_argument('--dry-run', action='store_true', help='Preview only, no file changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    args = parser.parse_args()

    entity_dir = Path('/Users/kempersc/apps/glam/data/custodian/person/entity')
    person_dir = Path('/Users/kempersc/apps/glam/data/person')

    # 1. Collect LinkedIn slugs already present in data/person/ so we
    #    never create a duplicate PPID for the same person.
    print("Loading existing PPID profiles...")
    existing_slugs = set()
    for path in person_dir.glob('ID_*.json'):
        slug = _profile_slug(_load_json(path))
        if slug:
            existing_slugs.add(slug)
    print(f"Found {len(existing_slugs)} existing LinkedIn slugs in data/person/")

    # 2. Find entity profiles whose slug is not yet in data/person/.
    print("\nScanning entity profiles...")
    to_migrate = []
    for path in entity_dir.glob('*.json'):
        data = _load_json(path)
        slug = _profile_slug(data)
        if slug and slug not in existing_slugs:
            to_migrate.append((path, data, slug))
    print(f"Found {len(to_migrate)} entity profiles to migrate")

    if args.limit:
        to_migrate = to_migrate[:args.limit]
        print(f"Limited to {args.limit} profiles")

    # 3. Migrate profiles.
    migrated = 0
    errors = 0
    for entity_file, data, slug in to_migrate:
        try:
            name = data.get('profile_data', {}).get('name') or data.get('name', 'Unknown')
            # Skip entries that are clearly organizations, not people.
            if name in ['LinkedIn Member', 'TheMuseumsLab'] or 'Museum' in name:
                continue
            ppid = generate_ppid(data.get('profile_data', {}), name)
            output_file = person_dir / f"{ppid}.json"
            # Handle PPID collisions (same placeholder components + name)
            # by appending a numeric suffix.
            counter = 1
            while output_file.exists():
                output_file = person_dir / f"{ppid}-{counter}.json"
                counter += 1
            ppid_profile = _build_ppid_profile(name, data, entity_file, output_file)
            if args.dry_run:
                print(f"Would create: {output_file.name}")
            else:
                with open(output_file, 'w', encoding='utf-8') as out:
                    json.dump(ppid_profile, out, indent=2, ensure_ascii=False)
                print(f"Created: {output_file.name}")
            migrated += 1
        except Exception as e:
            # Keep going on per-profile failures; report them in the summary.
            print(f"Error processing {entity_file.name}: {e}")
            errors += 1

    print(f"\n{'DRY RUN ' if args.dry_run else ''}SUMMARY:")
    print(f"  Migrated: {migrated}")
    print(f"  Errors: {errors}")
    print(f"  Skipped (non-person): {len(to_migrate) - migrated - errors}")


if __name__ == '__main__':
    main()