#!/usr/bin/env python3
"""
Build entity resolution candidates between WCMS and LinkedIn profiles.

This script:
1. Indexes all profiles by normalized name
2. Finds potential matches based on multiple signals
3. Scores matches but NEVER auto-merges
4. Outputs candidates for manual review

CRITICAL: No auto-merging! Entity resolution requires human verification.

Matching signals:
- Name similarity (primary)
- Email domain matches employer
- Overlapping affiliations
- Location overlap

Usage:
    python scripts/build_entity_resolution.py --limit 10000
    python scripts/build_entity_resolution.py --output candidates.json
"""

import json
import argparse
import re
from pathlib import Path
from datetime import datetime, timezone
import unicodedata
from typing import Dict, List, Optional, Set, Tuple
from collections import defaultdict
from dataclasses import dataclass, asdict, field

# Add project root to path for imports
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))

try:
    from glam_extractor.entity_resolution.email_semantics import (
        parse_email_semantics,
        email_matches_name,
        email_domain_matches_employer,
        EmailSemantics
    )
    EMAIL_SEMANTICS_AVAILABLE = True
except ImportError:
    # Degrade gracefully: email-derived signals are simply skipped below.
    EMAIL_SEMANTICS_AVAILABLE = False
    print("Warning: email_semantics module not available, using basic email analysis")

PERSON_DIR = Path('/Users/kempersc/apps/glam/data/person')
OUTPUT_DIR = Path('/Users/kempersc/apps/glam/data/entity_resolution')


def normalize_name(name) -> str:
    """Normalize a person/org name for comparison.

    Accepts a string, a dict (uses 'full_name' or 'name' keys), or any
    other value (stringified). Strips common titles, folds accented
    characters to ASCII, lowercases, removes punctuation/digits, and
    collapses whitespace. Returns "" for falsy input.
    """
    if not name:
        return ""
    # Handle dict or other types
    if isinstance(name, dict):
        name = name.get('full_name', name.get('name', str(name)))
    if not isinstance(name, str):
        name = str(name)
    # Remove titles (standalone words only, optional trailing dot)
    name = re.sub(r'\b(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?',
                  '', name, flags=re.IGNORECASE)
    # Normalize unicode: decompose, then drop combining marks (é -> e)
    nfkd = unicodedata.normalize('NFKD', name)
    ascii_name = ''.join(c for c in nfkd if not unicodedata.combining(c))
    # Lowercase, remove punctuation and digits (keep letters and spaces)
    clean = re.sub(r'[^a-z\s]', '', ascii_name.lower())
    # Normalize whitespace
    return ' '.join(clean.split())


def extract_name_parts(name: str) -> Tuple[str, str]:
    """Extract (first, last) name parts from a whitespace-split name.

    A single-token name is treated as first name only; an empty name
    yields ("", ""). Middle tokens are ignored.
    """
    parts = name.split()
    if len(parts) >= 2:
        return parts[0], parts[-1]
    elif len(parts) == 1:
        return parts[0], ""
    return "", ""


def extract_employer_domains(profile: dict) -> Set[str]:
    """Extract website domains from a profile's employer information.

    Sources: affiliations[].organization.website and
    profile_data.experience[].company_url. URLs are lowercased and the
    scheme / leading "www." / path are stripped, leaving the bare domain.
    """
    domains = set()
    # From affiliations
    for aff in profile.get('affiliations', []) or []:
        if not isinstance(aff, dict):
            continue
        org = aff.get('organization', {})
        if isinstance(org, str):
            continue  # Skip if org is just a string (no website to mine)
        if not isinstance(org, dict):
            continue
        website = org.get('website', '') or ''
        if website and isinstance(website, str):
            # Extract domain from URL
            domain = re.sub(r'^https?://(www\.)?', '', website.lower())
            domain = domain.split('/')[0]
            if domain:
                domains.add(domain)
    # From profile_data
    profile_data = profile.get('profile_data', {}) or {}
    if not isinstance(profile_data, dict):
        return domains
    for exp in profile_data.get('experience', []) or []:
        if not isinstance(exp, dict):
            continue
        company_url = exp.get('company_url', '') or ''
        if company_url and isinstance(company_url, str):
            domain = re.sub(r'^https?://(www\.)?', '', company_url.lower())
            domain = domain.split('/')[0]
            if domain:
                domains.add(domain)
    return domains


def extract_employer_names(profile: dict) -> Set[str]:
    """Extract normalized employer names from a profile.

    Sources: affiliations[].organization (either a plain string or a dict
    with a 'name' key) and profile_data.experience[].company. All names
    are run through normalize_name() so they compare consistently.
    """
    names = set()
    # From affiliations
    for aff in profile.get('affiliations', []) or []:
        if not isinstance(aff, dict):
            continue
        org = aff.get('organization', {})
        if isinstance(org, str):
            # org is just a string name
            normalized = normalize_name(org)
            if normalized:
                names.add(normalized)
            continue
        if isinstance(org, dict):
            name = org.get('name', '')
            if name and isinstance(name, str):
                normalized = normalize_name(name)
                if normalized:
                    names.add(normalized)
    # From profile_data experience
    profile_data = profile.get('profile_data', {}) or {}
    if not isinstance(profile_data, dict):
        return names
    for exp in profile_data.get('experience', []) or []:
        if not isinstance(exp, dict):
            continue
        company = exp.get('company', '')
        if company and isinstance(company, str):
            normalized = normalize_name(company)
            if normalized:
                names.add(normalized)
    return names


@dataclass
class MatchCandidate:
    """A potential entity resolution match (always requires human review)."""
    wcms_ppid: str
    wcms_name: str
    wcms_email: Optional[str]
    wcms_email_domain: Optional[str]
    linkedin_ppid: str
    linkedin_name: str
    linkedin_slug: Optional[str]
    # Scoring signals
    name_match_score: float  # 0-1, how similar are names
    email_domain_matches_employer: bool
    employer_name_overlap: List[str]
    # NEW: Email semantic signals
    email_birth_year: Optional[int] = None
    email_birth_year_confidence: float = 0.0
    email_name_components: List[str] = field(default_factory=list)
    email_name_matches_profile: bool = False
    email_institution_name: Optional[str] = None
    email_institution_type: Optional[str] = None
    email_is_institutional: bool = False
    # Overall score
    confidence_score: float = 0.0
    match_signals: List[str] = field(default_factory=list)
    # Review status
    requires_review: bool = True
    reviewed: bool = False
    review_decision: Optional[str] = None  # "match", "not_match", "uncertain"


def calculate_name_similarity(name1: str, name2: str) -> float:
    """Calculate name similarity score (0-1).

    Tiers: 1.0 exact normalized match; 0.95 first+last match; 0.9/0.85
    last-name match with equal/initial-equal first names; 0.8 first-name
    match with substring last names; otherwise a Jaccard-based token
    overlap mapped to 0.5-0.9, or 0.0 with no overlap.
    """
    n1 = normalize_name(name1)
    n2 = normalize_name(name2)
    if not n1 or not n2:
        return 0.0
    # Exact match
    if n1 == n2:
        return 1.0
    # Check first/last name match
    first1, last1 = extract_name_parts(n1)
    first2, last2 = extract_name_parts(n2)
    # Both first and last match
    if first1 == first2 and last1 == last2:
        return 0.95
    # Last name matches, first name is initial or similar
    if last1 == last2:
        if first1 and first2 and first1[0] == first2[0]:
            return 0.85
        if first1 == first2:
            return 0.9
    # First name matches, last name similar
    if first1 == first2 and last1 and last2:
        # Check if one is substring of other
        if last1 in last2 or last2 in last1:
            return 0.8
    # Token overlap
    tokens1 = set(n1.split())
    tokens2 = set(n2.split())
    overlap = tokens1 & tokens2
    if overlap:
        jaccard = len(overlap) / len(tokens1 | tokens2)
        return 0.5 + (jaccard * 0.4)
    return 0.0


def build_name_index(profiles: List[dict]) -> Dict[str, List[dict]]:
    """Build index of profiles by normalized name components.

    Each profile is indexed under its full normalized name and under a
    "_last_<lastname>" key so last-name-only lookups are cheap. Profiles
    without a ppid are skipped entirely.
    """
    index = defaultdict(list)
    for profile in profiles:
        # Skip profiles without ppid
        if not profile.get('ppid'):
            continue
        name = profile.get('name', '')
        normalized = normalize_name(name)
        if normalized:
            # Index by full normalized name
            index[normalized].append(profile)
            # Also index by last name
            _, last = extract_name_parts(normalized)
            if last:
                index[f"_last_{last}"].append(profile)
    return index


def find_candidates(wcms_profile: dict, linkedin_index: Dict[str, List[dict]]) -> List[MatchCandidate]:
    """Find potential LinkedIn matches for a WCMS profile.

    Gathers candidate ppids via exact-name, last-name, and
    email-derived-last-name index lookups, loads each candidate's full
    profile from PERSON_DIR, scores it on name similarity plus email /
    employer signals, and returns the top 5 by confidence. Never merges;
    every returned candidate is flagged requires_review=True.
    """
    candidates = []
    wcms_name = wcms_profile.get('name', '')
    wcms_normalized = normalize_name(wcms_name)
    # contact_details may be explicitly None in the data; guard with `or {}`
    # like the rest of this file does for nested dicts.
    contact_details = wcms_profile.get('contact_details') or {}
    wcms_email = contact_details.get('email')
    wcms_domain = contact_details.get('email_domain')
    # A ppid is required to build a MatchCandidate; bail out early instead
    # of raising KeyError after the expensive scoring work.
    if not wcms_normalized or not wcms_profile.get('ppid'):
        return []
    # Parse email semantics if available
    email_semantics = None
    if EMAIL_SEMANTICS_AVAILABLE and wcms_email:
        email_semantics = parse_email_semantics(wcms_email)
    # Get candidate LinkedIn profiles
    potential_matches = set()
    # Exact name match
    for p in linkedin_index.get(wcms_normalized, []):
        ppid = p.get('ppid')
        if ppid:
            potential_matches.add(ppid)
    # Last name match
    _, last = extract_name_parts(wcms_normalized)
    if last:
        for p in linkedin_index.get(f"_last_{last}", []):
            ppid = p.get('ppid')
            if ppid:
                potential_matches.add(ppid)
    # NEW: Also try matching by email-extracted last name if different
    if email_semantics and email_semantics.extracted_last_name:
        email_last = email_semantics.extracted_last_name.replace(' ', '')  # Remove spaces from prefixes
        if email_last and email_last != last:
            for p in linkedin_index.get(f"_last_{email_last}", []):
                ppid = p.get('ppid')
                if ppid:
                    potential_matches.add(ppid)
    # Score each potential match
    for linkedin_ppid in potential_matches:
        # Get full profile
        profile_path = PERSON_DIR / f"{linkedin_ppid}.json"
        if not profile_path.exists():
            continue
        try:
            with open(profile_path) as f:
                linkedin_profile = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Unreadable/corrupt profile file: skip this candidate only.
            continue
        # Skip if same profile (WCMS profile, not LinkedIn)
        if 'wcms' in linkedin_profile.get('data_sources', []):
            continue
        linkedin_name = linkedin_profile.get('name', '')
        # Calculate signals
        name_score = calculate_name_similarity(wcms_name, linkedin_name)
        if name_score < 0.5:
            continue  # Too low to consider
        # Check email domain vs employer
        employer_domains = extract_employer_domains(linkedin_profile)
        # bool() so the MatchCandidate field (and the JSON output) is a real
        # boolean, never None or ''.
        domain_matches = bool(wcms_domain and wcms_domain in employer_domains)
        # Check employer name overlap
        employer_names = extract_employer_names(linkedin_profile)
        # NEW: Email semantic analysis signals
        email_birth_year = None
        email_birth_year_conf = 0.0
        email_name_components = []
        email_name_matches = False
        email_institution_name = None
        email_institution_type = None
        email_is_institutional = False
        if email_semantics:
            email_birth_year = email_semantics.probable_birth_year
            email_birth_year_conf = email_semantics.birth_year_confidence
            email_name_components = email_semantics.extracted_names
            email_institution_name = email_semantics.institution_name
            email_institution_type = email_semantics.institution_type
            email_is_institutional = email_semantics.is_institutional_domain
            # Check if email name components match LinkedIn name
            if email_semantics.extracted_names:
                name_match_result = email_matches_name(email_semantics, linkedin_name)
                email_name_matches = name_match_result[0]
        # Build match signals
        signals = []
        if name_score >= 0.95:
            signals.append("exact_name_match")
        elif name_score >= 0.85:
            signals.append("strong_name_match")
        elif name_score >= 0.7:
            signals.append("partial_name_match")
        if domain_matches:
            signals.append("email_domain_matches_employer")
        # NEW: Email-based signals
        if email_birth_year and email_birth_year_conf >= 0.6:
            signals.append(f"email_indicates_birth_year_{email_birth_year}")
        if email_name_matches:
            signals.append("email_name_matches_linkedin")
        if email_is_institutional:
            signals.append(f"institutional_email_{email_institution_type or 'unknown'}")
        if email_institution_name and any(
            email_institution_name.lower() in emp.lower() or emp.lower() in email_institution_name.lower()
            for emp in employer_names
        ):
            signals.append("email_institution_matches_employer")
        # Calculate overall confidence (updated scoring)
        confidence = name_score * 0.5  # Name is 50% of score
        if domain_matches:
            confidence += 0.25  # Domain match is strong signal
        if email_name_matches:
            confidence += 0.1  # Email name matches profile
        if email_is_institutional and email_institution_name:
            confidence += 0.1  # Institutional email adds credibility
        if email_birth_year and email_birth_year_conf >= 0.6:
            confidence += 0.05  # Birth year hint adds some value
        # Create candidate
        candidate = MatchCandidate(
            wcms_ppid=wcms_profile['ppid'],
            wcms_name=wcms_name,
            wcms_email=wcms_email,
            wcms_email_domain=wcms_domain,
            linkedin_ppid=linkedin_ppid,
            linkedin_name=linkedin_name,
            linkedin_slug=linkedin_profile.get('linkedin_slug'),
            name_match_score=name_score,
            email_domain_matches_employer=domain_matches,
            employer_name_overlap=list(employer_names)[:5],
            # NEW: Email semantic fields
            email_birth_year=email_birth_year,
            email_birth_year_confidence=email_birth_year_conf,
            email_name_components=email_name_components,
            email_name_matches_profile=email_name_matches,
            email_institution_name=email_institution_name,
            email_institution_type=email_institution_type,
            email_is_institutional=email_is_institutional,
            # Scoring
            confidence_score=confidence,
            match_signals=signals,
            requires_review=True
        )
        candidates.append(candidate)
    # Sort by confidence
    candidates.sort(key=lambda c: c.confidence_score, reverse=True)
    return candidates[:5]  # Return top 5 candidates per WCMS profile


def main():
    """CLI entry point: load profiles, build index, emit review candidates."""
    parser = argparse.ArgumentParser(description='Build entity resolution candidates')
    parser.add_argument('--limit', type=int, default=None, help='Limit profiles to process')
    parser.add_argument('--output', type=str, default='entity_resolution_candidates.json', help='Output file name')
    parser.add_argument('--min-confidence', type=float, default=0.5, help='Minimum confidence threshold')
    args = parser.parse_args()

    print("=" * 70)
    print("ENTITY RESOLUTION CANDIDATE BUILDER")
    print("=" * 70)
    print(" CRITICAL: No auto-merging! All candidates require manual review.")

    # parents=True so a missing parent directory doesn't abort the run.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Phase 1: Load all profiles
    print("\nPhase 1: Loading profiles...")
    wcms_profiles = []
    linkedin_profiles = []
    count = 0
    for f in PERSON_DIR.glob('ID_*.json'):
        count += 1
        if count % 20000 == 0:
            print(f" Loaded {count:,} profiles...")
        try:
            with open(f) as fp:
                data = json.load(fp)
            sources = data.get('data_sources', [])
            if 'wcms' in sources:
                wcms_profiles.append(data)
            elif 'linkedin' in sources or data.get('linkedin_slug'):
                linkedin_profiles.append(data)
        except (OSError, json.JSONDecodeError):
            # Best-effort load: skip unreadable/corrupt files, but don't
            # swallow KeyboardInterrupt/SystemExit like a bare except would.
            continue
    print(f" Loaded {len(wcms_profiles):,} WCMS profiles")
    print(f" Loaded {len(linkedin_profiles):,} LinkedIn profiles")

    if args.limit:
        wcms_profiles = wcms_profiles[:args.limit]
        print(f" Limited WCMS profiles to {args.limit}")

    # Phase 2: Build LinkedIn index
    print("\nPhase 2: Building LinkedIn name index...")
    linkedin_index = build_name_index(linkedin_profiles)
    print(f" Index size: {len(linkedin_index):,} name keys")

    # Phase 3: Find candidates
    print("\nPhase 3: Finding match candidates...")
    all_candidates = []
    profiles_with_matches = 0
    for i, wcms in enumerate(wcms_profiles):
        candidates = find_candidates(wcms, linkedin_index)
        # Filter by confidence
        candidates = [c for c in candidates if c.confidence_score >= args.min_confidence]
        if candidates:
            profiles_with_matches += 1
            all_candidates.extend(candidates)
        if (i + 1) % 10000 == 0:
            print(f" Processed {i+1:,}/{len(wcms_profiles):,} - "
                  f"Found {len(all_candidates):,} candidates from {profiles_with_matches:,} profiles")

    # Phase 4: Output results
    print("\nPhase 4: Saving results...")
    output_path = OUTPUT_DIR / args.output
    # Convert to dict for JSON serialization
    results = {
        "metadata": {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "wcms_profiles_processed": len(wcms_profiles),
            "linkedin_profiles_indexed": len(linkedin_profiles),
            "profiles_with_matches": profiles_with_matches,
            "total_candidates": len(all_candidates),
            "min_confidence_threshold": args.min_confidence,
            "requires_manual_review": True
        },
        "candidates": [asdict(c) for c in all_candidates]
    }
    with open(output_path, 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Summary
    print("\n" + "=" * 70)
    print("ENTITY RESOLUTION SUMMARY")
    print("=" * 70)
    print(f" WCMS profiles processed: {len(wcms_profiles):,}")
    print(f" LinkedIn profiles indexed: {len(linkedin_profiles):,}")
    print(f" Profiles with potential matches: {profiles_with_matches:,}")
    print(f" Total match candidates: {len(all_candidates):,}")
    print(f" Output saved to: {output_path}")

    # Show confidence distribution
    if all_candidates:
        high = sum(1 for c in all_candidates if c.confidence_score >= 0.8)
        medium = sum(1 for c in all_candidates if 0.6 <= c.confidence_score < 0.8)
        low = sum(1 for c in all_candidates if c.confidence_score < 0.6)
        print(f"\n Confidence distribution:")
        print(f" High (>=0.8): {high:,}")
        print(f" Medium (0.6-0.8): {medium:,}")
        print(f" Low (<0.6): {low:,}")

    # Show sample candidates
    if all_candidates:
        print(f"\n Sample high-confidence candidates:")
        for c in sorted(all_candidates, key=lambda x: x.confidence_score, reverse=True)[:5]:
            print(f" {c.wcms_name} <-> {c.linkedin_name}")
            print(f" Score: {c.confidence_score:.2f}, Signals: {', '.join(c.match_signals)}")


if __name__ == '__main__':
    main()