#!/usr/bin/env python3
"""
Migrate entity profiles from data/custodian/person/entity/ to data/person/

This script (v5) uses LINKEDIN SLUG for deduplication:
1. LinkedIn slug is the unique identifier - NOT PPID
2. If LinkedIn slug already exists in data/person/, skip (not a duplicate)
3. PPID filename collisions (same name, different person) get UUID suffix
4. Processes ALL entries with classification tags

Usage:
    python scripts/migrate_entity_to_ppid_v5.py --dry-run --limit 100  # Preview 100 profiles
    python scripts/migrate_entity_to_ppid_v5.py --dry-run              # Preview all
    python scripts/migrate_entity_to_ppid_v5.py                        # Execute migration
"""

import json
import argparse
import re
import uuid
from pathlib import Path
from datetime import datetime, timezone
import unicodedata
from urllib.parse import unquote
from multiprocessing import Pool
from typing import Dict, List, Tuple, Any, Optional, Set

# Patterns that suggest this might be an INSTITUTION (not a person).
# Each entry: (regex, indicator_type, human-readable reason).
# NOTE(review): all patterns are applied with re.IGNORECASE (see
# classify_profile), so case-sensitive character classes below do not
# actually constrain case - confirm whether that is intended.
INSTITUTION_INDICATORS = [
    (r'^Company\s+name\s', 'parsing_artifact', 'Profile name starts with "Company name" - likely parsing artifact'),
    (r'^Stichting\s', 'dutch_foundation', 'Dutch foundation (Stichting)'),
    (r'^Fondazione\s', 'italian_foundation', 'Italian foundation (Fondazione)'),
    (r'^ICOM\s', 'icom_organization', 'ICOM organization'),
    (r'^Google\s', 'company_profile', 'Google company profile'),
    (r'^TheMuseumsLab$', 'organization', 'TheMuseumsLab organization'),
    (r'^Sound\s+Heritage$', 'organization', 'Sound Heritage organization'),
    (r'^Computational\s+Research$', 'organization', 'Computational Research organization'),
    (r'Museum$', 'museum_suffix', 'Name ends with "Museum" - likely institution'),
    (r'Foundation$', 'foundation_suffix', 'Name ends with "Foundation" - likely institution'),
    (r'Institute$', 'institute_suffix', 'Name ends with "Institute" - likely institution'),
    (r'Organisation$', 'org_suffix', 'Name ends with "Organisation" - likely institution'),
    (r'Organization$', 'org_suffix', 'Name ends with "Organization" - likely institution'),
    (r'University$', 'university_suffix', 'Name ends with "University" - likely institution'),
    (r'Library$', 'library_suffix', 'Name ends with "Library" - likely institution'),
    (r'Archive$', 'archive_suffix', 'Name ends with "Archive" - likely institution'),
    (r'Archief$', 'archive_suffix', 'Name ends with "Archief" (Dutch archive) - likely institution'),
    (r'Bibliotheek$', 'library_suffix', 'Name ends with "Bibliotheek" (Dutch library) - likely institution'),
]

# Patterns that suggest this is a PERSON
PERSON_INDICATORS = [
    (r'^(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing)\.\s', 'title_prefix', 'Has personal title prefix'),
    (r'\s(PhD|MA|MSc|MBA|BSc|Jr|Sr)$', 'degree_suffix', 'Has degree/suffix'),
    (r'^[A-Z][a-z]+\s+[A-Z][a-z]+$', 'two_word_name', 'Simple two-word personal name pattern'),
    (r'^[A-Z][a-z]+\s+(van|de|den|der|von|van der|van den|van de)\s+[A-Z]', 'dutch_name', 'Dutch personal name with particle'),
]


def extract_linkedin_slug(url: Optional[str]) -> Optional[str]:
    """Extract LinkedIn slug from URL - this is the UNIQUE IDENTIFIER.

    Returns the lowercased, percent-decoded slug, or None when the URL is
    missing or is not a personal ``linkedin.com/in/`` URL.
    """
    if not url or 'linkedin.com/in/' not in url:
        return None
    # BUGFIX: drop the query string BEFORE stripping trailing slashes.
    # Previously '.../in/john-doe/?trk=x' produced 'john-doe/' (trailing
    # slash retained), which failed to deduplicate against 'john-doe'.
    slug = url.split('linkedin.com/in/')[-1].split('?')[0].rstrip('/')
    slug = unquote(slug)
    return slug.lower()


def get_linkedin_slug_from_profile(data: Dict) -> Optional[str]:
    """Extract LinkedIn slug from profile data.

    Tries, in order: profile_data.linkedin_url, a top-level linkedin_slug
    field, then person_id (which is often the slug itself).
    """
    # Try profile_data.linkedin_url first
    linkedin_url = data.get('profile_data', {}).get('linkedin_url')
    if linkedin_url:
        slug = extract_linkedin_slug(linkedin_url)
        if slug:
            return slug
    # Try linkedin_slug field
    if data.get('linkedin_slug'):
        return data['linkedin_slug'].lower()
    # Try person_id (often is the slug)
    if data.get('person_id'):
        return data['person_id'].lower()
    return None


def classify_profile(name: str, profile_data: Dict) -> Dict[str, Any]:
    """Classify profile as human, institution, anonymous, or unknown.

    Returns a dict with keys: primary_classification, confidence,
    indicators (list of matched pattern records), reasoning.
    """
    if not name:
        return {
            'primary_classification': 'unknown',
            'confidence': 0.0,
            'indicators': [{'type': 'empty_name', 'reason': 'Name field is empty'}],
            'reasoning': 'Cannot classify - name is empty'
        }

    # LinkedIn's privacy placeholder name gets its own classification.
    if name == 'LinkedIn Member':
        headline = profile_data.get('headline', '')
        return {
            'primary_classification': 'anonymous',
            'confidence': 0.9,
            'indicators': [
                {'type': 'linkedin_member', 'reason': 'LinkedIn privacy settings hide real name'},
                {'type': 'has_headline', 'value': headline[:50] if headline else None},
            ],
            'reasoning': f'Anonymous LinkedIn profile with privacy settings. Has headline: {bool(headline)}'
        }

    institution_matches = []
    person_matches = []

    for pattern, indicator_type, reason in INSTITUTION_INDICATORS:
        if re.search(pattern, name, re.IGNORECASE):
            institution_matches.append({
                'type': indicator_type,
                'pattern': pattern,
                'reason': reason
            })

    for pattern, indicator_type, reason in PERSON_INDICATORS:
        if re.search(pattern, name, re.IGNORECASE):
            person_matches.append({
                'type': indicator_type,
                'pattern': pattern,
                'reason': reason
            })

    # A personal /in/ URL is strong evidence of a human profile.
    linkedin_url = profile_data.get('linkedin_url', '')
    if linkedin_url and '/in/' in linkedin_url:
        person_matches.append({
            'type': 'personal_linkedin_url',
            'reason': 'Has personal LinkedIn /in/ URL'
        })

    if institution_matches and not person_matches:
        return {
            'primary_classification': 'institution',
            'confidence': min(0.5 + 0.1 * len(institution_matches), 0.9),
            'indicators': institution_matches,
            'reasoning': f'Matched {len(institution_matches)} institution pattern(s), no person patterns'
        }
    elif person_matches and not institution_matches:
        return {
            'primary_classification': 'human',
            'confidence': min(0.5 + 0.15 * len(person_matches), 0.95),
            'indicators': person_matches,
            'reasoning': f'Matched {len(person_matches)} person pattern(s), no institution patterns'
        }
    elif person_matches and institution_matches:
        # Conflicting evidence: the personal LinkedIn URL wins if present.
        if any(i['type'] == 'personal_linkedin_url' for i in person_matches):
            return {
                'primary_classification': 'human',
                'confidence': 0.7,
                'indicators': person_matches + institution_matches,
                'reasoning': f'Conflicting patterns but has personal LinkedIn URL - likely human'
            }
        return {
            'primary_classification': 'unknown',
            'confidence': 0.3,
            'indicators': person_matches + institution_matches,
            'reasoning': f'Conflicting patterns: {len(person_matches)} person, {len(institution_matches)} institution'
        }
    else:
        # No pattern matched either way: default to human.
        return {
            'primary_classification': 'human',
            'confidence': 0.6,
            'indicators': [{'type': 'default', 'reason': 'No specific patterns matched, defaulting to human'}],
            'reasoning': 'No specific patterns matched - assuming human (default)'
        }


def normalize_name_for_ppid(name: str) -> str:
    """Convert name to PPID format: FIRST-LAST

    Strips titles/degrees, transliterates accented characters to ASCII
    (NFKD decomposition, combining marks dropped), uppercases, and joins
    the remaining tokens with hyphens. Returns "UNKNOWN" when nothing
    usable remains.
    """
    if not name:
        return "UNKNOWN"
    # Remove common titles and degree suffixes before tokenizing.
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr|PSM|GIA|GG)\b\.?', '', name, flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def normalize_part(p):
        # Decompose accents, drop combining marks, keep only ASCII alnum.
        nfkd = unicodedata.normalize('NFKD', p)
        ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z0-9]', '', ascii_name).upper()

    normalized = [normalize_part(p) for p in parts if normalize_part(p)]
    return '-'.join(normalized) if normalized else "UNKNOWN"


def generate_ppid(name: str, entity_data: Optional[Dict] = None) -> str:
    """Generate PPID from name (locations/dates use XX placeholders).

    Anonymous "LinkedIn Member" profiles get a synthetic ANON-<org>-<role>
    token derived from their first affiliation and headline, so that
    distinct anonymous profiles don't all collapse onto one PPID.
    """
    if name == 'LinkedIn Member' and entity_data:
        affiliations = entity_data.get('affiliations', [])
        headline = entity_data.get('profile_data', {}).get('headline', '') if entity_data.get('profile_data') else ''
        if affiliations and isinstance(affiliations, list) and len(affiliations) > 0:
            org = affiliations[0].get('custodian_name', 'UNKNOWN-ORG')
            org_token = normalize_name_for_ppid(org)[:20]
            if headline:
                # Build a short role token from the first few headline words.
                role_words = []
                for word in headline.split()[:3]:
                    normalized = normalize_name_for_ppid(word)
                    if normalized and len(normalized) > 2:
                        role_words.append(normalized)
                role_token = '-'.join(role_words[:2]) if role_words else 'STAFF'
            else:
                role_token = 'STAFF'
            name_token = f"ANON-{org_token}-{role_token[:15]}"
        else:
            name_token = "LINKEDIN-MEMBER"
    else:
        name_token = normalize_name_for_ppid(name)
    return f"ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_{name_token}"


def transform_entity_to_ppid(entity_data: Dict, entity_file_name: str) -> Tuple[str, Dict]:
    """Transform entity profile to PPID format, preserving ALL data.

    Returns (ppid, ppid_profile) where ppid_profile carries every field
    from the source entity plus classification and migration metadata.
    """
    name = entity_data.get('profile_data', {}).get('name') or entity_data.get('name', 'Unknown')
    ppid = generate_ppid(name, entity_data)
    profile_data = entity_data.get('profile_data', {})

    # classify_profile reads affiliations off profile_data; merge them in
    # (shallow copy, so the source profile_data is not mutated).
    profile_data_for_classification = {**profile_data}
    if 'affiliations' not in profile_data_for_classification:
        profile_data_for_classification['affiliations'] = entity_data.get('affiliations', [])
    classification = classify_profile(name, profile_data_for_classification)

    is_anonymous = (name == 'LinkedIn Member')
    if is_anonymous:
        # For anonymous profiles the name tokens come from the synthetic
        # ANON token embedded in the generated PPID.
        name_tokens = ppid.split('_')[-1].split('-')
    else:
        name_tokens = normalize_name_for_ppid(name).split('-')

    # Get LinkedIn slug for this profile
    linkedin_slug = get_linkedin_slug_from_profile(entity_data)

    ppid_profile = {
        "ppid": ppid,
        "ppid_type": "ID",
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": name_tokens
        },
        "name": name,
        "linkedin_slug": linkedin_slug,  # Store slug at top level for easy deduplication
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown",
            "note": "Not yet enriched - requires manual research"
        },
        "is_living": True,
        "is_anonymous": is_anonymous,
        "profile_classification": classification,
        "heritage_relevance": entity_data.get('heritage_relevance', {
            "is_heritage_relevant": True,
            "heritage_types": [],
            "rationale": "Extracted from heritage custodian LinkedIn page"
        }),
        "affiliations": entity_data.get('affiliations', []),
        "profile_data": entity_data.get('profile_data', {}),
        "web_claims": entity_data.get('web_claims', []),
        "source_observations": entity_data.get('source_observations', []),
        "extraction_metadata": entity_data.get('extraction_metadata', {}),
        "migration_metadata": {
            "original_entity_file": entity_file_name,
            "original_person_id": entity_data.get('person_id'),
            "original_linkedin_slug": linkedin_slug,
            "migrated_at": datetime.now(timezone.utc).isoformat(),
            "migration_script": "migrate_entity_to_ppid_v5.py",
            "migration_version": "5.0"
        }
    }
    return ppid, ppid_profile


def build_existing_linkedin_slugs(person_dir: Path) -> Set[str]:
    """Build set of LinkedIn slugs already in data/person/."""
    existing_slugs = set()
    for f in person_dir.glob('ID_*.json'):
        try:
            with open(f) as fp:
                data = json.load(fp)
            slug = get_linkedin_slug_from_profile(data)
            if slug:
                existing_slugs.add(slug)
        except Exception:
            # Best-effort index: skip unreadable or malformed profile
            # files. (Narrowed from a bare `except:` so Ctrl-C and
            # SystemExit still propagate.)
            pass
    return existing_slugs


def build_existing_ppid_filenames(person_dir: Path) -> Set[str]:
    """Build set of existing PPID filenames (for collision detection)."""
    return {f.stem for f in person_dir.glob('ID_*.json')}


# Global sets - populated in main, used by worker processes
EXISTING_LINKEDIN_SLUGS: Set[str] = set()
EXISTING_PPID_FILENAMES: Set[str] = set()


def init_worker(existing_slugs: Set[str], existing_ppids: Set[str]):
    """Initialize worker process with shared data."""
    global EXISTING_LINKEDIN_SLUGS, EXISTING_PPID_FILENAMES
    EXISTING_LINKEDIN_SLUGS = existing_slugs
    EXISTING_PPID_FILENAMES = existing_ppids


def process_entity_file(args):
    """Process a single entity file.

    Args is a 4-tuple (entity_file_path, person_dir, dry_run,
    new_slugs_lock_file), packed so the function can be mapped over a
    multiprocessing pool.

    Returns: (status, detail, classification, linkedin_slug, file_path)
    - status: 'migrated', 'duplicate', 'no_slug', 'error'
    """
    entity_file_path, person_dir, dry_run, new_slugs_lock_file = args
    try:
        with open(entity_file_path) as f:
            data = json.load(f)

        # Get LinkedIn slug - this is the DEDUPLICATION KEY
        linkedin_slug = get_linkedin_slug_from_profile(data)

        if not linkedin_slug:
            # No LinkedIn slug - can't dedupe, but still migrate with UUID
            name = data.get('profile_data', {}).get('name') or data.get('name', '')
            ppid, ppid_profile = transform_entity_to_ppid(data, Path(entity_file_path).name)
            classification = ppid_profile['profile_classification']['primary_classification']
            # Always add UUID for no-slug profiles to avoid collisions
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            ppid_profile['ppid_components']['no_linkedin_slug'] = True
            output_file = Path(person_dir) / f"{output_ppid}.json"
            if not dry_run:
                with open(output_file, 'w') as f:
                    json.dump(ppid_profile, f, indent=2, ensure_ascii=False)
            return ('no_slug', output_ppid, classification, None, str(entity_file_path))

        # Check if LinkedIn slug already exists in data/person/
        if linkedin_slug in EXISTING_LINKEDIN_SLUGS:
            return ('duplicate', linkedin_slug, 'skipped', linkedin_slug, str(entity_file_path))

        # Check if we've already processed this slug in this batch
        # (Read from lock file to handle multiprocessing)
        try:
            if Path(new_slugs_lock_file).exists():
                with open(new_slugs_lock_file) as lf:
                    batch_slugs = set(line.strip() for line in lf)
                if linkedin_slug in batch_slugs:
                    return ('duplicate', linkedin_slug, 'skipped', linkedin_slug, str(entity_file_path))
        except Exception:
            # Lock file is advisory only; a read failure must not abort
            # the migration of this entity. (Narrowed from bare except.)
            pass

        # Transform to PPID
        ppid, ppid_profile = transform_entity_to_ppid(data, Path(entity_file_path).name)
        classification = ppid_profile['profile_classification']['primary_classification']

        # Check for PPID filename collision (different person, same name)
        output_ppid = ppid
        if ppid in EXISTING_PPID_FILENAMES:
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid

        output_file = Path(person_dir) / f"{output_ppid}.json"

        # Double-check file doesn't exist (race condition protection)
        while output_file.exists():
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            output_file = Path(person_dir) / f"{output_ppid}.json"

        if not dry_run:
            with open(output_file, 'w') as f:
                json.dump(ppid_profile, f, indent=2, ensure_ascii=False)

        # Record this slug as processed (also during dry runs, so in-batch
        # duplicate counts stay accurate; main() removes the temp file).
        with open(new_slugs_lock_file, 'a') as lf:
            lf.write(linkedin_slug + '\n')

        return ('migrated', output_ppid, classification, linkedin_slug, str(entity_file_path))

    except Exception as e:
        # Any per-file failure is reported, never allowed to kill the run.
        return ('error', str(e), 'error', None, str(entity_file_path))


def main():
    """CLI entry point: index existing profiles, then migrate entity files."""
    parser = argparse.ArgumentParser(description='Migrate entity profiles to PPID (v5 - LinkedIn slug dedup)')
    parser.add_argument('--dry-run', action='store_true', help='Preview only, no file changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers')
    parser.add_argument('--verbose', action='store_true', help='Show each migrated file')
    args = parser.parse_args()

    # NOTE(review): paths are hard-coded to one developer's machine;
    # consider promoting these to CLI arguments with these defaults.
    entity_dir = Path('/Users/kempersc/apps/glam/data/custodian/person/entity')
    person_dir = Path('/Users/kempersc/apps/glam/data/person')

    print("=" * 70)
    print("PPID MIGRATION SCRIPT v5.0 (LinkedIn Slug Deduplication)")
    print("=" * 70)
    print(" DEDUPLICATION KEY: LinkedIn slug (NOT PPID)")
    print(" PPID collisions: Resolved with UUID suffix")

    # Phase 1: Build index of existing LinkedIn slugs
    print("\nPhase 1: Indexing existing LinkedIn slugs in data/person/...")
    existing_slugs = build_existing_linkedin_slugs(person_dir)
    print(f" Found {len(existing_slugs):,} existing LinkedIn slugs")

    # Phase 2: Build index of existing PPID filenames
    print("\nPhase 2: Indexing existing PPID filenames...")
    existing_ppids = build_existing_ppid_filenames(person_dir)
    print(f" Found {len(existing_ppids):,} existing PPID files")

    # Phase 3: List entity files
    print("\nPhase 3: Listing entity files...")
    entity_files = list(entity_dir.glob('*.json'))
    total_entity = len(entity_files)
    print(f" Found {total_entity:,} entity files")

    if args.limit:
        entity_files = entity_files[:args.limit]
        print(f" Limited to {args.limit} files for this run")

    # Create temp file for tracking processed slugs
    lock_file = person_dir / '.migration_slugs_temp.txt'
    if lock_file.exists():
        lock_file.unlink()

    # Phase 4: Process files (sequential for proper dedup)
    print(f"\nPhase 4: Processing files (dry_run={args.dry_run})...")
    print(" Using sequential processing for reliable deduplication")

    global EXISTING_LINKEDIN_SLUGS, EXISTING_PPID_FILENAMES
    EXISTING_LINKEDIN_SLUGS = existing_slugs
    EXISTING_PPID_FILENAMES = existing_ppids

    results = {'migrated': 0, 'duplicate': 0, 'no_slug': 0, 'error': 0}
    classifications = {'human': 0, 'institution': 0, 'anonymous': 0, 'unknown': 0, 'skipped': 0}
    new_slugs = set()
    samples = []

    for i, entity_file in enumerate(entity_files):
        result = process_entity_file((str(entity_file), str(person_dir), args.dry_run, str(lock_file)))
        status, detail, classification, linkedin_slug, file_path = result
        results[status] += 1
        classifications[classification] = classifications.get(classification, 0) + 1

        if status == 'migrated':
            # Keep the in-memory indexes current so later files in this
            # run dedupe against the ones just written.
            if linkedin_slug:
                EXISTING_LINKEDIN_SLUGS.add(linkedin_slug)
                new_slugs.add(linkedin_slug)
            EXISTING_PPID_FILENAMES.add(detail)  # detail is the output PPID (= filename stem)
            if len(samples) < 5:
                samples.append((detail, classification, Path(file_path).name))
        if status == 'error':
            print(f" ERROR: {file_path}: {detail}")

        # Progress every 1000
        if (i + 1) % 1000 == 0:
            pct = ((i + 1) / len(entity_files)) * 100
            print(f" Progress: {i+1:,}/{len(entity_files):,} ({pct:.1f}%) - "
                  f"Migrated:{results['migrated']:,} Dup:{results['duplicate']:,} "
                  f"NoSlug:{results['no_slug']:,} Err:{results['error']}")

    # Cleanup
    if lock_file.exists():
        lock_file.unlink()

    # Summary
    print("\n" + "=" * 70)
    print(f"{'DRY RUN ' if args.dry_run else ''}MIGRATION SUMMARY")
    print("=" * 70)
    print(f" Total processed: {sum(results.values()):,}")
    print(f" Successfully migrated: {results['migrated']:,}")
    print(f" Duplicates skipped (slug exists): {results['duplicate']:,}")
    print(f" No LinkedIn slug (migrated with UUID): {results['no_slug']:,}")
    print(f" Errors: {results['error']}")
    print(f"\n Classification breakdown (migrated only):")
    print(f" Human: {classifications.get('human', 0):,}")
    print(f" Institution: {classifications.get('institution', 0):,}")
    print(f" Anonymous: {classifications.get('anonymous', 0):,}")
    print(f" Unknown: {classifications.get('unknown', 0):,}")

    if samples:
        print(f"\n Sample migrated profiles:")
        for ppid, classification, source in samples:
            print(f" [{classification:11}] {ppid[:55]}... <- {source[:35]}...")

    if args.dry_run:
        print(f"\n To execute migration, run without --dry-run flag")
    else:
        final_count = len(list(person_dir.glob('ID_*.json')))
        print(f"\n Migration complete!")
        print(f" Final PPID count: {final_count:,}")
        print(f" New unique LinkedIn slugs added: {len(new_slugs):,}")


if __name__ == '__main__':
    main()