#!/usr/bin/env python3
"""
Migrate entity profiles from data/custodian/person/entity/ to data/person/
This script (v5) uses LINKEDIN SLUG for deduplication:
1. LinkedIn slug is the unique identifier - NOT PPID
2. If LinkedIn slug already exists in data/person/, skip (not a duplicate)
3. PPID filename collisions (same name, different person) get UUID suffix
4. Processes ALL entries with classification tags
Usage:
python scripts/migrate_entity_to_ppid_v5.py --dry-run --limit 100 # Preview 100 profiles
python scripts/migrate_entity_to_ppid_v5.py --dry-run # Preview all
python scripts/migrate_entity_to_ppid_v5.py # Execute migration
"""
import json
import argparse
import re
import uuid
import unicodedata
from pathlib import Path
from datetime import datetime, timezone
from urllib.parse import unquote
from multiprocessing import Pool  # unused in the current sequential flow; see init_worker below
from typing import Dict, Tuple, Any, Optional, Set

# Patterns that suggest this might be an INSTITUTION (not a person)
INSTITUTION_INDICATORS = [
    (r'^Company\s+name\s', 'parsing_artifact', 'Profile name starts with "Company name" - likely parsing artifact'),
    (r'^Stichting\s', 'dutch_foundation', 'Dutch foundation (Stichting)'),
    (r'^Fondazione\s', 'italian_foundation', 'Italian foundation (Fondazione)'),
    (r'^ICOM\s', 'icom_organization', 'ICOM organization'),
    (r'^Google\s', 'company_profile', 'Google company profile'),
    (r'^TheMuseumsLab$', 'organization', 'TheMuseumsLab organization'),
    (r'^Sound\s+Heritage$', 'organization', 'Sound Heritage organization'),
    (r'^Computational\s+Research$', 'organization', 'Computational Research organization'),
    (r'Museum$', 'museum_suffix', 'Name ends with "Museum" - likely institution'),
    (r'Foundation$', 'foundation_suffix', 'Name ends with "Foundation" - likely institution'),
    (r'Institute$', 'institute_suffix', 'Name ends with "Institute" - likely institution'),
    (r'Organisation$', 'org_suffix', 'Name ends with "Organisation" - likely institution'),
    (r'Organization$', 'org_suffix', 'Name ends with "Organization" - likely institution'),
    (r'University$', 'university_suffix', 'Name ends with "University" - likely institution'),
    (r'Library$', 'library_suffix', 'Name ends with "Library" - likely institution'),
    (r'Archive$', 'archive_suffix', 'Name ends with "Archive" - likely institution'),
    (r'Archief$', 'archive_suffix', 'Name ends with "Archief" (Dutch archive) - likely institution'),
    (r'Bibliotheek$', 'library_suffix', 'Name ends with "Bibliotheek" (Dutch library) - likely institution'),
]

# Patterns that suggest this is a PERSON
PERSON_INDICATORS = [
    (r'^(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing)\.\s', 'title_prefix', 'Has personal title prefix'),
    (r'\s(PhD|MA|MSc|MBA|BSc|Jr|Sr)$', 'degree_suffix', 'Has degree/suffix'),
    (r'^[A-Z][a-z]+\s+[A-Z][a-z]+$', 'two_word_name', 'Simple two-word personal name pattern'),
    (r'^[A-Z][a-z]+\s+(van|de|den|der|von|van der|van den|van de)\s+[A-Z]', 'dutch_name', 'Dutch personal name with particle'),
]


def extract_linkedin_slug(url: Optional[str]) -> Optional[str]:
    """Extract LinkedIn slug from URL - this is the UNIQUE IDENTIFIER."""
    if not url or 'linkedin.com/in/' not in url:
        return None
    # Drop any query string first, then a trailing slash, then decode percent-escapes
    slug = url.split('linkedin.com/in/')[-1].split('?')[0].rstrip('/')
    slug = unquote(slug)
    return slug.lower()
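

# Illustrative example (hypothetical URL): the query string, trailing slash, and
# percent-encoding are removed, and the slug is lowercased:
#   extract_linkedin_slug('https://www.linkedin.com/in/Jane-Doe-123/?trk=public')
#   -> 'jane-doe-123'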


def get_linkedin_slug_from_profile(data: Dict) -> Optional[str]:
    """Extract LinkedIn slug from profile data."""
    # Try profile_data.linkedin_url first
    linkedin_url = data.get('profile_data', {}).get('linkedin_url')
    if linkedin_url:
        slug = extract_linkedin_slug(linkedin_url)
        if slug:
            return slug
    # Try linkedin_slug field
    if data.get('linkedin_slug'):
        return data['linkedin_slug'].lower()
    # Try person_id (often is the slug)
    if data.get('person_id'):
        return data['person_id'].lower()
    return None
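

# Illustrative fallback (hypothetical values): a record like {'person_id': 'jane-doe-123'}
# with no LinkedIn URL still yields the dedup key 'jane-doe-123' via the third lookup.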


def classify_profile(name: str, profile_data: Dict) -> Dict[str, Any]:
    """Classify profile as human, institution, anonymous, or unknown."""
    if not name:
        return {
            'primary_classification': 'unknown',
            'confidence': 0.0,
            'indicators': [{'type': 'empty_name', 'reason': 'Name field is empty'}],
            'reasoning': 'Cannot classify - name is empty'
        }
    if name == 'LinkedIn Member':
        headline = profile_data.get('headline', '')
        return {
            'primary_classification': 'anonymous',
            'confidence': 0.9,
            'indicators': [
                {'type': 'linkedin_member', 'reason': 'LinkedIn privacy settings hide real name'},
                {'type': 'has_headline', 'value': headline[:50] if headline else None},
            ],
            'reasoning': f'Anonymous LinkedIn profile with privacy settings. Has headline: {bool(headline)}'
        }
    institution_matches = []
    person_matches = []
    for pattern, indicator_type, reason in INSTITUTION_INDICATORS:
        if re.search(pattern, name, re.IGNORECASE):
            institution_matches.append({
                'type': indicator_type,
                'pattern': pattern,
                'reason': reason
            })
    for pattern, indicator_type, reason in PERSON_INDICATORS:
        if re.search(pattern, name, re.IGNORECASE):
            person_matches.append({
                'type': indicator_type,
                'pattern': pattern,
                'reason': reason
            })
    linkedin_url = profile_data.get('linkedin_url', '')
    if linkedin_url and '/in/' in linkedin_url:
        person_matches.append({
            'type': 'personal_linkedin_url',
            'reason': 'Has personal LinkedIn /in/ URL'
        })
    if institution_matches and not person_matches:
        return {
            'primary_classification': 'institution',
            'confidence': min(0.5 + 0.1 * len(institution_matches), 0.9),
            'indicators': institution_matches,
            'reasoning': f'Matched {len(institution_matches)} institution pattern(s), no person patterns'
        }
    elif person_matches and not institution_matches:
        return {
            'primary_classification': 'human',
            'confidence': min(0.5 + 0.15 * len(person_matches), 0.95),
            'indicators': person_matches,
            'reasoning': f'Matched {len(person_matches)} person pattern(s), no institution patterns'
        }
    elif person_matches and institution_matches:
        if any(i['type'] == 'personal_linkedin_url' for i in person_matches):
            return {
                'primary_classification': 'human',
                'confidence': 0.7,
                'indicators': person_matches + institution_matches,
                'reasoning': 'Conflicting patterns but has personal LinkedIn URL - likely human'
            }
        return {
            'primary_classification': 'unknown',
            'confidence': 0.3,
            'indicators': person_matches + institution_matches,
            'reasoning': f'Conflicting patterns: {len(person_matches)} person, {len(institution_matches)} institution'
        }
    else:
        return {
            'primary_classification': 'human',
            'confidence': 0.6,
            'indicators': [{'type': 'default', 'reason': 'No specific patterns matched, defaulting to human'}],
            'reasoning': 'No specific patterns matched - assuming human (default)'
        }
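

# Illustrative call (hypothetical input): a simple two-word name plus a personal /in/
# URL matches two person indicators and no institution indicators:
#   classify_profile('Jane Doe', {'linkedin_url': 'https://linkedin.com/in/jane-doe'})
#   -> {'primary_classification': 'human', 'confidence': 0.8, ...}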


def normalize_name_for_ppid(name: str) -> str:
    """Convert name to PPID format: FIRST-LAST"""
    if not name:
        return "UNKNOWN"
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr|PSM|GIA|GG)\b\.?', '', name, flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def normalize_part(p):
        # Strip diacritics via NFKD decomposition, then keep only ASCII alphanumerics
        nfkd = unicodedata.normalize('NFKD', p)
        ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z0-9]', '', ascii_name).upper()

    # Normalize each part once, dropping parts that normalize to the empty string
    normalized = [token for token in (normalize_part(p) for p in parts) if token]
    return '-'.join(normalized) if normalized else "UNKNOWN"
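

# Illustrative example (hypothetical name): titles are stripped and diacritics folded
# to ASCII before uppercasing:
#   normalize_name_for_ppid('Dr. José van Rijn')  ->  'JOSE-VAN-RIJN'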


def generate_ppid(name: str, entity_data: Optional[Dict] = None) -> str:
    """Generate PPID from name (locations/dates use XX placeholders)."""
    if name == 'LinkedIn Member' and entity_data:
        affiliations = entity_data.get('affiliations', [])
        headline = entity_data.get('profile_data', {}).get('headline', '') if entity_data.get('profile_data') else ''
        if affiliations and isinstance(affiliations, list) and len(affiliations) > 0:
            org = affiliations[0].get('custodian_name', 'UNKNOWN-ORG')
            org_token = normalize_name_for_ppid(org)[:20]
            if headline:
                role_words = []
                for word in headline.split()[:3]:
                    normalized = normalize_name_for_ppid(word)
                    if normalized and len(normalized) > 2:
                        role_words.append(normalized)
                role_token = '-'.join(role_words[:2]) if role_words else 'STAFF'
            else:
                role_token = 'STAFF'
            name_token = f"ANON-{org_token}-{role_token[:15]}"
        else:
            name_token = "LINKEDIN-MEMBER"
    else:
        name_token = normalize_name_for_ppid(name)
    return f"ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_{name_token}"


def transform_entity_to_ppid(entity_data: Dict, entity_file_name: str) -> Tuple[str, Dict]:
    """Transform entity profile to PPID format, preserving ALL data."""
    name = entity_data.get('profile_data', {}).get('name') or entity_data.get('name', 'Unknown')
    ppid = generate_ppid(name, entity_data)
    profile_data = entity_data.get('profile_data', {})
    profile_data_for_classification = {**profile_data}
    if 'affiliations' not in profile_data_for_classification:
        profile_data_for_classification['affiliations'] = entity_data.get('affiliations', [])
    classification = classify_profile(name, profile_data_for_classification)
    is_anonymous = (name == 'LinkedIn Member')
    if is_anonymous:
        name_tokens = ppid.split('_')[-1].split('-')
    else:
        name_tokens = normalize_name_for_ppid(name).split('-')
    # Get LinkedIn slug for this profile
    linkedin_slug = get_linkedin_slug_from_profile(entity_data)
    ppid_profile = {
        "ppid": ppid,
        "ppid_type": "ID",
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": name_tokens
        },
        "name": name,
        "linkedin_slug": linkedin_slug,  # Store slug at top level for easy deduplication
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown",
            "note": "Not yet enriched - requires manual research"
        },
        "is_living": True,
        "is_anonymous": is_anonymous,
        "profile_classification": classification,
        "heritage_relevance": entity_data.get('heritage_relevance', {
            "is_heritage_relevant": True,
            "heritage_types": [],
            "rationale": "Extracted from heritage custodian LinkedIn page"
        }),
        "affiliations": entity_data.get('affiliations', []),
        "profile_data": entity_data.get('profile_data', {}),
        "web_claims": entity_data.get('web_claims', []),
        "source_observations": entity_data.get('source_observations', []),
        "extraction_metadata": entity_data.get('extraction_metadata', {}),
        "migration_metadata": {
            "original_entity_file": entity_file_name,
            "original_person_id": entity_data.get('person_id'),
            "original_linkedin_slug": linkedin_slug,
            "migrated_at": datetime.now(timezone.utc).isoformat(),
            "migration_script": "migrate_entity_to_ppid_v5.py",
            "migration_version": "5.0"
        }
    }
    return ppid, ppid_profile


def build_existing_linkedin_slugs(person_dir: Path) -> Set[str]:
    """Build set of LinkedIn slugs already in data/person/."""
    existing_slugs = set()
    for f in person_dir.glob('ID_*.json'):
        try:
            with open(f) as fp:
                data = json.load(fp)
            slug = get_linkedin_slug_from_profile(data)
            if slug:
                existing_slugs.add(slug)
        except Exception:
            # Skip unreadable or malformed profile files
            continue
    return existing_slugs


def build_existing_ppid_filenames(person_dir: Path) -> Set[str]:
    """Build set of existing PPID filenames (for collision detection)."""
    return {f.stem for f in person_dir.glob('ID_*.json')}


# Global sets - populated in main, used by worker processes
EXISTING_LINKEDIN_SLUGS: Set[str] = set()
EXISTING_PPID_FILENAMES: Set[str] = set()


def init_worker(existing_slugs: Set[str], existing_ppids: Set[str]):
    """Initialize worker process with shared data."""
    global EXISTING_LINKEDIN_SLUGS, EXISTING_PPID_FILENAMES
    EXISTING_LINKEDIN_SLUGS = existing_slugs
    EXISTING_PPID_FILENAMES = existing_ppids
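

# NOTE: init_worker follows the multiprocessing.Pool initializer pattern, but main()
# currently runs sequentially and never creates a Pool. A minimal sketch of how it
# could be wired up (illustrative only; 'work_items' is a hypothetical list of the
# same argument tuples main() builds per file):
#
#     with Pool(args.workers, initializer=init_worker,
#               initargs=(existing_slugs, existing_ppids)) as pool:
#         for result in pool.imap_unordered(process_entity_file, work_items):
#             ...  # tally results as in the sequential loop
#
# File writes and the slug lock file would need extra care under parallelism.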


def process_entity_file(args):
    """Process a single entity file.

    Returns: (status, detail, classification, linkedin_slug, file_path)
    - status: 'migrated', 'duplicate', 'no_slug', or 'error'
    """
    entity_file_path, person_dir, dry_run, new_slugs_lock_file = args
    try:
        with open(entity_file_path) as f:
            data = json.load(f)
        # Get LinkedIn slug - this is the DEDUPLICATION KEY
        linkedin_slug = get_linkedin_slug_from_profile(data)
        if not linkedin_slug:
            # No LinkedIn slug - can't dedupe, but still migrate with a UUID suffix
            ppid, ppid_profile = transform_entity_to_ppid(data, Path(entity_file_path).name)
            classification = ppid_profile['profile_classification']['primary_classification']
            # Always add a UUID for no-slug profiles to avoid collisions
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            ppid_profile['ppid_components']['no_linkedin_slug'] = True
            output_file = Path(person_dir) / f"{output_ppid}.json"
            if not dry_run:
                with open(output_file, 'w') as f:
                    json.dump(ppid_profile, f, indent=2, ensure_ascii=False)
            return ('no_slug', output_ppid, classification, None, str(entity_file_path))
        # Check if LinkedIn slug already exists in data/person/
        if linkedin_slug in EXISTING_LINKEDIN_SLUGS:
            return ('duplicate', linkedin_slug, 'skipped', linkedin_slug, str(entity_file_path))
        # Check if we've already processed this slug in this batch
        # (read from the lock file to handle multiprocessing)
        try:
            if Path(new_slugs_lock_file).exists():
                with open(new_slugs_lock_file) as lf:
                    batch_slugs = set(line.strip() for line in lf)
                if linkedin_slug in batch_slugs:
                    return ('duplicate', linkedin_slug, 'skipped', linkedin_slug, str(entity_file_path))
        except OSError:
            # If the lock file can't be read, fall through and rely on the in-memory set
            pass
        # Transform to PPID
        ppid, ppid_profile = transform_entity_to_ppid(data, Path(entity_file_path).name)
        classification = ppid_profile['profile_classification']['primary_classification']
        # Check for PPID filename collision (different person, same name)
        output_ppid = ppid
        if ppid in EXISTING_PPID_FILENAMES:
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid
        output_file = Path(person_dir) / f"{output_ppid}.json"
        # Double-check the file doesn't exist (race condition protection)
        while output_file.exists():
            short_uuid = str(uuid.uuid4())[:8]
            output_ppid = f"{ppid}-{short_uuid}"
            ppid_profile['ppid'] = output_ppid
            ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            output_file = Path(person_dir) / f"{output_ppid}.json"
        if not dry_run:
            with open(output_file, 'w') as f:
                json.dump(ppid_profile, f, indent=2, ensure_ascii=False)
        # Record this slug as processed (also during dry runs, so batch dedup stays accurate)
        with open(new_slugs_lock_file, 'a') as lf:
            lf.write(linkedin_slug + '\n')
        return ('migrated', output_ppid, classification, linkedin_slug, str(entity_file_path))
    except Exception as e:
        return ('error', str(e), 'error', None, str(entity_file_path))


def main():
    parser = argparse.ArgumentParser(description='Migrate entity profiles to PPID (v5 - LinkedIn slug dedup)')
    parser.add_argument('--dry-run', action='store_true', help='Preview only, no file changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers (currently unused; processing is sequential)')
    parser.add_argument('--verbose', action='store_true', help='Show each migrated file (currently unused)')
    args = parser.parse_args()
    entity_dir = Path('/Users/kempersc/apps/glam/data/custodian/person/entity')
    person_dir = Path('/Users/kempersc/apps/glam/data/person')
    print("=" * 70)
    print("PPID MIGRATION SCRIPT v5.0 (LinkedIn Slug Deduplication)")
    print("=" * 70)
    print("  DEDUPLICATION KEY: LinkedIn slug (NOT PPID)")
    print("  PPID collisions: Resolved with UUID suffix")
    # Phase 1: Build index of existing LinkedIn slugs
    print("\nPhase 1: Indexing existing LinkedIn slugs in data/person/...")
    existing_slugs = build_existing_linkedin_slugs(person_dir)
    print(f"  Found {len(existing_slugs):,} existing LinkedIn slugs")
    # Phase 2: Build index of existing PPID filenames
    print("\nPhase 2: Indexing existing PPID filenames...")
    existing_ppids = build_existing_ppid_filenames(person_dir)
    print(f"  Found {len(existing_ppids):,} existing PPID files")
    # Phase 3: List entity files
    print("\nPhase 3: Listing entity files...")
    entity_files = list(entity_dir.glob('*.json'))
    total_entity = len(entity_files)
    print(f"  Found {total_entity:,} entity files")
    if args.limit:
        entity_files = entity_files[:args.limit]
        print(f"  Limited to {args.limit} files for this run")
    # Create temp file for tracking processed slugs
    lock_file = person_dir / '.migration_slugs_temp.txt'
    if lock_file.exists():
        lock_file.unlink()
    # Phase 4: Process files (sequential for proper dedup)
    print(f"\nPhase 4: Processing files (dry_run={args.dry_run})...")
    print("  Using sequential processing for reliable deduplication")
    global EXISTING_LINKEDIN_SLUGS, EXISTING_PPID_FILENAMES
    EXISTING_LINKEDIN_SLUGS = existing_slugs
    EXISTING_PPID_FILENAMES = existing_ppids
    results = {'migrated': 0, 'duplicate': 0, 'no_slug': 0, 'error': 0}
    classifications = {'human': 0, 'institution': 0, 'anonymous': 0, 'unknown': 0, 'skipped': 0}
    new_slugs = set()
    samples = []
    for i, entity_file in enumerate(entity_files):
        result = process_entity_file((str(entity_file), str(person_dir), args.dry_run, str(lock_file)))
        status, detail, classification, linkedin_slug, file_path = result
        results[status] += 1
        classifications[classification] = classifications.get(classification, 0) + 1
        if status == 'migrated':
            if linkedin_slug:
                EXISTING_LINKEDIN_SLUGS.add(linkedin_slug)
                new_slugs.add(linkedin_slug)
            EXISTING_PPID_FILENAMES.add(detail.split('/')[-1].replace('.json', ''))
            if len(samples) < 5:
                samples.append((detail, classification, Path(file_path).name))
        if status == 'error':
            print(f"  ERROR: {file_path}: {detail}")
        # Progress report every 1000 files
        if (i + 1) % 1000 == 0:
            pct = ((i + 1) / len(entity_files)) * 100
            print(f"  Progress: {i+1:,}/{len(entity_files):,} ({pct:.1f}%) - "
                  f"Migrated:{results['migrated']:,} Dup:{results['duplicate']:,} "
                  f"NoSlug:{results['no_slug']:,} Err:{results['error']}")
    # Cleanup
    if lock_file.exists():
        lock_file.unlink()
    # Summary
    print("\n" + "=" * 70)
    print(f"{'DRY RUN ' if args.dry_run else ''}MIGRATION SUMMARY")
    print("=" * 70)
    print(f"  Total processed: {sum(results.values()):,}")
    print(f"  Successfully migrated: {results['migrated']:,}")
    print(f"  Duplicates skipped (slug exists): {results['duplicate']:,}")
    print(f"  No LinkedIn slug (migrated with UUID): {results['no_slug']:,}")
    print(f"  Errors: {results['error']}")
    print("\n  Classification breakdown (migrated only):")
    print(f"    Human: {classifications.get('human', 0):,}")
    print(f"    Institution: {classifications.get('institution', 0):,}")
    print(f"    Anonymous: {classifications.get('anonymous', 0):,}")
    print(f"    Unknown: {classifications.get('unknown', 0):,}")
    if samples:
        print("\n  Sample migrated profiles:")
        for ppid, classification, source in samples:
            print(f"    [{classification:11}] {ppid[:55]}... <- {source[:35]}...")
    if args.dry_run:
        print("\n  To execute migration, run without the --dry-run flag")
    else:
        final_count = len(list(person_dir.glob('ID_*.json')))
        print("\n  Migration complete!")
        print(f"  Final PPID count: {final_count:,}")
        print(f"  New unique LinkedIn slugs added: {len(new_slugs):,}")


if __name__ == '__main__':
    main()