# glam/scripts/migrate_wcms_resume.py (357 lines, 12 KiB, Python)
#!/usr/bin/env python3
"""
Fast WCMS migration with state file checkpointing.
This is an optimized version that:
1. Uses a state file to track processed user IDs (no scanning 190K+ files)
2. Processes in batches with checkpoints
3. Resumes from where it left off
Usage:
python scripts/migrate_wcms_resume.py --batch-size 10000
python scripts/migrate_wcms_resume.py --dry-run --limit 100
"""
import json
import argparse
import re
import uuid
from pathlib import Path
from datetime import datetime, timezone
import unicodedata
from typing import Dict, Optional, Set
# Paths
# Source: raw WCMS export on the external KINGSTON drive (two generations of user files).
WCMS_USERS_DIR = Path('/Volumes/KINGSTON/data/wcms/data/person_profiles/users')
WCMS_USERS_NEW_DIR = Path('/Volumes/KINGSTON/data/wcms/data/person_profiles/users_new')
# Destination: local directory where migrated PPID profile JSON files are written.
PERSON_DIR = Path('/Users/kempersc/apps/glam/data/person')
# Checkpoint file tracking processed WCMS user IDs so the migration can resume.
STATE_FILE = Path('/Users/kempersc/apps/glam/data/wcms_migration_state.json')
def normalize_name_for_ppid(name: str) -> str:
    """Convert a personal name to PPID token format: FIRST-LAST.

    Strips common titles/post-nominals, transliterates accented characters
    to plain ASCII (NFKD decomposition, combining marks dropped), removes
    remaining non-alphanumeric characters, and upper-cases each part,
    joining the parts with hyphens.

    Returns "UNKNOWN" when nothing usable remains.
    """
    if not name:
        return "UNKNOWN"
    # Drop titles/honorifics so they don't pollute the name token.
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?', '', name,
                  flags=re.IGNORECASE)
    parts = [p.strip() for p in name.split() if p.strip()]
    if not parts:
        return "UNKNOWN"

    def normalize_part(p: str) -> str:
        # NFKD splits accented chars into base char + combining mark;
        # keeping only non-combining chars yields an ASCII approximation.
        nfkd = unicodedata.normalize('NFKD', p)
        ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
        return re.sub(r'[^A-Za-z0-9]', '', ascii_name).upper()

    # Normalize each part exactly once (the original evaluated
    # normalize_part twice per part) and drop parts that normalize away.
    normalized = [token for p in parts if (token := normalize_part(p))]
    return '-'.join(normalized) if normalized else "UNKNOWN"
def extract_email_domain(email: str) -> Optional[str]:
    """Return the lower-cased domain part of an email address.

    Gives back None for empty input or any string without an '@'.
    """
    if email and '@' in email:
        _, _, domain = email.rpartition('@')
        return domain.lower()
    return None
def generate_wcms_ppid(name: str) -> str:
    """Build a placeholder PPID for a WCMS user.

    Location and date components are unknown for WCMS records, so they
    are filled with XX/XXXX placeholders; only the name token varies.
    """
    token = normalize_name_for_ppid(name)
    return "ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_" + token
def load_state() -> dict:
    """Read migration state from STATE_FILE, or return a fresh default state."""
    if not STATE_FILE.exists():
        # First run: nothing processed yet.
        return {
            'processed_user_ids': [],
            'existing_ppids': [],
            'last_checkpoint': None,
            'stats': {'migrated': 0, 'duplicate': 0, 'error': 0}
        }
    with open(STATE_FILE) as fh:
        return json.load(fh)
def save_state(state: dict):
    """Persist migration state to STATE_FILE, stamping the checkpoint time."""
    checkpoint_time = datetime.now(timezone.utc).isoformat()
    state['last_checkpoint'] = checkpoint_time
    with STATE_FILE.open('w') as fh:
        json.dump(state, fh)
def transform_wcms_to_ppid(wcms_data: dict, source_file: str) -> tuple:
    """Transform WCMS user data to PPID profile format.

    Args:
        wcms_data: Parsed JSON dict for one WCMS user record.
        source_file: Relative path of the source file, recorded in metadata.

    Returns:
        (ppid, ppid_profile): the generated identifier and the full
        profile dict ready to be serialized to disk.
    """
    name = wcms_data.get('full_name') or wcms_data.get('username') or 'Unknown'
    ppid = generate_wcms_ppid(name)
    email_domain = extract_email_domain(wcms_data.get('email', ''))
    # Find Wikipedia URL from entity_resolution if present.
    # BUGFIX: the key may exist with a null value, in which case
    # .get('entity_resolution', {}) returns None and the chained .get()
    # would raise AttributeError; `or {}` guards both absent and null.
    wikipedia_url = (wcms_data.get('entity_resolution') or {}).get('wikipedia_url')
    # One timestamp for both metadata sections, so they agree exactly.
    now_iso = datetime.now(timezone.utc).isoformat()
    ppid_profile = {
        "ppid": ppid,
        "ppid_type": "ID",
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": normalize_name_for_ppid(name).split('-')
        },
        "name": name,
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown",
            "note": "Not available from WCMS"
        },
        "is_living": True,
        "is_anonymous": False,
        # WCMS-specific identifiers
        "wcms_identifiers": {
            "user_id": wcms_data.get('user_id'),
            "username": wcms_data.get('username'),
            "username_url": wcms_data.get('username_url'),
            "abs_id": wcms_data.get('abs_id'),
            "crm_id": wcms_data.get('crm_id'),
        },
        # CONTACT DETAILS - FULL email preserved
        "contact_details": {
            "email": wcms_data.get('email'),
            "email_domain": email_domain,
        },
        # Activity data
        "wcms_activity": {
            "status": wcms_data.get('status'),
            "roles": wcms_data.get('roles', []),
            "registered_since": wcms_data.get('registered_since'),
            "last_access": wcms_data.get('last_access'),
            "operations": wcms_data.get('operations', []),
        },
        # Entity resolution - no auto-matching in this fast version
        "entity_resolution": {
            "potential_linkedin_matches": 0,
            "wikipedia_url": wikipedia_url,
            "match_candidates": [],
            "requires_manual_review": False,
            "auto_merged": False,
            "reviewed": False,
            "review_notes": None,
        },
        "profile_classification": {
            "primary_classification": "human",
            "confidence": 0.95,
            "indicators": [{"type": "wcms_user", "reason": "Registered user in heritage CMS system"}],
            "reasoning": "WCMS user profile - registered heritage sector CMS user"
        },
        "data_sources": ["wcms"],
        "extraction_metadata": {
            "extraction_agent": "migrate_wcms_resume.py",
            "extraction_date": now_iso,
            "source_file": source_file,
            "source_system": "WCMS",
            "schema_version": "1.0.0"
        },
        "migration_metadata": {
            "original_wcms_file": source_file,
            "original_user_id": wcms_data.get('user_id'),
            "migrated_at": now_iso,
            "migration_script": "migrate_wcms_resume.py",
            "migration_version": "2.0"
        }
    }
    return ppid, ppid_profile
def main():
    """Run the resumable WCMS -> PPID migration.

    Three phases:
      1. Load (or rebuild) the checkpoint state listing processed user IDs.
      2. Collect candidate WCMS source files from the mounted drive.
      3. Transform each unprocessed record to a PPID profile, writing a
         checkpoint every --batch-size files so the run can be resumed.
    """
    parser = argparse.ArgumentParser(description='Fast WCMS migration with checkpointing')
    parser.add_argument('--dry-run', action='store_true', help='Preview only')
    parser.add_argument('--limit', type=int, default=None, help='Limit files to process')
    parser.add_argument('--batch-size', type=int, default=5000, help='Save checkpoint every N files')
    parser.add_argument('--rebuild-state', action='store_true', help='Rebuild state from existing files')
    args = parser.parse_args()

    print("=" * 70)
    print("WCMS MIGRATION (RESUME MODE)")
    print("=" * 70)

    # Check KINGSTON mount - nothing to do if the source drive is absent.
    if not WCMS_USERS_DIR.exists():
        print(f"\nERROR: KINGSTON not mounted: {WCMS_USERS_DIR}")
        return

    # Phase 1: load or rebuild state.
    if args.rebuild_state or not STATE_FILE.exists():
        print("\nPhase 1: Building state from existing files...")
        print(" This may take a while for 190K+ files...")
        existing_ppids = set()
        processed_user_ids = set()
        count = 0
        for f in PERSON_DIR.glob('ID_*.json'):
            count += 1
            if count % 10000 == 0:
                print(f" Indexed {count:,} files...")
            try:
                existing_ppids.add(f.stem)
                with open(f) as fp:
                    data = json.load(fp)
                uid = data.get('wcms_identifiers', {}).get('user_id')
                if uid:
                    processed_user_ids.add(uid)
            except Exception:
                # Best-effort indexing: skip unreadable/corrupt files.
                # BUGFIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit during the long scan.
                pass
        state = {
            'processed_user_ids': list(processed_user_ids),
            'existing_ppids': list(existing_ppids),
            'last_checkpoint': datetime.now(timezone.utc).isoformat(),
            'stats': {'migrated': len(processed_user_ids), 'duplicate': 0, 'error': 0}
        }
        save_state(state)
        print(f" State built: {len(processed_user_ids):,} WCMS user IDs, {len(existing_ppids):,} PPIDs")
    else:
        print("\nPhase 1: Loading state from file...")
        state = load_state()
        print(f" Loaded: {len(state['processed_user_ids']):,} processed WCMS user IDs")
        print(f" Last checkpoint: {state['last_checkpoint']}")

    # Working sets built from state (cheap membership tests in the hot loop).
    processed_user_ids = set(state['processed_user_ids'])
    existing_ppids = set(state['existing_ppids'])
    stats = state['stats']

    # Phase 2: collect WCMS source files.
    print("\nPhase 2: Collecting WCMS files...")
    wcms_files = []
    # Use recursive glob to find files in subdirectories AND at top level
    for f in WCMS_USERS_DIR.glob('**/user_*.json'):
        if not f.name.startswith('._'):  # Skip macOS hidden files
            wcms_files.append(('users', f))
    if WCMS_USERS_NEW_DIR.exists():
        for f in WCMS_USERS_NEW_DIR.glob('*.json'):
            if not f.name.startswith('._'):
                wcms_files.append(('users_new', f))
    print(f" Found {len(wcms_files):,} WCMS source files")
    print(f" Already processed: {len(processed_user_ids):,}")
    print(f" Remaining: ~{len(wcms_files) - len(processed_user_ids):,}")
    if args.limit:
        wcms_files = wcms_files[:args.limit]
        print(f" Limited to {args.limit} files")

    # Phase 3: process, checkpointing every batch_size files.
    print(f"\nPhase 3: Processing (dry_run={args.dry_run}, batch_size={args.batch_size})...")
    batch_migrated = 0
    batch_skipped = 0
    batch_errors = 0
    for i, (source_type, wcms_file) in enumerate(wcms_files):
        try:
            with open(wcms_file) as f:
                wcms_data = json.load(f)
            user_id = wcms_data.get('user_id')
            # Skip if already processed in a previous run.
            if user_id and user_id in processed_user_ids:
                batch_skipped += 1
                continue
            # Transform to PPID profile.
            ppid, ppid_profile = transform_wcms_to_ppid(
                wcms_data,
                f"{source_type}/{wcms_file.name}"
            )
            # Handle PPID filename collision by appending a short uuid.
            output_ppid = ppid
            if ppid in existing_ppids:
                short_uuid = str(uuid.uuid4())[:8]
                output_ppid = f"{ppid}-{short_uuid}"
                ppid_profile['ppid'] = output_ppid
                ppid_profile['ppid_components']['collision_uuid'] = short_uuid
            output_file = PERSON_DIR / f"{output_ppid}.json"
            # Double-check the target path is actually free on disk
            # (the in-memory set may be stale after a rebuild).
            while output_file.exists():
                short_uuid = str(uuid.uuid4())[:8]
                output_ppid = f"{ppid}-{short_uuid}"
                ppid_profile['ppid'] = output_ppid
                ppid_profile['ppid_components']['collision_uuid'] = short_uuid
                output_file = PERSON_DIR / f"{output_ppid}.json"
            if not args.dry_run:
                with open(output_file, 'w') as f:
                    json.dump(ppid_profile, f, indent=2, ensure_ascii=False)
            existing_ppids.add(output_ppid)
            if user_id:
                processed_user_ids.add(user_id)
            batch_migrated += 1
            stats['migrated'] += 1
        except Exception as e:
            batch_errors += 1
            stats['error'] += 1
            if batch_errors <= 5:  # Only print the first few errors per batch
                print(f" ERROR: {wcms_file.name}: {e}")
        # Progress report and checkpoint at batch boundaries.
        if (i + 1) % args.batch_size == 0:
            pct = ((i + 1) / len(wcms_files)) * 100
            print(f" Progress: {i+1:,}/{len(wcms_files):,} ({pct:.1f}%) - "
                  f"Batch: +{batch_migrated:,} new, {batch_skipped:,} skip, {batch_errors} err")
            # Save checkpoint (skipped in dry-run so state stays untouched).
            if not args.dry_run:
                state['processed_user_ids'] = list(processed_user_ids)
                state['existing_ppids'] = list(existing_ppids)
                state['stats'] = stats
                save_state(state)
                print(f" Checkpoint saved at {i+1:,}")
            batch_migrated = 0
            batch_skipped = 0
            batch_errors = 0

    # Final checkpoint for the tail of the last (partial) batch.
    if not args.dry_run:
        state['processed_user_ids'] = list(processed_user_ids)
        state['existing_ppids'] = list(existing_ppids)
        state['stats'] = stats
        save_state(state)

    # Summary
    print("\n" + "=" * 70)
    print(f"{'DRY RUN ' if args.dry_run else ''}MIGRATION SUMMARY")
    print("=" * 70)
    print(f" Total WCMS source files: {len(wcms_files):,}")
    print(f" Total migrated (cumulative): {stats['migrated']:,}")
    print(f" Errors: {stats['error']}")
    print(f" WCMS user IDs in state: {len(processed_user_ids):,}")
    print(f" PPID files tracked: {len(existing_ppids):,}")
    if not args.dry_run:
        print(f"\n State saved to: {STATE_FILE}")
# Script entry point: run the migration only when executed directly.
if __name__ == '__main__':
    main()