glam/scripts/build_entity_resolution.py
2026-01-12 14:33:56 +01:00

472 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Build entity resolution candidates between WCMS and LinkedIn profiles.
This script:
1. Indexes all profiles by normalized name
2. Finds potential matches based on multiple signals
3. Scores matches but NEVER auto-merges
4. Outputs candidates for manual review
CRITICAL: No auto-merging! Entity resolution requires human verification.
Matching signals:
- Name similarity (primary)
- Email domain matches employer
- Overlapping affiliations
- Location overlap
Usage:
python scripts/build_entity_resolution.py --limit 10000
python scripts/build_entity_resolution.py --output candidates.json
"""
import json
import argparse
import re
from pathlib import Path
from datetime import datetime, timezone
import unicodedata
from typing import Dict, List, Optional, Set, Tuple
from collections import defaultdict
from dataclasses import dataclass, asdict
PERSON_DIR = Path('/Users/kempersc/apps/glam/data/person')
OUTPUT_DIR = Path('/Users/kempersc/apps/glam/data/entity_resolution')
def normalize_name(name) -> str:
    """Normalize a name for comparison: drop titles, accents, and punctuation."""
    if not name:
        return ""
    # Upstream records sometimes carry the name in a dict payload.
    if isinstance(name, dict):
        name = name.get('full_name', name.get('name', str(name)))
    if not isinstance(name, str):
        name = str(name)
    # Strip academic and courtesy titles (with optional trailing dot).
    without_titles = re.sub(
        r'\b(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?',
        '', name, flags=re.IGNORECASE)
    # Fold accented characters to plain ASCII: NFKD-decompose, drop combining marks.
    decomposed = unicodedata.normalize('NFKD', without_titles)
    folded = ''.join(ch for ch in decomposed if not unicodedata.combining(ch))
    # Keep only lowercase letters and whitespace, then collapse whitespace runs.
    letters_only = re.sub(r'[^a-z\s]', '', folded.lower())
    return ' '.join(letters_only.split())
def extract_name_parts(name: str) -> Tuple[str, str]:
    """Return the (first, last) tokens of a whitespace-split name.

    A single-token name yields (token, ""); an empty name yields ("", "").
    Middle tokens are ignored.
    """
    tokens = name.split()
    if not tokens:
        return "", ""
    if len(tokens) == 1:
        return tokens[0], ""
    return tokens[0], tokens[-1]
def _url_to_domain(url: str) -> str:
    """Reduce a URL to its bare host: drop scheme, a leading 'www.', and any path."""
    host = re.sub(r'^https?://(www\.)?', '', url.lower())
    return host.split('/')[0]


def extract_employer_domains(profile: dict) -> Set[str]:
    """Extract employer website domains from a person profile.

    Scans ``affiliations[*].organization.website`` and
    ``profile_data.experience[*].company_url``. Malformed entries
    (non-dict items, non-string URLs, string-valued organizations)
    are silently skipped.
    """
    domains: Set[str] = set()
    # From affiliations: organization dicts with a 'website' field.
    for aff in profile.get('affiliations', []) or []:
        if not isinstance(aff, dict):
            continue
        org = aff.get('organization', {})
        if not isinstance(org, dict):
            # String-valued organizations carry no website information.
            continue
        website = org.get('website', '') or ''
        if website and isinstance(website, str):
            domain = _url_to_domain(website)
            if domain:
                domains.add(domain)
    # From scraped profile_data: experience entries with a company URL.
    profile_data = profile.get('profile_data', {}) or {}
    if not isinstance(profile_data, dict):
        return domains
    for exp in profile_data.get('experience', []) or []:
        if not isinstance(exp, dict):
            continue
        company_url = exp.get('company_url', '') or ''
        if company_url and isinstance(company_url, str):
            domain = _url_to_domain(company_url)
            if domain:
                domains.add(domain)
    return domains
def extract_employer_names(profile: dict) -> Set[str]:
    """Collect normalized employer names from affiliations and experience."""
    employers: Set[str] = set()
    # From affiliations: organization may be a plain string or a dict.
    for aff in profile.get('affiliations', []) or []:
        if not isinstance(aff, dict):
            continue
        org = aff.get('organization', {})
        if isinstance(org, str):
            # Organization given directly as a name string.
            cleaned = normalize_name(org)
            if cleaned:
                employers.add(cleaned)
        elif isinstance(org, dict):
            org_name = org.get('name', '')
            if org_name and isinstance(org_name, str):
                cleaned = normalize_name(org_name)
                if cleaned:
                    employers.add(cleaned)
    # From scraped profile_data: company names in experience entries.
    profile_data = profile.get('profile_data', {}) or {}
    if not isinstance(profile_data, dict):
        return employers
    for exp in profile_data.get('experience', []) or []:
        if not isinstance(exp, dict):
            continue
        company = exp.get('company', '')
        if company and isinstance(company, str):
            cleaned = normalize_name(company)
            if cleaned:
                employers.add(cleaned)
    return employers
@dataclass
class MatchCandidate:
    """A potential entity resolution match.

    Pairs one WCMS profile with one LinkedIn profile, together with the
    signals and composite score that motivated the pairing. Candidates are
    suggestions only: requires_review defaults to True and nothing in this
    script merges records automatically.
    """
    # WCMS side of the pair.
    wcms_ppid: str
    wcms_name: str
    wcms_email: Optional[str]
    wcms_email_domain: Optional[str]
    # LinkedIn side of the pair.
    linkedin_ppid: str
    linkedin_name: str
    linkedin_slug: Optional[str]
    # Scoring signals
    name_match_score: float  # 0-1, how similar are names
    email_domain_matches_employer: bool
    employer_name_overlap: List[str]  # up to 5 normalized employer names, for reviewer context
    # Overall score
    confidence_score: float
    match_signals: List[str]  # human-readable labels, e.g. "exact_name_match"
    # Review status
    requires_review: bool = True
    reviewed: bool = False
    review_decision: Optional[str] = None  # "match", "not_match", "uncertain"
def calculate_name_similarity(name1: str, name2: str) -> float:
    """Score how similar two names are, from 0.0 (unrelated) to 1.0 (identical)."""
    norm_a = normalize_name(name1)
    norm_b = normalize_name(name2)
    if not norm_a or not norm_b:
        return 0.0
    if norm_a == norm_b:
        return 1.0
    first_a, last_a = extract_name_parts(norm_a)
    first_b, last_b = extract_name_parts(norm_b)
    # Same first and last token (normalized forms differ only in middle names).
    if (first_a, last_a) == (first_b, last_b):
        return 0.95
    # Same surname; compare given names (initial match checked first).
    if last_a == last_b:
        if first_a and first_b and first_a[0] == first_b[0]:
            return 0.85
        if first_a == first_b:
            return 0.9
    # Same given name with one surname containing the other (e.g. compound names).
    if first_a == first_b and last_a and last_b:
        if last_a in last_b or last_b in last_a:
            return 0.8
    # Fall back to token overlap: Jaccard similarity scaled into [0.5, 0.9].
    tokens_a = set(norm_a.split())
    tokens_b = set(norm_b.split())
    shared = tokens_a & tokens_b
    if shared:
        jaccard = len(shared) / len(tokens_a | tokens_b)
        return 0.5 + (jaccard * 0.4)
    return 0.0
def build_name_index(profiles: List[dict]) -> Dict[str, List[dict]]:
    """Index profiles by normalized full name and, secondarily, by last name."""
    index: Dict[str, List[dict]] = defaultdict(list)
    for profile in profiles:
        # Profiles without a ppid cannot be referenced later; skip them.
        if not profile.get('ppid'):
            continue
        normalized = normalize_name(profile.get('name', ''))
        if not normalized:
            continue
        # Primary key: the full normalized name.
        index[normalized].append(profile)
        # Secondary key: last name, so partial matches can still be found.
        _, last = extract_name_parts(normalized)
        if last:
            index[f"_last_{last}"].append(profile)
    return index
def find_candidates(wcms_profile: dict, linkedin_index: Dict[str, List[dict]]) -> List[MatchCandidate]:
    """Find potential LinkedIn matches for a WCMS profile.

    Looks up candidates by exact normalized name and by last name, re-reads
    each candidate's full profile from disk, scores it, and returns at most
    five MatchCandidate objects sorted by descending confidence. Candidates
    are never merged; all are flagged requires_review=True.
    """
    candidates: List[MatchCandidate] = []
    wcms_name = wcms_profile.get('name', '')
    wcms_normalized = normalize_name(wcms_name)
    # Guard against contact_details being explicitly null in the record.
    contact = wcms_profile.get('contact_details') or {}
    wcms_email = contact.get('email')
    wcms_domain = contact.get('email_domain')
    if not wcms_normalized:
        return []
    # Gather candidate ppids via exact-name and last-name index keys.
    potential_matches: Set[str] = set()
    for p in linkedin_index.get(wcms_normalized, []):
        ppid = p.get('ppid')
        if ppid:
            potential_matches.add(ppid)
    _, last = extract_name_parts(wcms_normalized)
    if last:
        for p in linkedin_index.get(f"_last_{last}", []):
            ppid = p.get('ppid')
            if ppid:
                potential_matches.add(ppid)
    # Score each potential match.
    for linkedin_ppid in potential_matches:
        # Re-read the full profile from disk for complete employer data.
        profile_path = PERSON_DIR / f"{linkedin_ppid}.json"
        if not profile_path.exists():
            continue
        try:
            with open(profile_path) as f:
                linkedin_profile = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Unreadable or corrupt profile file: skip rather than abort.
            continue
        # Skip profiles that are themselves WCMS-sourced (not LinkedIn).
        if 'wcms' in linkedin_profile.get('data_sources', []):
            continue
        linkedin_name = linkedin_profile.get('name', '')
        name_score = calculate_name_similarity(wcms_name, linkedin_name)
        if name_score < 0.5:
            continue  # Too low to consider
        # Signal: WCMS email domain appears among LinkedIn employer domains.
        # bool() because wcms_domain may be None/'' and the field is typed bool.
        employer_domains = extract_employer_domains(linkedin_profile)
        domain_matches = bool(wcms_domain and wcms_domain in employer_domains)
        # Employer names recorded for reviewer context.
        employer_names = extract_employer_names(linkedin_profile)
        # Could enhance with WCMS organization data if available
        signals: List[str] = []
        if name_score >= 0.95:
            signals.append("exact_name_match")
        elif name_score >= 0.85:
            signals.append("strong_name_match")
        elif name_score >= 0.7:
            signals.append("partial_name_match")
        if domain_matches:
            signals.append("email_domain_matches_employer")
        # Confidence: name similarity weighted 60%; domain match adds 0.3.
        confidence = name_score * 0.6
        if domain_matches:
            confidence += 0.3
        candidates.append(MatchCandidate(
            wcms_ppid=wcms_profile['ppid'],
            wcms_name=wcms_name,
            wcms_email=wcms_email,
            wcms_email_domain=wcms_domain,
            linkedin_ppid=linkedin_ppid,
            linkedin_name=linkedin_name,
            linkedin_slug=linkedin_profile.get('linkedin_slug'),
            name_match_score=name_score,
            email_domain_matches_employer=domain_matches,
            employer_name_overlap=list(employer_names)[:5],
            confidence_score=confidence,
            match_signals=signals,
            requires_review=True,
        ))
    # Highest-confidence first; cap at five candidates per WCMS profile.
    candidates.sort(key=lambda c: c.confidence_score, reverse=True)
    return candidates[:5]
def main():
    """CLI entry point: load profiles, index LinkedIn names, emit candidates.

    Runs in four phases (load, index, match, save) and prints a summary plus
    a confidence distribution. Output is a JSON document of MatchCandidate
    records; nothing is ever merged automatically.
    """
    parser = argparse.ArgumentParser(description='Build entity resolution candidates')
    parser.add_argument('--limit', type=int, default=None, help='Limit profiles to process')
    parser.add_argument('--output', type=str, default='entity_resolution_candidates.json', help='Output file name')
    parser.add_argument('--min-confidence', type=float, default=0.5, help='Minimum confidence threshold')
    args = parser.parse_args()

    print("=" * 70)
    print("ENTITY RESOLUTION CANDIDATE BUILDER")
    print("=" * 70)
    print(" CRITICAL: No auto-merging! All candidates require manual review.")

    # parents=True so a missing data/ tree does not abort the run.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Phase 1: Load all profiles and split them by data source.
    print("\nPhase 1: Loading profiles...")
    wcms_profiles = []
    linkedin_profiles = []
    count = 0
    for f in PERSON_DIR.glob('ID_*.json'):
        count += 1
        if count % 20000 == 0:
            print(f" Loaded {count:,} profiles...")
        try:
            with open(f) as fp:
                data = json.load(fp)
        except (OSError, json.JSONDecodeError):
            # Best-effort bulk load: skip unreadable or corrupt files.
            continue
        sources = data.get('data_sources', [])
        if 'wcms' in sources:
            wcms_profiles.append(data)
        elif 'linkedin' in sources or data.get('linkedin_slug'):
            linkedin_profiles.append(data)
    print(f" Loaded {len(wcms_profiles):,} WCMS profiles")
    print(f" Loaded {len(linkedin_profiles):,} LinkedIn profiles")
    if args.limit:
        wcms_profiles = wcms_profiles[:args.limit]
        print(f" Limited WCMS profiles to {args.limit}")

    # Phase 2: Build LinkedIn index keyed by normalized name / last name.
    print("\nPhase 2: Building LinkedIn name index...")
    linkedin_index = build_name_index(linkedin_profiles)
    print(f" Index size: {len(linkedin_index):,} name keys")

    # Phase 3: Find and score match candidates per WCMS profile.
    print("\nPhase 3: Finding match candidates...")
    all_candidates = []
    profiles_with_matches = 0
    for i, wcms in enumerate(wcms_profiles):
        candidates = find_candidates(wcms, linkedin_index)
        # Drop candidates below the CLI confidence threshold.
        candidates = [c for c in candidates if c.confidence_score >= args.min_confidence]
        if candidates:
            profiles_with_matches += 1
            all_candidates.extend(candidates)
        if (i + 1) % 10000 == 0:
            print(f" Processed {i+1:,}/{len(wcms_profiles):,} - "
                  f"Found {len(all_candidates):,} candidates from {profiles_with_matches:,} profiles")

    # Phase 4: Serialize candidates plus run metadata to JSON.
    print("\nPhase 4: Saving results...")
    output_path = OUTPUT_DIR / args.output
    results = {
        "metadata": {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "wcms_profiles_processed": len(wcms_profiles),
            "linkedin_profiles_indexed": len(linkedin_profiles),
            "profiles_with_matches": profiles_with_matches,
            "total_candidates": len(all_candidates),
            "min_confidence_threshold": args.min_confidence,
            "requires_manual_review": True
        },
        "candidates": [asdict(c) for c in all_candidates]
    }
    # Explicit UTF-8: ensure_ascii=False emits non-ASCII characters directly,
    # which would break under a non-UTF-8 platform default encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Summary
    print("\n" + "=" * 70)
    print("ENTITY RESOLUTION SUMMARY")
    print("=" * 70)
    print(f" WCMS profiles processed: {len(wcms_profiles):,}")
    print(f" LinkedIn profiles indexed: {len(linkedin_profiles):,}")
    print(f" Profiles with potential matches: {profiles_with_matches:,}")
    print(f" Total match candidates: {len(all_candidates):,}")
    print(f" Output saved to: {output_path}")

    # Confidence distribution (buckets overlap at the 0.6/0.8 boundaries).
    if all_candidates:
        high = sum(1 for c in all_candidates if c.confidence_score >= 0.8)
        medium = sum(1 for c in all_candidates if 0.6 <= c.confidence_score < 0.8)
        low = sum(1 for c in all_candidates if c.confidence_score < 0.6)
        print("\n Confidence distribution:")
        print(f" High (>=0.8): {high:,}")
        print(f" Medium (0.6-0.8): {medium:,}")
        print(f" Low (<0.6): {low:,}")

    # Show the top five candidates by confidence for a quick sanity check.
    if all_candidates:
        print(f"\n Sample high-confidence candidates:")
        for c in sorted(all_candidates, key=lambda x: x.confidence_score, reverse=True)[:5]:
            print(f" {c.wcms_name} <-> {c.linkedin_name}")
            print(f" Score: {c.confidence_score:.2f}, Signals: {', '.join(c.match_signals)}")


if __name__ == '__main__':
    main()