glam/scripts/migrate_wcms_users.py
kempersc 416aa407cc Add new slots for financial and heritage documentation
- Introduced total expense, total frames analyzed, total investment, total liability, total net asset, and traditional product slots to enhance financial reporting capabilities.
- Added transition types detected, treatment description, type hypothesis, typical condition, typical HTTP methods, typical response formats, and typical scope slots for improved heritage documentation.
- Implemented user community, verified, web observation, WhatsApp business likelihood, wikidata equivalent, and wikidata mapping slots to enrich institutional data representation.
- Established has_or_had_asset, has_or_had_budget, has_or_had_expense, and is_or_was_threatened_by slots to capture asset, budget, expense relationships, and threats to heritage forms.
2026-01-15 19:35:39 +01:00

448 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Migrate WCMS user profiles to data/person/ with entity resolution signals.
This script:
1. Imports ALL WCMS profiles as separate records (NO auto-merging)
2. Extracts entity resolution signals (name, email domain, CRM ID)
3. Flags potential matches with existing LinkedIn profiles
4. Stores match candidates for manual review
CRITICAL: We do NOT auto-merge on name alone. Entity resolution requires:
- Multiple matching signals (employer, location, career, age)
- Human verification for ambiguous cases
- Full provenance tracking
Usage:
python scripts/migrate_wcms_users.py --dry-run --limit 100
python scripts/migrate_wcms_users.py --dry-run
python scripts/migrate_wcms_users.py
"""
import json
import argparse
import re
import uuid
from pathlib import Path
from datetime import datetime, timezone
import unicodedata
from typing import Dict, List, Tuple, Any, Optional, Set
from collections import defaultdict
# WCMS source paths
WCMS_USERS_DIR = Path('/Volumes/KINGSTON/data/wcms/data/person_profiles/users')
WCMS_USERS_NEW_DIR = Path('/Volumes/KINGSTON/data/wcms/data/person_profiles/users_new')
PERSON_DIR = Path('/Users/kempersc/apps/glam/data/person')
def normalize_name(name: str) -> str:
    """Normalize a personal name for comparison.

    Lowercases, strips diacritics, removes common titles/degrees, drops any
    character that is not a-z or whitespace, and collapses runs of whitespace
    to a single space so spacing differences cannot break index lookups.

    Returns "" for empty/None input.
    """
    if not name:
        return ""
    # Remove common titles and degree abbreviations (optional trailing dot).
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing|PhD|MA|MSc|MBA|BSc)\b\.?', '', name, flags=re.IGNORECASE)
    # Decompose accented characters, then drop the combining marks (é -> e).
    nfkd = unicodedata.normalize('NFKD', name)
    ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
    # Lowercase, keep only letters/whitespace, then collapse internal runs of
    # whitespace (the previous version only stripped the ends, so doubled
    # spaces produced keys that never matched).
    cleaned = re.sub(r'[^a-z\s]', '', ascii_name.lower())
    return ' '.join(cleaned.split())
def normalize_name_for_ppid(name: str) -> str:
    """Convert a name to PPID token format: FIRST-LAST.

    Titles/degrees are removed, diacritics stripped, and each remaining word
    is upper-cased with non-alphanumerics dropped. Words that become empty
    after cleaning are skipped.

    Returns "UNKNOWN" when nothing usable remains.
    """
    if not name:
        return "UNKNOWN"
    # Strip common titles/degrees; note Jr/Sr are also dropped here (unlike
    # normalize_name, which keeps them — intentional for PPID tokens).
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?', '', name, flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def normalize_part(p: str) -> str:
        # ASCII-fold (é -> e), keep only alphanumerics, upper-case.
        nfkd = unicodedata.normalize('NFKD', p)
        ascii_part = ''.join(c for c in nfkd if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z0-9]', '', ascii_part).upper()

    # Normalize each part exactly once (the original evaluated
    # normalize_part(p) twice per token: once to filter, once to keep).
    normalized = [token for token in (normalize_part(p) for p in parts) if token]
    return '-'.join(normalized) if normalized else "UNKNOWN"
def extract_email_domain(email: str) -> Optional[str]:
    """Return the lower-cased domain part of *email*, or None.

    None is returned for empty input or anything without an '@'. When several
    '@' signs appear, the text after the last one is treated as the domain.
    """
    if not email or '@' not in email:
        return None
    _, _, domain = email.rpartition('@')
    return domain.lower()
def generate_wcms_ppid(name: str) -> str:
    """Build a PPID string for a WCMS user.

    Uses the same ID_ prefix as LinkedIn-derived profiles; the originating
    system is recorded in the profile metadata rather than in the PPID itself.
    The location/date slots are placeholders (unknown for WCMS users).
    """
    token = normalize_name_for_ppid(name)
    return "ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_" + token
def build_linkedin_name_index(person_dir: Path) -> Dict[str, List[Dict]]:
    """Build an index of existing LinkedIn profiles keyed by normalized name.

    Scans ``person_dir`` for ID_*.json files and records lightweight
    entity-resolution signals per profile (headline, location, affiliations).
    Unreadable or malformed files are counted and skipped.

    Args:
        person_dir: Directory containing ID_*.json person records.

    Returns:
        {normalized_name: [list of profile summary dicts]}
    """
    print(" Building LinkedIn name index for entity resolution...")
    name_index: Dict[str, List[Dict]] = defaultdict(list)
    count = 0
    skipped = 0
    for f in person_dir.glob('ID_*.json'):
        try:
            with open(f) as fp:
                data = json.load(fp)
            name = data.get('name', '')
            if not name:
                continue
            normalized = normalize_name(name)
            if not normalized:
                continue
            # Signals used later when flagging potential WCMS matches.
            profile_summary = {
                'file': f.name,
                'ppid': data.get('ppid'),
                'name': name,
                'linkedin_slug': data.get('linkedin_slug'),
                'headline': data.get('profile_data', {}).get('headline', ''),
                'location': data.get('profile_data', {}).get('location', ''),
                'affiliations': [a.get('custodian_name', '') for a in data.get('affiliations', [])],
            }
            name_index[normalized].append(profile_summary)
            count += 1
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
            # still propagate; bad files are tallied instead of vanishing.
            skipped += 1
    if skipped:
        print(f" Skipped {skipped:,} unreadable/malformed profile files")
    print(f" Indexed {count:,} LinkedIn profiles by name")
    print(f" Unique normalized names: {len(name_index):,}")
    return name_index
def find_potential_matches(wcms_profile: Dict, name_index: Dict[str, List[Dict]]) -> List[Dict]:
    """Find potential LinkedIn matches for a WCMS user.

    Matching is name-based only; each candidate is annotated with the signals
    found so a human can review it. DOES NOT AUTO-MERGE.

    Args:
        wcms_profile: Raw WCMS user dict (uses 'full_name' and 'email').
        name_index: Output of build_linkedin_name_index().

    Returns:
        List of candidate dicts, each with requires_manual_review=True and
        auto_merge_allowed=False.
    """
    full_name = wcms_profile.get('full_name', '')
    normalized = normalize_name(full_name)
    if not normalized or normalized not in name_index:
        return []
    # Email parsing is candidate-independent — hoisted out of the loop
    # (the original re-parsed the same email for every candidate).
    email_domain = extract_email_domain(wcms_profile.get('email', ''))
    domain_label = email_domain.split('.')[0] if email_domain else None
    matches = []
    for candidate in name_index[normalized]:
        match_signals = {
            'name_match': True,
            'normalized_name': normalized,
        }
        if email_domain:
            match_signals['wcms_email_domain'] = email_domain
            # Weak corroborating signal: the domain's first label (e.g.
            # "rijksmuseum" from "rijksmuseum.nl") appearing in an
            # affiliation name of the candidate.
            for aff in candidate.get('affiliations', []):
                if domain_label in aff.lower():
                    match_signals['email_domain_in_affiliation'] = True
                    break
        matches.append({
            'linkedin_profile': candidate,
            'match_signals': match_signals,
            'requires_manual_review': True,
            'auto_merge_allowed': False,  # NEVER auto-merge on name alone
        })
    return matches
def transform_wcms_to_ppid(wcms_data: Dict, source_file: str, potential_matches: List[Dict]) -> Tuple[str, Dict]:
    """Transform a raw WCMS profile dict into the PPID profile format.

    Args:
        wcms_data: Parsed WCMS user JSON record.
        source_file: Relative source path, stored for provenance.
        potential_matches: Candidate LinkedIn matches (flagged for manual
            review only; never auto-merged).

    Returns:
        (ppid, profile_dict) — the generated PPID and the full record to
        serialize.
    """
    full_name = wcms_data.get('full_name', 'Unknown')
    ppid = generate_wcms_ppid(full_name)
    # Extract email domain for entity resolution
    email = wcms_data.get('email', '')
    email_domain = extract_email_domain(email)
    # Single timestamp so extraction and migration metadata agree exactly
    # (the original called datetime.now() twice, yielding two different
    # values within the same record).
    now_iso = datetime.now(timezone.utc).isoformat()
    ppid_profile = {
        "ppid": ppid,
        "ppid_type": "ID",  # Same type as LinkedIn - source tracked in metadata
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": normalize_name_for_ppid(full_name).split('-')
        },
        "name": full_name,
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown",
            "note": "Not available from WCMS"
        },
        "is_living": True,
        "is_anonymous": False,
        # WCMS-specific identifiers - FULL contact details preserved!
        "wcms_identifiers": {
            "user_id": wcms_data.get('user_id'),
            "username": wcms_data.get('username'),
            "username_url": wcms_data.get('username_url'),
            "abs_id": wcms_data.get('abs_id'),
            "crm_id": wcms_data.get('crm_id'),
        },
        # CONTACT DETAILS - CRUCIAL! Store full email, not just domain
        "contact_details": {
            "email": wcms_data.get('email'),  # FULL email preserved
            "email_domain": email_domain,
        },
        # Activity data
        "wcms_activity": {
            "status": wcms_data.get('status'),
            "roles": wcms_data.get('roles', []),
            "registered_since": wcms_data.get('registered_since'),
            "last_access": wcms_data.get('last_access'),
            "operations": wcms_data.get('operations', []),
        },
        # Entity resolution - potential matches with LinkedIn profiles
        "entity_resolution": {
            "potential_linkedin_matches": len(potential_matches),
            "match_candidates": potential_matches[:5],  # Store top 5 candidates
            "requires_manual_review": len(potential_matches) > 0,
            "auto_merged": False,
            "reviewed": False,
            "review_notes": None,
        },
        # Classification - indicate this is from WCMS source
        "profile_classification": {
            "primary_classification": "human",  # WCMS users are people
            "confidence": 0.95,
            "indicators": [{"type": "wcms_user", "reason": "Registered user in heritage CMS system"}],
            "reasoning": "WCMS user profile - registered heritage sector CMS user"
        },
        # Data source tracking
        "data_sources": ["wcms"],  # Track which systems this person appears in
        # Source provenance
        "extraction_metadata": {
            "extraction_agent": "migrate_wcms_users.py",
            "extraction_date": now_iso,
            "source_file": source_file,
            "source_system": "WCMS",
            "schema_version": "1.0.0"
        },
        "migration_metadata": {
            "original_wcms_file": source_file,
            "original_user_id": wcms_data.get('user_id'),
            "migrated_at": now_iso,
            "migration_script": "migrate_wcms_users.py",
            "migration_version": "1.0"
        }
    }
    return ppid, ppid_profile
def main():
    """CLI entry point: migrate WCMS user profiles into data/person/.

    Phases:
      1. Index existing LinkedIn profiles by normalized name (skippable
         with --skip-matching).
      2. Collect WCMS user JSON files from the mounted KINGSTON volume.
      3. Index existing PPIDs and already-migrated WCMS user_ids to avoid
         duplicates.
      4. Transform each WCMS profile to PPID format, flag potential
         LinkedIn matches for manual review, and write the record
         (unless --dry-run).
    """
    parser = argparse.ArgumentParser(description='Migrate WCMS users with entity resolution')
    parser.add_argument('--dry-run', action='store_true', help='Preview only, no file changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--skip-matching', action='store_true', help='Skip LinkedIn matching (faster)')
    args = parser.parse_args()
    print("=" * 70)
    print("WCMS USER MIGRATION with Entity Resolution")
    print("=" * 70)
    print(" CRITICAL: No auto-merging on name alone!")
    print(" Potential matches flagged for manual review")
    # Check KINGSTON mount — the source data lives on an external volume.
    if not WCMS_USERS_DIR.exists():
        print(f"\n ERROR: KINGSTON not mounted or path not found: {WCMS_USERS_DIR}")
        return
    # Phase 1: Build LinkedIn name index for entity resolution.
    if not args.skip_matching:
        print("\nPhase 1: Building LinkedIn profile index...")
        name_index = build_linkedin_name_index(PERSON_DIR)
    else:
        print("\nPhase 1: Skipping LinkedIn matching (--skip-matching)")
        name_index = {}
    # Phase 2: Collect WCMS files as (source_type, path) pairs; source_type
    # is recorded in each migrated record's provenance.
    print("\nPhase 2: Collecting WCMS user files...")
    wcms_files = []
    # Main users directory - scan subdirectories (000/, 001/, etc.)
    # WCMS user files are organized in subdirectories by batch number.
    for f in WCMS_USERS_DIR.rglob('user_*.json'):
        # Skip hidden files (macOS "._*" AppleDouble metadata).
        if f.name.startswith('._'):
            continue
        wcms_files.append(('users', f))
    # users_new directory (flat, optional).
    if WCMS_USERS_NEW_DIR.exists():
        for f in WCMS_USERS_NEW_DIR.glob('*.json'):
            if not f.name.startswith('._'):
                wcms_files.append(('users_new', f))
    print(f" Found {len(wcms_files):,} WCMS user files")
    if args.limit:
        wcms_files = wcms_files[:args.limit]
        print(f" Limited to {args.limit} files")
    # Phase 3: Build existing PPID index (file stems) for collision handling.
    print("\nPhase 3: Indexing existing PPID files...")
    existing_ppids = {f.stem for f in PERSON_DIR.glob('*.json')}
    print(f" Found {len(existing_ppids):,} existing PPID files")
    # Also index by WCMS user_id so re-running the migration is idempotent.
    existing_wcms_ids = set()
    for f in PERSON_DIR.glob('ID_*.json'):
        try:
            with open(f) as fp:
                data = json.load(fp)
            uid = data.get('wcms_identifiers', {}).get('user_id')
            if uid:
                existing_wcms_ids.add(uid)
        # NOTE(review): bare except silently skips unreadable files here;
        # consider narrowing to (OSError, json.JSONDecodeError).
        except:
            pass
    print(f" Found {len(existing_wcms_ids):,} existing WCMS user IDs")
    # Phase 4: Process WCMS files one by one.
    print(f"\nPhase 4: Processing WCMS files (dry_run={args.dry_run})...")
    results = {'migrated': 0, 'duplicate': 0, 'error': 0}
    match_stats = {'with_matches': 0, 'no_matches': 0}
    samples = []  # first 5 migrated profiles, for the summary printout
    for i, (source_type, wcms_file) in enumerate(wcms_files):
        try:
            with open(wcms_file) as f:
                wcms_data = json.load(f)
            user_id = wcms_data.get('user_id')
            # Check for duplicate by WCMS user_id (already migrated earlier).
            if user_id and user_id in existing_wcms_ids:
                results['duplicate'] += 1
                continue
            # Find potential LinkedIn matches (empty index when skipping).
            if name_index:
                potential_matches = find_potential_matches(wcms_data, name_index)
            else:
                potential_matches = []
            if potential_matches:
                match_stats['with_matches'] += 1
            else:
                match_stats['no_matches'] += 1
            # Transform to PPID record.
            ppid, ppid_profile = transform_wcms_to_ppid(
                wcms_data,
                f"{source_type}/{wcms_file.name}",
                potential_matches
            )
            # Handle PPID filename collision by suffixing a short UUID
            # (name-derived PPIDs are not unique across people).
            output_ppid = ppid
            if ppid in existing_ppids:
                short_uuid = str(uuid.uuid4())[:8]
                output_ppid = f"{ppid}-{short_uuid}"
                ppid_profile['ppid'] = output_ppid
                ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            output_file = PERSON_DIR / f"{output_ppid}.json"
            # Double-check file doesn't exist on disk (existing_ppids may be
            # incomplete); keep regenerating the suffix until free.
            while output_file.exists():
                short_uuid = str(uuid.uuid4())[:8]
                output_ppid = f"{ppid}-{short_uuid}"
                ppid_profile['ppid'] = output_ppid
                ppid_profile['ppid_components']['collision_uuid'] = short_uuid
                output_file = PERSON_DIR / f"{output_ppid}.json"
            if not args.dry_run:
                with open(output_file, 'w') as f:
                    json.dump(ppid_profile, f, indent=2, ensure_ascii=False)
            # Track new PPID/user_id even in dry-run so counts stay realistic.
            existing_ppids.add(output_ppid)
            if user_id:
                existing_wcms_ids.add(user_id)
            results['migrated'] += 1
            if len(samples) < 5:
                samples.append((output_ppid, wcms_data.get('full_name', ''), len(potential_matches)))
        except Exception as e:
            results['error'] += 1
            if results['error'] <= 5:  # only print the first few errors
                print(f" ERROR: {wcms_file.name}: {e}")
        # Progress report every 10k files.
        if (i + 1) % 10000 == 0:
            pct = ((i + 1) / len(wcms_files)) * 100
            print(f" Progress: {i+1:,}/{len(wcms_files):,} ({pct:.1f}%) - "
                  f"Migrated:{results['migrated']:,} Dup:{results['duplicate']:,} "
                  f"Matches:{match_stats['with_matches']:,} Err:{results['error']}")
    # Summary.
    print("\n" + "=" * 70)
    print(f"{'DRY RUN ' if args.dry_run else ''}MIGRATION SUMMARY")
    print("=" * 70)
    print(f" Total processed: {sum(results.values()):,}")
    print(f" Successfully migrated: {results['migrated']:,}")
    print(f" Duplicates skipped: {results['duplicate']:,}")
    print(f" Errors: {results['error']}")
    print(f"\n Entity Resolution:")
    print(f" With potential LinkedIn matches: {match_stats['with_matches']:,}")
    print(f" No matches found: {match_stats['no_matches']:,}")
    print(f" Requires manual review: {match_stats['with_matches']:,}")
    if samples:
        print(f"\n Sample migrated profiles:")
        for ppid, name, match_count in samples:
            print(f" {ppid[:50]}... | {name} | {match_count} potential matches")
    if args.dry_run:
        print(f"\n To execute migration, run without --dry-run flag")
    else:
        final_count = len(list(PERSON_DIR.glob('*.json')))
        print(f"\n Migration complete!")
        print(f" Final profile count: {final_count:,}")
# Script entry point: run the migration only when executed directly.
if __name__ == '__main__':
    main()