#!/usr/bin/env python3
"""
Migrate entity profiles from data/custodian/person/entity/ to data/person/

This script (v3) optimizes for large-scale migration:
1. Pre-builds an index of existing PPID filenames (fast)
2. Processes entity files in batches with progress reporting
3. Uses multiprocessing for parallel file operations
4. Handles name collisions with counter suffixes

Usage:
    python scripts/migrate_entity_to_ppid_v3.py --dry-run --limit 100  # Preview 100 profiles
    python scripts/migrate_entity_to_ppid_v3.py --dry-run              # Preview all
    python scripts/migrate_entity_to_ppid_v3.py                        # Execute migration
"""
|
|
|
|
import json
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
import unicodedata
|
|
from multiprocessing import Pool, cpu_count
|
|
import os
|
|
|
|
# Regex patterns that flag profiles belonging to institutions, not people.
# NOTE: LinkedIn Member is INCLUDED - they are real people with privacy settings
NON_HUMAN_PATTERNS = [
    r'^TheMuseumsLab$',
    r'^Piet Blom Museum$',          # specific institution profile
    r'^Limburgs Museum$',           # specific institution profile
    r'^Miniature Museum$',          # specific institution profile
    r'^Stichting\s',                # Dutch foundation names
    r'^ICOM\s',                     # ICOM organization
    r'^Fondazione\s',               # Italian foundation
    r'^Google\s',                   # company profiles (Google DeepMind etc)
    r'^Sound\s+Heritage$',          # specific organization
    r'^Company\s+name\s',           # parsing artifact "Company name X"
    r'^Computational\s+Research$',  # specific organization
]
|
|
|
|
# Institutional name suffixes. Profiles whose names end in one of these are
# excluded ONLY when they have no personal LinkedIn URL (see is_human_profile).
INSTITUTION_SUFFIX_PATTERNS = [
    r'Museum$',
    r'Foundation$',
    r'Institute$',
    r'Organisation$',
    r'Organization$',
    r'University$',
]
|
|
|
|
def is_human_profile(name, profile_data):
    """Determine if profile represents a human being (not an institution).

    LinkedIn Member profiles ARE included - they are real people with
    privacy settings. They have job titles and affiliations, just no
    visible name.
    """
    if not name:
        return False

    # Names matching a known-organization pattern are never people.
    if any(re.search(p, name, re.IGNORECASE) for p in NON_HUMAN_PATTERNS):
        return False

    # An institutional suffix only disqualifies a profile that lacks a
    # personal LinkedIn URL (a real person named "Jan Museum" would have
    # a personal /in/ URL).
    linkedin_url = profile_data.get('linkedin_url', '')
    has_personal_linkedin = linkedin_url and '/in/' in linkedin_url
    if not has_personal_linkedin:
        if any(re.search(p, name, re.IGNORECASE) for p in INSTITUTION_SUFFIX_PATTERNS):
            return False

    # Everything else - including anonymous "LinkedIn Member" profiles -
    # counts as human; their PPIDs are built from affiliation context.
    return True
|
|
|
|
def normalize_name_for_ppid(name):
    """Convert a personal name to PPID token format: FIRST-LAST.

    Strips honorifics and degree abbreviations, folds accented characters
    to plain ASCII, removes non-letter characters, uppercases each part,
    and joins the parts with '-'. Returns "UNKNOWN" when nothing usable
    remains (empty/None input, or a name made only of titles/symbols).
    """
    if not name:
        return "UNKNOWN"

    # Drop titles and degree abbreviations, with or without a trailing dot.
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr|PSM|GIA|GG)\b\.?', '', name, flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def normalize_part(p):
        # NFKD-decompose so accents become combining marks, drop the marks,
        # then keep letters only and uppercase.
        nfkd = unicodedata.normalize('NFKD', p)
        ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z]', '', ascii_name).upper()

    # Normalize each part exactly once (the original comprehension called
    # normalize_part twice per part: once for the filter, once for the value).
    tokens = [normalize_part(p) for p in parts]
    normalized = [t for t in tokens if t]
    return '-'.join(normalized) if normalized else "UNKNOWN"
|
|
|
|
def generate_ppid(name, entity_data=None):
    """Generate a PPID from a name (locations/dates use XX placeholders).

    For anonymous 'LinkedIn Member' profiles, entity_data supplies
    affiliation and headline context so the generated ID is still
    distinctive: ANON-<org>-<role>.
    """
    if name == 'LinkedIn Member' and entity_data:
        # Anonymous profile: derive a token from organization + role context.
        affiliations = entity_data.get('affiliations', [])
        headline = entity_data.get('profile_data', {}).get('headline', '')

        if affiliations:
            org = affiliations[0].get('custodian_name', 'UNKNOWN-ORG')
            org_token = normalize_name_for_ppid(org)

            if headline:
                # Extract up to two role keywords from the headline.
                role_words = []
                for word in headline.split()[:3]:  # first 3 words only
                    normalized = normalize_name_for_ppid(word)
                    # Skip short tokens AND the "UNKNOWN" sentinel that
                    # normalize_name_for_ppid returns for symbol-only words
                    # such as '&' (the original let "UNKNOWN" leak into IDs,
                    # e.g. headline "CEO & Founder" -> role "CEO-UNKNOWN").
                    if normalized and normalized != 'UNKNOWN' and len(normalized) > 2:
                        role_words.append(normalized)
                role_token = '-'.join(role_words[:2]) if role_words else 'STAFF'
            else:
                role_token = 'STAFF'

            # Truncate tokens to keep filenames bounded.
            name_token = f"ANON-{org_token[:20]}-{role_token[:15]}"
        else:
            name_token = "LINKEDIN-MEMBER"
    else:
        name_token = normalize_name_for_ppid(name)

    return f"ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_{name_token}"
|
|
|
|
def transform_entity_to_ppid(entity_data, entity_file_name):
    """Transform entity profile to PPID format, preserving ALL data."""
    profile = entity_data.get('profile_data', {})
    display_name = profile.get('name') or entity_data.get('name', 'Unknown')

    # entity_data gives generate_ppid affiliation context for LinkedIn Members.
    new_ppid = generate_ppid(display_name, entity_data)

    # Privacy-protected profiles carry no visible name.
    anonymous = display_name == 'LinkedIn Member'

    # Name tokens: taken from the PPID structure itself for anonymous
    # profiles, otherwise from the normalized real name.
    if anonymous:
        tokens = new_ppid.split('_')[-1].split('-')
    else:
        tokens = normalize_name_for_ppid(display_name).split('-')

    record = {
        "ppid": new_ppid,
        "ppid_type": "ID",
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": tokens,
            "is_anonymous": anonymous
        },
        "name": display_name,
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown",
            "note": "Not yet enriched - requires manual research"
        },
        "is_living": True,
        "is_anonymous": anonymous,  # top-level flag for easy filtering
        "heritage_relevance": entity_data.get('heritage_relevance', {
            "is_heritage_relevant": True,
            "heritage_types": [],
            "rationale": "Extracted from heritage custodian LinkedIn page"
        }),
        "affiliations": entity_data.get('affiliations', []),
        "profile_data": profile,
        "web_claims": entity_data.get('web_claims', []),
        "source_observations": entity_data.get('source_observations', []),
        "extraction_metadata": entity_data.get('extraction_metadata', {}),
        "migration_metadata": {
            "original_entity_file": entity_file_name,
            "original_person_id": entity_data.get('person_id'),
            "original_linkedin_slug": entity_data.get('linkedin_slug'),
            "migrated_at": datetime.now(timezone.utc).isoformat(),
            "migration_script": "migrate_entity_to_ppid_v3.py",
            "migration_version": "3.0"
        }
    }

    return new_ppid, record
|
|
|
|
|
|
def process_entity_file(args):
    """Process a single entity file: filter, transform, and (unless dry-run) write.

    args is a single tuple (entity_file_path, existing_ppids_set, person_dir,
    dry_run) so the function is directly usable with multiprocessing Pool.map.

    Returns (status, detail, file_path) where status is one of 'migrated',
    'exists', 'skip', 'error' and detail is the PPID, skip reason, or error
    message respectively.
    """
    entity_file_path, existing_ppids_set, person_dir, dry_run = args

    try:
        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's locale encoding (entity JSON contains non-ASCII names).
        with open(entity_file_path, encoding='utf-8') as f:
            data = json.load(f)

        name = data.get('profile_data', {}).get('name') or data.get('name', '')

        # Skip institutions and other non-person profiles
        if not is_human_profile(name, data.get('profile_data', {})):
            return ('skip', 'non-human', str(entity_file_path))

        # Generate PPID and the full transformed profile
        ppid, ppid_profile = transform_entity_to_ppid(data, Path(entity_file_path).name)

        # A PPID already on disk means this person was migrated previously.
        if ppid in existing_ppids_set:
            return ('exists', ppid, str(entity_file_path))

        # Defensive collision handling with counter suffixes.
        # NOTE(review): given the 'exists' early-return above, this loop can
        # never execute today; it is kept in case the dedup policy changes.
        output_ppid = ppid
        counter = 1
        while output_ppid in existing_ppids_set:
            output_ppid = f"{ppid}-{counter}"
            ppid_profile['ppid'] = output_ppid
            counter += 1

        output_file = Path(person_dir) / f"{output_ppid}.json"

        if not dry_run:
            # ensure_ascii=False emits raw UTF-8 characters, so the file
            # encoding must be pinned; otherwise writes fail with
            # UnicodeEncodeError on systems whose default is not UTF-8.
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(ppid_profile, f, indent=2, ensure_ascii=False)

        return ('migrated', output_ppid, str(entity_file_path))

    except Exception as e:
        # Workers must never crash the pool; report the failure as a result.
        return ('error', str(e), str(entity_file_path))
|
|
|
|
|
|
def main():
    """CLI entry point: index existing PPIDs, then migrate entity files in batches.

    Phases:
      1. Index existing ID_*.json filenames (fast duplicate detection).
      2. List source entity files (optionally limited via --limit).
      3. Process files in 1000-file batches on a multiprocessing pool,
         reporting progress and a final summary.
    """
    parser = argparse.ArgumentParser(description='Migrate entity profiles to PPID format (v3 - optimized)')
    parser.add_argument('--dry-run', action='store_true', help='Preview only, no file changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers')
    parser.add_argument('--verbose', action='store_true', help='Show each migrated file')
    # Generalization (backward-compatible): the directories were hard-coded
    # absolute paths; they are now options whose defaults are those paths.
    parser.add_argument('--entity-dir', default='/Users/kempersc/apps/glam/data/custodian/person/entity',
                        help='Source directory containing entity JSON files')
    parser.add_argument('--person-dir', default='/Users/kempersc/apps/glam/data/person',
                        help='Destination directory for PPID JSON files')
    args = parser.parse_args()

    entity_dir = Path(args.entity_dir)
    person_dir = Path(args.person_dir)

    print("=" * 70)
    print("PPID MIGRATION SCRIPT v3.0 (Optimized)")
    print("=" * 70)

    # Phase 1: Build index of existing PPID filenames
    print("\nPhase 1: Indexing existing PPID files...")
    existing_ppids = set()
    for f in person_dir.glob('ID_*.json'):
        # The filename stem (name without .json) IS the PPID.
        existing_ppids.add(f.stem)
    print(f" Found {len(existing_ppids):,} existing PPID files")

    # Phase 2: List entity files
    print("\nPhase 2: Listing entity files...")
    entity_files = list(entity_dir.glob('*.json'))
    total_entity = len(entity_files)
    print(f" Found {total_entity:,} entity files")

    if args.limit:
        entity_files = entity_files[:args.limit]
        print(f" Limited to {args.limit} files for this run")

    # Phase 3: Process files
    print(f"\nPhase 3: Processing files (workers={args.workers}, dry_run={args.dry_run})...")

    # Prepare args for multiprocessing. The tuples hold a reference to
    # existing_ppids; pickling happens at each pool.map() call, so PPIDs
    # added between batches are visible to later batches.
    process_args = [
        (str(f), existing_ppids, str(person_dir), args.dry_run)
        for f in entity_files
    ]

    results = {'migrated': 0, 'exists': 0, 'skip': 0, 'error': 0}
    migrated_samples = []

    batch_size = 1000
    if process_args:
        # One pool reused for all batches (the original created a fresh Pool
        # per batch, paying worker start-up cost every 1000 files).
        with Pool(args.workers) as pool:
            for batch_start in range(0, len(process_args), batch_size):
                batch_end = min(batch_start + batch_size, len(process_args))
                batch = process_args[batch_start:batch_end]

                batch_results = pool.map(process_entity_file, batch)

                for status, detail, file_path in batch_results:
                    results[status] += 1

                    if status == 'migrated':
                        # Add to existing set to prevent collisions within batch
                        existing_ppids.add(detail)
                        if args.verbose or len(migrated_samples) < 5:
                            migrated_samples.append((detail, Path(file_path).name))

                    if status == 'error':
                        print(f" ERROR: {file_path}: {detail}")

                # Progress report
                processed = batch_end
                pct = (processed / len(process_args)) * 100
                print(f" Progress: {processed:,}/{len(process_args):,} ({pct:.1f}%) - "
                      f"Migrated: {results['migrated']:,}, Exists: {results['exists']:,}, "
                      f"Skip: {results['skip']:,}, Errors: {results['error']}")

    # Summary
    print("\n" + "=" * 70)
    print(f"{'DRY RUN ' if args.dry_run else ''}MIGRATION SUMMARY")
    print("=" * 70)
    print(f" Total processed: {sum(results.values()):,}")
    print(f" Migrated (new): {results['migrated']:,}")
    print(f" Already exists: {results['exists']:,}")
    print(f" Skipped (non-human): {results['skip']:,}")
    print(f" Errors: {results['error']}")

    if migrated_samples:
        print(f"\n Sample migrated profiles:")
        for ppid, source in migrated_samples[:5]:
            print(f" {ppid} <- {source}")

    if args.dry_run:
        print(f"\n To execute migration, run without --dry-run flag")
    else:
        final_count = len(list(person_dir.glob('ID_*.json')))
        print(f"\n Migration complete!")
        print(f" Final PPID count: {final_count:,}")


if __name__ == '__main__':
    main()
|