#!/usr/bin/env python3
"""
Build entity resolution candidates between WCMS and LinkedIn profiles.

This script:
1. Indexes all profiles by normalized name
2. Finds potential matches based on multiple signals
3. Scores matches but NEVER auto-merges
4. Outputs candidates for manual review

CRITICAL: No auto-merging! Entity resolution requires human verification.

Matching signals:
- Name similarity (primary)
- Email domain matches employer
- Overlapping affiliations
- Location overlap

Usage:
    python scripts/build_entity_resolution.py --limit 10000
    python scripts/build_entity_resolution.py --output candidates.json
"""
|
|
|
|
import json
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
import unicodedata
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, asdict, field
|
|
|
|
# Add project root to path for imports
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
|
|
|
|
# Optional dependency: richer email parsing lives in the project package.
# When it is missing the script still runs, just without email-derived
# signals (EMAIL_SEMANTICS_AVAILABLE gates every use below).
try:
    from glam_extractor.entity_resolution.email_semantics import (
        parse_email_semantics,
        email_matches_name,
        email_domain_matches_employer,
        EmailSemantics
    )
    EMAIL_SEMANTICS_AVAILABLE = True
except ImportError:
    EMAIL_SEMANTICS_AVAILABLE = False
    print("Warning: email_semantics module not available, using basic email analysis")

# Input: one JSON file per person profile (ID_*.json).
PERSON_DIR = Path('/Users/kempersc/apps/glam/data/person')
# Output: candidate lists awaiting manual review.
OUTPUT_DIR = Path('/Users/kempersc/apps/glam/data/entity_resolution')
|
|
|
|
|
|
def normalize_name(name) -> str:
    """Normalize a person/organization name for fuzzy comparison.

    Strips honorifics and academic titles, folds accented characters to
    ASCII, lowercases, removes punctuation/digits, and collapses
    whitespace.  Accepts dicts (reads 'full_name' or 'name') and any
    other object (stringified).  Returns "" for falsy input.
    """
    if not name:
        return ""

    # Coerce non-string inputs into a plain string first.
    if isinstance(name, dict):
        name = name.get('full_name', name.get('name', str(name)))
    if not isinstance(name, str):
        name = str(name)

    # Drop courtesy / academic titles anywhere in the string.
    without_titles = re.sub(
        r'\b(Dr|Prof|Mr|Mrs|Ms|Drs|Ir|Ing|PhD|MA|MSc|MBA|BSc|Jr|Sr)\b\.?',
        '', name, flags=re.IGNORECASE)

    # Fold accents: decompose to NFKD, then discard combining marks.
    decomposed = unicodedata.normalize('NFKD', without_titles)
    ascii_only = ''.join(ch for ch in decomposed if not unicodedata.combining(ch))

    # Keep only lowercase letters and spaces, then squeeze whitespace.
    letters_only = re.sub(r'[^a-z\s]', '', ascii_only.lower())
    return ' '.join(letters_only.split())
|
|
|
|
|
|
def extract_name_parts(name: str) -> Tuple[str, str]:
    """Split a (normalized) name into (first, last) tokens.

    Middle tokens are ignored.  A single-token name yields ("token", "");
    an empty name yields ("", "").
    """
    tokens = name.split()
    if not tokens:
        return "", ""
    if len(tokens) == 1:
        return tokens[0], ""
    return tokens[0], tokens[-1]
|
|
|
|
|
|
def extract_employer_domains(profile: dict) -> Set[str]:
|
|
"""Extract domains from employer information in profile."""
|
|
domains = set()
|
|
|
|
# From affiliations
|
|
for aff in profile.get('affiliations', []) or []:
|
|
if not isinstance(aff, dict):
|
|
continue
|
|
org = aff.get('organization', {})
|
|
if isinstance(org, str):
|
|
continue # Skip if org is just a string
|
|
if not isinstance(org, dict):
|
|
continue
|
|
website = org.get('website', '') or ''
|
|
if website and isinstance(website, str):
|
|
# Extract domain from URL
|
|
domain = re.sub(r'^https?://(www\.)?', '', website.lower())
|
|
domain = domain.split('/')[0]
|
|
if domain:
|
|
domains.add(domain)
|
|
|
|
# From profile_data
|
|
profile_data = profile.get('profile_data', {}) or {}
|
|
if not isinstance(profile_data, dict):
|
|
return domains
|
|
|
|
for exp in profile_data.get('experience', []) or []:
|
|
if not isinstance(exp, dict):
|
|
continue
|
|
company_url = exp.get('company_url', '') or ''
|
|
if company_url and isinstance(company_url, str):
|
|
domain = re.sub(r'^https?://(www\.)?', '', company_url.lower())
|
|
domain = domain.split('/')[0]
|
|
if domain:
|
|
domains.add(domain)
|
|
|
|
return domains
|
|
|
|
|
|
def extract_employer_names(profile: dict) -> Set[str]:
    """Collect normalized employer names from a profile.

    Looks at affiliation organizations (either a plain string name or a
    dict with a 'name' key) and at 'company' fields of LinkedIn-style
    experience entries under profile_data.  Malformed entries are
    silently skipped.  Returns a (possibly empty) set of normalized names.
    """
    found: Set[str] = set()

    # Affiliations: organization may be a plain string or a dict.
    for affiliation in profile.get('affiliations', []) or []:
        if not isinstance(affiliation, dict):
            continue
        org = affiliation.get('organization', {})
        if isinstance(org, str):
            raw_name = org
        elif isinstance(org, dict):
            raw_name = org.get('name', '')
        else:
            raw_name = None
        if raw_name and isinstance(raw_name, str):
            cleaned = normalize_name(raw_name)
            if cleaned:
                found.add(cleaned)

    # Experience entries under profile_data.
    profile_data = profile.get('profile_data', {}) or {}
    if not isinstance(profile_data, dict):
        return found

    for entry in profile_data.get('experience', []) or []:
        if not isinstance(entry, dict):
            continue
        company = entry.get('company', '')
        if company and isinstance(company, str):
            cleaned = normalize_name(company)
            if cleaned:
                found.add(cleaned)

    return found
|
|
|
|
|
|
@dataclass
class MatchCandidate:
    """A potential WCMS <-> LinkedIn entity resolution match.

    Holds identifying info for both sides, the individual matching
    signals, an aggregate confidence score, and review-workflow state.
    Instances are serialized to JSON via dataclasses.asdict() and are
    NEVER auto-merged — requires_review defaults to True.
    """
    # WCMS side of the pair
    wcms_ppid: str
    wcms_name: str
    wcms_email: Optional[str]
    wcms_email_domain: Optional[str]
    # LinkedIn side of the pair
    linkedin_ppid: str
    linkedin_name: str
    linkedin_slug: Optional[str]

    # Scoring signals
    name_match_score: float  # 0-1, output of calculate_name_similarity()
    email_domain_matches_employer: bool  # WCMS email domain appears among employer websites
    employer_name_overlap: List[str]  # up to 5 normalized employer names from the LinkedIn profile

    # Email semantic signals (populated only when the optional
    # email_semantics module is importable; defaults otherwise)
    email_birth_year: Optional[int] = None  # birth year hinted by the email local part
    email_birth_year_confidence: float = 0.0  # 0-1 confidence in that hint
    email_name_components: List[str] = field(default_factory=list)  # name tokens found in the email
    email_name_matches_profile: bool = False  # email-derived name matches the LinkedIn name
    email_institution_name: Optional[str] = None  # institution inferred from the email domain
    email_institution_type: Optional[str] = None  # e.g. university/company — as reported by the parser
    email_is_institutional: bool = False  # domain recognized as institutional (not a free provider)

    # Overall score: weighted combination computed in find_candidates()
    confidence_score: float = 0.0
    match_signals: List[str] = field(default_factory=list)  # human-readable reasons for the match

    # Review status — a human must set reviewed/review_decision
    requires_review: bool = True
    reviewed: bool = False
    review_decision: Optional[str] = None  # "match", "not_match", "uncertain"
|
|
|
|
|
|
def calculate_name_similarity(name1: str, name2: str) -> float:
    """Calculate name similarity score (0-1).

    Tiered heuristic over normalized names (titles stripped, accents
    folded, lowercased).  Returned tiers, checked in order:
      1.00  identical normalized strings
      0.95  first and last tokens both match (middle tokens may differ)
      0.85  same last name plus a strong first-name signal: a true
            single-letter initial, a nickname/prefix, or a probable typo
      0.80  same first name, one last name contains the other
      0.55  same last name only (first names clearly differ)
      0.5-0.9  fallback: Jaccard overlap of all name tokens
      0.00  empty input or no shared tokens
    """
    n1 = normalize_name(name1)
    n2 = normalize_name(name2)

    if not n1 or not n2:
        return 0.0

    # Exact match
    if n1 == n2:
        return 1.0

    # Compare on (first, last) token pairs.
    first1, last1 = extract_name_parts(n1)
    first2, last2 = extract_name_parts(n2)

    # Both first and last match (middle names/initials differ)
    if first1 == first2 and last1 == last2:
        return 0.95

    # Last name matches, check first name similarity
    if last1 == last2:
        if first1 == first2:
            # NOTE: defensive only — this condition is already fully
            # covered by the 0.95 branch above, so it cannot be reached.
            return 0.9
        if first1 and first2:
            # Determine if either name is truly an initial (1 char only).
            # 2-char names like "ad", "jo" are nicknames, not initials.
            is_initial1 = len(first1) == 1
            is_initial2 = len(first2) == 1

            if (is_initial1 or is_initial2) and first1[0] == first2[0]:
                # True initial match: "A. Peeters" matching "Andrea Peeters"
                return 0.85

            # For actual names (2+ chars), check similarity more carefully
            if len(first1) >= 2 and len(first2) >= 2:
                shorter, longer = (first1, first2) if len(first1) <= len(first2) else (first2, first1)

                # Check if shorter is a prefix of longer (nickname pattern)
                # e.g., "jo" -> "johan", "wim" -> "willem"
                if longer.startswith(shorter):
                    # Require a meaningful prefix: at least 2 chars AND
                    # at least 35% of the longer name's length.
                    prefix_ratio = len(shorter) / len(longer)
                    if len(shorter) >= 2 and prefix_ratio >= 0.35:
                        return 0.85

                # Check for positional similarity (typos, variants)
                min_len = min(len(first1), len(first2))
                max_len = max(len(first1), len(first2))

                # Count matching positions
                pos_matches = sum(1 for i in range(min_len) if first1[i] == first2[i])
                pos_similarity = pos_matches / max_len if max_len > 0 else 0

                # High positional similarity = likely same name with typo
                # e.g., "Ilona" vs "Ilone" (4/5 = 0.8)
                if pos_similarity >= 0.6 and abs(len(first1) - len(first2)) <= 2:
                    return 0.85

        # Same last name only - return lower score
        # This catches "ad" vs "andrea", "jan" vs "johan", "bert" vs "robert"
        return 0.55  # Just last name match, first names are different

    # First name matches, last name similar
    if first1 == first2 and last1 and last2:
        # Check if one is substring of other
        if last1 in last2 or last2 in last1:
            return 0.8

    # Token overlap fallback: maps Jaccard 0..1 onto score 0.5..0.9.
    tokens1 = set(n1.split())
    tokens2 = set(n2.split())
    overlap = tokens1 & tokens2
    if overlap:
        jaccard = len(overlap) / len(tokens1 | tokens2)
        return 0.5 + (jaccard * 0.4)

    return 0.0
|
|
|
|
|
|
def build_name_index(profiles: List[dict]) -> Dict[str, List[dict]]:
    """Index profiles for candidate lookup.

    Each profile with a ppid and a normalizable name is indexed twice:
    under its full normalized name, and under a "_last_<lastname>" key
    (the underscore prefix keeps last-name keys from colliding with
    single-token full names).
    """
    index: Dict[str, List[dict]] = defaultdict(list)

    for profile in profiles:
        # Profiles without a stable id are unusable downstream.
        if not profile.get('ppid'):
            continue

        normalized = normalize_name(profile.get('name', ''))
        if not normalized:
            continue

        # Full-name bucket.
        index[normalized].append(profile)

        # Last-name bucket for partial matching.
        _, last_name = extract_name_parts(normalized)
        if last_name:
            index[f"_last_{last_name}"].append(profile)

    return index
|
|
|
|
|
|
def find_candidates(wcms_profile: dict, linkedin_index: Dict[str, List[dict]]) -> List[MatchCandidate]:
    """Find and score potential LinkedIn matches for one WCMS profile.

    Candidate ppids are gathered from the name index via exact-name,
    last-name, and (when email semantics are available) email-derived
    last-name lookups.  Each candidate's full profile is loaded from
    PERSON_DIR, scored on multiple signals, and wrapped in a
    MatchCandidate.  Nothing is ever merged — all results require review.

    Returns at most the top 5 candidates, sorted by confidence_score
    descending; [] when the WCMS name cannot be normalized.
    """
    candidates = []

    wcms_name = wcms_profile.get('name', '')
    wcms_normalized = normalize_name(wcms_name)
    wcms_email = wcms_profile.get('contact_details', {}).get('email')
    wcms_domain = wcms_profile.get('contact_details', {}).get('email_domain')

    if not wcms_normalized:
        return []

    # Parse email semantics if the optional module is available.
    email_semantics = None
    if EMAIL_SEMANTICS_AVAILABLE and wcms_email:
        email_semantics = parse_email_semantics(wcms_email)

    # Gather candidate LinkedIn ppids from the name index.
    potential_matches = set()

    # Exact normalized-name match.
    for p in linkedin_index.get(wcms_normalized, []):
        ppid = p.get('ppid')
        if ppid:
            potential_matches.add(ppid)

    # Last-name match.
    _, last = extract_name_parts(wcms_normalized)
    if last:
        for p in linkedin_index.get(f"_last_{last}", []):
            ppid = p.get('ppid')
            if ppid:
                potential_matches.add(ppid)

    # Also try the last name extracted from the email local part, if it
    # differs from the display-name last name.
    if email_semantics and email_semantics.extracted_last_name:
        email_last = email_semantics.extracted_last_name.replace(' ', '')  # drop spaces in prefixes like "van der"
        if email_last and email_last != last:
            for p in linkedin_index.get(f"_last_{email_last}", []):
                ppid = p.get('ppid')
                if ppid:
                    potential_matches.add(ppid)

    # Score each potential match.
    for linkedin_ppid in potential_matches:
        # Load the full profile from disk; skip missing or unreadable files.
        profile_path = PERSON_DIR / f"{linkedin_ppid}.json"
        if not profile_path.exists():
            continue

        try:
            with open(profile_path) as f:
                linkedin_profile = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare `except:` — only skip read/parse
            # failures instead of silencing every possible error.
            continue

        # Skip WCMS profiles that ended up in the LinkedIn index.
        if 'wcms' in linkedin_profile.get('data_sources', []):
            continue

        linkedin_name = linkedin_profile.get('name', '')

        # Primary signal: name similarity.
        name_score = calculate_name_similarity(wcms_name, linkedin_name)
        if name_score < 0.5:
            continue  # too weak to be worth a reviewer's time

        # WCMS email domain vs employer website domains.
        # bool() so the (typed-bool) field never holds None/'' from the
        # short-circuit expression.
        employer_domains = extract_employer_domains(linkedin_profile)
        domain_matches = bool(wcms_domain and wcms_domain in employer_domains)

        employer_names = extract_employer_names(linkedin_profile)

        # Email-semantics signals (defaults when module unavailable).
        email_birth_year = None
        email_birth_year_conf = 0.0
        email_name_components = []
        email_name_matches = False
        email_institution_name = None
        email_institution_type = None
        email_is_institutional = False

        if email_semantics:
            email_birth_year = email_semantics.probable_birth_year
            email_birth_year_conf = email_semantics.birth_year_confidence
            email_name_components = email_semantics.extracted_names
            email_institution_name = email_semantics.institution_name
            email_institution_type = email_semantics.institution_type
            email_is_institutional = email_semantics.is_institutional_domain

            # Does the name embedded in the email match the LinkedIn name?
            if email_semantics.extracted_names:
                name_match_result = email_matches_name(email_semantics, linkedin_name)
                email_name_matches = name_match_result[0]

        # Human-readable reasons this pair was proposed.
        signals = []
        if name_score >= 0.95:
            signals.append("exact_name_match")
        elif name_score >= 0.85:
            signals.append("strong_name_match")
        elif name_score >= 0.7:
            signals.append("partial_name_match")
        elif name_score >= 0.5:
            signals.append("last_name_match_only")  # same last name, different first name

        if domain_matches:
            signals.append("email_domain_matches_employer")

        if email_birth_year and email_birth_year_conf >= 0.6:
            signals.append(f"email_indicates_birth_year_{email_birth_year}")

        if email_name_matches:
            signals.append("email_name_matches_linkedin")

        if email_is_institutional:
            signals.append(f"institutional_email_{email_institution_type or 'unknown'}")

        if email_institution_name and any(
            email_institution_name.lower() in emp.lower() or emp.lower() in email_institution_name.lower()
            for emp in employer_names
        ):
            signals.append("email_institution_matches_employer")

        # Weighted confidence: name 50%, domain 25%, email hints the rest.
        confidence = name_score * 0.5
        if domain_matches:
            confidence += 0.25  # domain match is a strong signal
        if email_name_matches:
            confidence += 0.1  # email name matches profile
        if email_is_institutional and email_institution_name:
            confidence += 0.1  # institutional email adds credibility
        if email_birth_year and email_birth_year_conf >= 0.6:
            confidence += 0.05  # birth year hint adds some value

        candidate = MatchCandidate(
            wcms_ppid=wcms_profile['ppid'],
            wcms_name=wcms_name,
            wcms_email=wcms_email,
            wcms_email_domain=wcms_domain,
            linkedin_ppid=linkedin_ppid,
            linkedin_name=linkedin_name,
            linkedin_slug=linkedin_profile.get('linkedin_slug'),
            name_match_score=name_score,
            email_domain_matches_employer=domain_matches,
            employer_name_overlap=list(employer_names)[:5],
            # Email semantic fields
            email_birth_year=email_birth_year,
            email_birth_year_confidence=email_birth_year_conf,
            email_name_components=email_name_components,
            email_name_matches_profile=email_name_matches,
            email_institution_name=email_institution_name,
            email_institution_type=email_institution_type,
            email_is_institutional=email_is_institutional,
            # Scoring
            confidence_score=confidence,
            match_signals=signals,
            requires_review=True
        )

        candidates.append(candidate)

    # Best candidates first; cap the reviewer's workload per profile.
    candidates.sort(key=lambda c: c.confidence_score, reverse=True)
    return candidates[:5]
|
|
|
|
|
|
def main():
    """CLI entry point.

    Phases: (1) load person JSON files and split them into WCMS vs
    LinkedIn profiles, (2) build the LinkedIn name index, (3) find and
    score match candidates per WCMS profile, (4) write a JSON report to
    OUTPUT_DIR and print a summary.  No merging is performed.
    """
    parser = argparse.ArgumentParser(description='Build entity resolution candidates')
    parser.add_argument('--limit', type=int, default=None, help='Limit profiles to process')
    parser.add_argument('--output', type=str, default='entity_resolution_candidates.json', help='Output file name')
    parser.add_argument('--min-confidence', type=float, default=0.5, help='Minimum confidence threshold')
    args = parser.parse_args()

    print("=" * 70)
    print("ENTITY RESOLUTION CANDIDATE BUILDER")
    print("=" * 70)
    print(" CRITICAL: No auto-merging! All candidates require manual review.")

    # parents=True so a missing intermediate directory doesn't abort the run.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Phase 1: Load all profiles
    print("\nPhase 1: Loading profiles...")

    wcms_profiles = []
    linkedin_profiles = []

    count = 0
    for f in PERSON_DIR.glob('ID_*.json'):
        count += 1
        if count % 20000 == 0:
            print(f" Loaded {count:,} profiles...")

        try:
            with open(f) as fp:
                data = json.load(fp)
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare `except: pass` — skip only unreadable
            # or malformed files instead of silencing every error.
            continue

        # Guard against valid JSON that isn't an object (e.g. a list).
        if not isinstance(data, dict):
            continue

        sources = data.get('data_sources', [])
        if 'wcms' in sources:
            wcms_profiles.append(data)
        elif 'linkedin' in sources or data.get('linkedin_slug'):
            linkedin_profiles.append(data)

    print(f" Loaded {len(wcms_profiles):,} WCMS profiles")
    print(f" Loaded {len(linkedin_profiles):,} LinkedIn profiles")

    if args.limit:
        wcms_profiles = wcms_profiles[:args.limit]
        print(f" Limited WCMS profiles to {args.limit}")

    # Phase 2: Build LinkedIn index
    print("\nPhase 2: Building LinkedIn name index...")
    linkedin_index = build_name_index(linkedin_profiles)
    print(f" Index size: {len(linkedin_index):,} name keys")

    # Phase 3: Find candidates
    print("\nPhase 3: Finding match candidates...")

    all_candidates = []
    profiles_with_matches = 0

    for i, wcms in enumerate(wcms_profiles):
        candidates = find_candidates(wcms, linkedin_index)

        # Filter by confidence threshold.
        candidates = [c for c in candidates if c.confidence_score >= args.min_confidence]

        if candidates:
            profiles_with_matches += 1
            all_candidates.extend(candidates)

        if (i + 1) % 10000 == 0:
            print(f" Processed {i+1:,}/{len(wcms_profiles):,} - "
                  f"Found {len(all_candidates):,} candidates from {profiles_with_matches:,} profiles")

    # Phase 4: Output results
    print("\nPhase 4: Saving results...")

    output_path = OUTPUT_DIR / args.output

    # Convert dataclasses to plain dicts for JSON serialization.
    results = {
        "metadata": {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "wcms_profiles_processed": len(wcms_profiles),
            "linkedin_profiles_indexed": len(linkedin_profiles),
            "profiles_with_matches": profiles_with_matches,
            "total_candidates": len(all_candidates),
            "min_confidence_threshold": args.min_confidence,
            "requires_manual_review": True
        },
        "candidates": [asdict(c) for c in all_candidates]
    }

    with open(output_path, 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Summary
    print("\n" + "=" * 70)
    print("ENTITY RESOLUTION SUMMARY")
    print("=" * 70)
    print(f" WCMS profiles processed: {len(wcms_profiles):,}")
    print(f" LinkedIn profiles indexed: {len(linkedin_profiles):,}")
    print(f" Profiles with potential matches: {profiles_with_matches:,}")
    print(f" Total match candidates: {len(all_candidates):,}")
    print(f" Output saved to: {output_path}")

    # Show confidence distribution
    if all_candidates:
        high = sum(1 for c in all_candidates if c.confidence_score >= 0.8)
        medium = sum(1 for c in all_candidates if 0.6 <= c.confidence_score < 0.8)
        low = sum(1 for c in all_candidates if c.confidence_score < 0.6)

        print(f"\n Confidence distribution:")
        print(f" High (>=0.8): {high:,}")
        print(f" Medium (0.6-0.8): {medium:,}")
        print(f" Low (<0.6): {low:,}")

    # Show sample candidates
    if all_candidates:
        print(f"\n Sample high-confidence candidates:")
        for c in sorted(all_candidates, key=lambda x: x.confidence_score, reverse=True)[:5]:
            print(f" {c.wcms_name} <-> {c.linkedin_name}")
            print(f" Score: {c.confidence_score:.2f}, Signals: {', '.join(c.match_signals)}")


if __name__ == '__main__':
    main()
|