#!/usr/bin/env python3
"""
Merge WCMS data into LinkedIn person entity files.

This script:
1. Builds an email → WCMS data map from data/person/ID_*.json files
2. Uses entity_resolution_candidates to find confirmed/high-confidence matches
3. Updates LinkedIn person entity files (data/custodian/person/entity/) with WCMS data

Usage:
    python scripts/merge_wcms_to_linkedin_profiles.py --dry-run          # Preview
    python scripts/merge_wcms_to_linkedin_profiles.py                    # Apply changes
    python scripts/merge_wcms_to_linkedin_profiles.py --confirmed-only   # Only confirmed matches
"""
import argparse
import json
import os
import sys
import re
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional


def build_wcms_email_index(wcms_dir: Path, verbose: bool = False) -> Dict[str, Dict]:
    """Build an email → WCMS data map from data/person/ID_*.json files.

    Args:
        wcms_dir: Directory containing WCMS person files named ``ID_*.json``.
        verbose: If True, print per-file read errors.

    Returns:
        Mapping of lowercased/stripped email → dict with the WCMS fields
        ``ppid``, ``name``, ``wcms_identifiers``, ``wcms_activity``,
        ``contact_details`` and ``_source_file`` (originating filename).
        Records without an email are skipped; on duplicate emails the
        last file scanned wins.
    """
    print(f"Scanning WCMS person files in {wcms_dir}...")
    email_index = {}
    processed = 0
    errors = 0
    no_email = 0

    files = list(wcms_dir.glob("ID_*.json"))
    total = len(files)
    print(f"Found {total:,} WCMS person files")

    for i, filepath in enumerate(files):
        # Periodic progress indicator for very large directories.
        if i % 50000 == 0 and i > 0:
            print(f" Processed {i:,}/{total:,} ({i*100//total}%)...")
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            processed += 1

            # Extract email from contact_details; normalize for matching.
            email = data.get('contact_details', {}).get('email', '').lower().strip()
            if not email:
                no_email += 1
                continue

            # Store the WCMS fields we merge downstream, keyed by email.
            email_index[email] = {
                'ppid': data.get('ppid'),
                'name': data.get('name'),
                'wcms_identifiers': data.get('wcms_identifiers', {}),
                'wcms_activity': data.get('wcms_activity', {}),
                'contact_details': data.get('contact_details', {}),
                '_source_file': filepath.name
            }
        except Exception as e:
            # Best-effort scan: count and (optionally) report, keep going.
            errors += 1
            if verbose:
                print(f" Error reading {filepath}: {e}")

    print(f"Built WCMS email index: {len(email_index):,} emails indexed")
    print(f" - Processed: {processed:,}")
    print(f" - No email: {no_email:,}")
    print(f" - Errors: {errors:,}")
    return email_index


def load_entity_candidates(candidates_file: Path, confirmed_only: bool = False,
                           min_confidence: float = 0.65) -> Dict[str, Dict]:
    """Load entity resolution candidates and build an email → candidate mapping.

    Selection rules per email:
      - rejected candidates (``review_decision == 'not_match'``) are skipped;
      - with ``confirmed_only``, only ``review_decision == 'match'`` survives;
      - otherwise unconfirmed candidates must meet ``min_confidence``;
      - a confirmed candidate beats an unconfirmed one; ties on status are
        broken by higher ``confidence_score``.

    Args:
        candidates_file: JSON file with a top-level ``candidates`` list.
        confirmed_only: Keep only human-confirmed matches.
        min_confidence: Score threshold for unconfirmed candidates.

    Returns:
        Mapping of normalized email → best candidate dict.
    """
    print(f"Loading entity resolution candidates from {candidates_file}...")
    with open(candidates_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    candidates = data.get('candidates', [])
    print(f" Total candidates: {len(candidates):,}")

    email_to_candidate = {}

    for c in candidates:
        email = c.get('wcms_email', '').lower().strip()
        linkedin_slug = c.get('linkedin_slug')
        if not email or not linkedin_slug:
            continue

        is_confirmed = c.get('review_decision') == 'match'
        is_rejected = c.get('review_decision') == 'not_match'
        confidence = c.get('confidence_score', 0)

        # Skip explicitly rejected matches.
        if is_rejected:
            continue
        # Filter based on mode.
        if confirmed_only and not is_confirmed:
            continue
        if not is_confirmed and confidence < min_confidence:
            continue

        # Keep the best candidate seen so far for this email.
        existing = email_to_candidate.get(email)
        if not existing:
            email_to_candidate[email] = c
        else:
            existing_confirmed = existing.get('review_decision') == 'match'
            if is_confirmed and not existing_confirmed:
                # Confirmed beats unconfirmed.
                email_to_candidate[email] = c
            elif is_confirmed == existing_confirmed:
                # Same status - prefer higher confidence.
                if confidence > existing.get('confidence_score', 0):
                    email_to_candidate[email] = c

    # BUGFIX: derive counts from the final selection. The previous version
    # incremented counters only on first insertion and never adjusted them
    # when a later candidate replaced an earlier one, so the confirmed /
    # high-confidence breakdown could be wrong.
    confirmed_count = sum(
        1 for c in email_to_candidate.values()
        if c.get('review_decision') == 'match'
    )
    high_confidence_count = len(email_to_candidate) - confirmed_count

    print(f" Selected candidates: {len(email_to_candidate):,}")
    print(f" - Confirmed matches: {confirmed_count:,}")
    print(f" - High-confidence (>= {min_confidence}): {high_confidence_count:,}")
    return email_to_candidate


def build_slug_to_files_index(person_dir: Path) -> Dict[str, List[str]]:
    """Build a LinkedIn slug → list of person entity filenames index.

    Filenames are expected to look like ``{slug}_{timestamp}.json``, e.g.
    ``john-doe-123abc_20260109T224201Z.json``; files starting with ``_``
    (metadata) and non-matching names are ignored.

    Args:
        person_dir: Directory of LinkedIn person entity ``*.json`` files.

    Returns:
        Mapping of slug → list of matching filenames (unsorted).
    """
    print(f"Building slug-to-files index from {person_dir}...")
    slug_to_files = defaultdict(list)
    files = list(person_dir.glob("*.json"))
    print(f" Found {len(files):,} person entity files")

    for filepath in files:
        filename = filepath.name
        # Skip metadata files.
        if filename.startswith('_'):
            continue
        # Extract slug: everything before the trailing _YYYYMMDDTHHMMSSZ.
        match = re.match(r'^(.+?)_(\d{8}T\d{6}Z)\.json$', filename)
        if match:
            slug = match.group(1)
            slug_to_files[slug].append(filename)

    print(f" Indexed {len(slug_to_files):,} unique slugs")
    return dict(slug_to_files)


def find_person_file(person_dir: Path, slug_index: Dict[str, List[str]],
                     linkedin_slug: str) -> Optional[Path]:
    """Find a person entity file by LinkedIn slug, returning the most recent.

    Relies on the ``{slug}_{timestamp}.json`` naming: lexicographic sort of
    the filenames orders them by timestamp, so the last entry is newest.

    Returns:
        Path to the most recent file, or None if the slug is unknown.
    """
    files = slug_index.get(linkedin_slug, [])
    if not files:
        return None
    most_recent = sorted(files)[-1]
    return person_dir / most_recent


def update_person_file(
    person_file: Path,
    wcms_data: Dict,
    dry_run: bool = False,
    verbose: bool = False
) -> tuple[bool, str]:
    """Update a person entity file in place with WCMS data.

    Adds ``wcms_identifiers`` and ``wcms_activity``, merges contact details
    (existing non-empty values win), records ``wcms`` in ``data_sources``
    and stamps merge provenance. Files that already carry WCMS identifiers
    are left untouched.

    Args:
        person_file: JSON file to update.
        wcms_data: WCMS record as produced by ``build_wcms_email_index``.
        dry_run: If True, validate and report but do not write.
        verbose: If True, print dry-run actions.

    Returns:
        ``(success, reason)`` where reason is one of ``updated``,
        ``would_update``, ``already_has_wcms``, ``read_error: ...`` or
        ``write_error: ...``.
    """
    try:
        with open(person_file, 'r', encoding='utf-8') as f:
            person_data = json.load(f)
    except Exception as e:
        return False, f"read_error: {e}"

    # Idempotence guard: never overwrite an earlier merge.
    if person_data.get('wcms_identifiers'):
        return False, "already_has_wcms"

    # Add WCMS fields.
    person_data['wcms_identifiers'] = wcms_data.get('wcms_identifiers', {})
    person_data['wcms_activity'] = wcms_data.get('wcms_activity', {})

    # Add/merge contact details; existing non-empty values are preserved.
    wcms_contact = wcms_data.get('contact_details', {})
    if 'contact_details' not in person_data:
        person_data['contact_details'] = wcms_contact
    else:
        for key, value in wcms_contact.items():
            if value and not person_data['contact_details'].get(key):
                person_data['contact_details'][key] = value

    # Record WCMS as a data source.
    if 'data_sources' not in person_data:
        person_data['data_sources'] = []
    if 'wcms' not in person_data['data_sources']:
        person_data['data_sources'].append('wcms')

    # Provenance: append to extraction notes when present, otherwise attach
    # a dedicated merge_metadata stanza.
    merge_note = f"WCMS data merged on {datetime.now(timezone.utc).isoformat()}"
    if 'extraction_metadata' in person_data:
        notes = person_data['extraction_metadata'].get('notes', '') or ''
        person_data['extraction_metadata']['notes'] = f"{notes} {merge_note}".strip()
    else:
        person_data['merge_metadata'] = {
            'wcms_merged_at': datetime.now(timezone.utc).isoformat(),
            'wcms_source_file': wcms_data.get('_source_file')
        }

    if dry_run:
        if verbose:
            print(f" [DRY RUN] Would update: {person_file.name}")
        return True, "would_update"

    try:
        with open(person_file, 'w', encoding='utf-8') as f:
            json.dump(person_data, f, indent=2, ensure_ascii=False)
        return True, "updated"
    except Exception as e:
        return False, f"write_error: {e}"


def main():
    """CLI entry point: index, match, and merge WCMS data into person files."""
    parser = argparse.ArgumentParser(description='Merge WCMS data into LinkedIn person entity files')
    parser.add_argument('--wcms-dir', type=Path, default=Path('data/person'),
                        help='Path to WCMS person files directory (default: data/person)')
    parser.add_argument('--candidates-file', type=Path,
                        default=Path('data/entity_resolution/entity_resolution_candidates.json'),
                        help='Path to entity resolution candidates file')
    parser.add_argument('--person-dir', type=Path, default=Path('data/custodian/person/entity'),
                        help='Path to LinkedIn person entity files directory')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview changes without writing files')
    parser.add_argument('--confirmed-only', action='store_true',
                        help='Only process confirmed matches (review_decision=match)')
    parser.add_argument('--min-confidence', type=float, default=0.65,
                        help='Minimum confidence score for unconfirmed matches (default: 0.65)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of candidates to process (for testing)')
    args = parser.parse_args()

    print("=" * 70)
    print("WCMS → LinkedIn Person Entity Merge")
    print("=" * 70)
    if args.dry_run:
        print("MODE: DRY RUN - no files will be modified")
    if args.confirmed_only:
        print("MODE: Confirmed matches only")
    else:
        print(f"MODE: Confirmed matches + high-confidence (>= {args.min_confidence})")
    print()

    # Step 1: Build WCMS email index.
    print("[1/4] Building WCMS email index...")
    wcms_index = build_wcms_email_index(args.wcms_dir, verbose=args.verbose)
    print()

    # Step 2: Load entity resolution candidates.
    print("[2/4] Loading entity resolution candidates...")
    candidates = load_entity_candidates(
        args.candidates_file,
        confirmed_only=args.confirmed_only,
        min_confidence=args.min_confidence
    )
    print()

    # Step 3: Build slug-to-files index.
    print("[3/4] Building LinkedIn slug-to-files index...")
    slug_index = build_slug_to_files_index(args.person_dir)
    print()

    # Step 4: Match and update.
    print("[4/4] Matching and updating person files...")
    stats = {
        'processed': 0,
        'matched': 0,
        'updated': 0,
        'already_has_wcms': 0,
        'no_person_file': 0,
        'no_wcms_data': 0,
        'errors': 0,
    }

    items = list(candidates.items())
    if args.limit:
        items = items[:args.limit]

    for email, candidate in items:
        stats['processed'] += 1
        linkedin_slug = candidate.get('linkedin_slug')
        if not linkedin_slug:
            continue

        # Look up WCMS data by email.
        wcms_data = wcms_index.get(email)
        if not wcms_data:
            stats['no_wcms_data'] += 1
            if args.verbose:
                print(f" No WCMS data for email: {email}")
            continue

        # Find person entity file.
        person_file = find_person_file(args.person_dir, slug_index, linkedin_slug)
        if not person_file:
            stats['no_person_file'] += 1
            if args.verbose:
                print(f" No person file for slug: {linkedin_slug}")
            continue

        stats['matched'] += 1

        # Update person file.
        success, reason = update_person_file(
            person_file, wcms_data,
            dry_run=args.dry_run,
            verbose=args.verbose
        )
        if success and reason in ('updated', 'would_update'):
            stats['updated'] += 1
            if args.verbose and not args.dry_run:
                print(f" Updated: {person_file.name}")
        elif reason == 'already_has_wcms':
            stats['already_has_wcms'] += 1
        else:
            stats['errors'] += 1
            if args.verbose:
                print(f" Error: {person_file.name} - {reason}")

    # Report results.
    print()
    print("=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f" Candidates processed: {stats['processed']:,}")
    print(f" WCMS↔LinkedIn matches: {stats['matched']:,}")
    print(f" Files updated: {stats['updated']:,}")
    print(f" Already had WCMS: {stats['already_has_wcms']:,}")
    print(f" No LinkedIn file found: {stats['no_person_file']:,}")
    print(f" No WCMS data found: {stats['no_wcms_data']:,}")
    print(f" Errors: {stats['errors']:,}")

    if args.dry_run:
        print()
        print("DRY RUN complete - no files were modified.")
        print("Run without --dry-run to apply changes.")
    else:
        print()
        print("Merge complete!")
        if stats['updated'] > 0:
            print(f"Next step: Deploy updated files to server:")
            print(f" rsync -avz --progress {args.person_dir}/ root@91.98.224.44:/mnt/data/custodian/person/entity/")


if __name__ == '__main__':
    main()