All checks were successful
Deploy Frontend / build-and-deploy (push) Successful in 2m23s
DSPy RAG Evaluation / Layer 1 - Unit Tests (push) Successful in 5m37s
DSPy RAG Evaluation / Layer 2 - DSPy Module Tests (push) Successful in 7m24s
DSPy RAG Evaluation / Layer 3 - Integration Tests (push) Successful in 5m47s
DSPy RAG Evaluation / Layer 4 - Comprehensive Evaluation (push) Successful in 6m52s
DSPy RAG Evaluation / Quality Gate (push) Successful in 1s
- Add toggle in source URL form to indicate when a source provides sufficient information to create a person profile without LinkedIn - Store provides_match boolean in source observation data - Display green badge on existing sources that have provides_match: true - Include bilingual tooltip (EN/NL) explaining the toggle purpose
2801 lines
108 KiB
Python
2801 lines
108 KiB
Python
"""
|
|
Entity Resolution Review API
|
|
|
|
This module provides endpoints for reviewing and resolving entity matches
|
|
between WCMS and LinkedIn profiles.
|
|
|
|
Key features:
|
|
- Forgejo OAuth authentication (tracks reviewer identity)
|
|
- Git-backed persistence (full audit trail)
|
|
- Atomic writes to prevent data corruption
|
|
- Paginated candidate listing
|
|
|
|
CRITICAL: NO AUTO-MERGING. All decisions require human review.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import subprocess
|
|
import httpx
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, List
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query, Form
|
|
from fastapi.security import OAuth2AuthorizationCodeBearer
|
|
from pydantic import BaseModel
|
|
|
|
# Linkup API configuration (web search + page fetch endpoints)
LINKUP_API_KEY = os.getenv("LINKUP_API_KEY", "")
LINKUP_API_URL = "https://api.linkup.so/v1/search"
LINKUP_FETCH_URL = "https://api.linkup.so/v1/fetch"

# Exa API configuration for LinkedIn profile extraction
EXA_API_KEY = os.getenv("EXA_API_KEY", "")
EXA_API_URL = "https://api.exa.ai/contents"
# Directory holding per-person entity records (override via ENTITY_DIR)
ENTITY_DIR = Path(os.getenv("ENTITY_DIR", "/Users/kempersc/apps/glam/data/custodian/person/entity"))

# LinkedIn profile fetch provider configuration
# Options: "exa", "linkup", "exa,linkup" (try exa first, fallback to linkup), "linkup,exa"
LINKEDIN_FETCH_PROVIDERS = os.getenv("LINKEDIN_FETCH_PROVIDERS", "exa,linkup")
|
|
|
|
# Email semantics for on-demand analysis.
# Optional dependency: if the extractor package is not installed the API still
# works, but email-semantics features are disabled (flag checked at call sites).
try:
    from glam_extractor.entity_resolution.email_semantics import parse_email_semantics
    from dataclasses import asdict
    EMAIL_SEMANTICS_AVAILABLE = True
except ImportError:
    EMAIL_SEMANTICS_AVAILABLE = False
    parse_email_semantics = None
|
|
|
|
|
|
# Configuration from environment
PERSON_DATA_DIR = Path(os.getenv("PERSON_DATA_DIR", "/Users/kempersc/apps/glam/data/person"))
CANDIDATES_FILE = Path(os.getenv("CANDIDATES_FILE", "/Users/kempersc/apps/glam/data/entity_resolution/entity_resolution_candidates.json"))
WCMS_EMAIL_INDEX_FILE = Path(os.getenv("WCMS_EMAIL_INDEX_FILE", "/Users/kempersc/apps/glam/data/wcms_email_index.json"))

# Cache for WCMS email index (loaded once on first use)
_WCMS_EMAIL_INDEX_CACHE: Optional[dict] = None

# Git repository used for audit-trail commits, plus Forgejo OAuth settings.
GIT_REPO_DIR = Path(os.getenv("GIT_REPO_DIR", "/Users/kempersc/apps/glam"))
FORGEJO_URL = os.getenv("FORGEJO_URL", "https://git.example.com")
FORGEJO_CLIENT_ID = os.getenv("FORGEJO_CLIENT_ID", "")
FORGEJO_CLIENT_SECRET = os.getenv("FORGEJO_CLIENT_SECRET", "")

# Cache for candidates data (loaded on first request).
# All of these are populated together by load_candidates() and cleared
# together by invalidate_cache().
_candidates_cache = None
_candidates_by_wcms = None
_candidates_by_linkedin = None  # For LinkedIn profile lookup
_candidates_by_email = None  # For email-to-ppid lookup (semantic search)
_candidates_list_index = {}  # Map (wcms_ppid, linkedin_ppid) -> index in candidates list

# Cache for WCMS-only profiles (no LinkedIn candidates)
_wcms_only_profiles: Optional[List[dict]] = None
|
def invalidate_cache():
    """Drop every in-memory candidates cache so the next read reloads from disk."""
    global _candidates_cache, _candidates_by_wcms, _candidates_by_linkedin, _candidates_by_email, _candidates_list_index, _wcms_only_profiles

    # Reset each cache to its pristine "not yet loaded" state.
    _candidates_cache = None
    _candidates_by_wcms, _candidates_by_linkedin = None, None
    _candidates_by_email = None
    _candidates_list_index = {}
    _wcms_only_profiles = None
|
|
|
|
|
|
def load_wcms_email_index() -> dict:
    """Load and memoize the WCMS email index.

    Returns the cached mapping of email -> WCMS profile data. On the first
    call the index is read from WCMS_EMAIL_INDEX_FILE; a missing file or a
    read/parse error yields an empty dict (best-effort, never raises).
    """
    global _WCMS_EMAIL_INDEX_CACHE

    if _WCMS_EMAIL_INDEX_CACHE is not None:
        return _WCMS_EMAIL_INDEX_CACHE

    if WCMS_EMAIL_INDEX_FILE.exists():
        try:
            # Explicit utf-8: the index may contain non-ASCII names, which
            # would fail under a non-UTF-8 platform default encoding.
            with open(WCMS_EMAIL_INDEX_FILE, 'r', encoding='utf-8') as f:
                _WCMS_EMAIL_INDEX_CACHE = json.load(f)
            print(f"Loaded {len(_WCMS_EMAIL_INDEX_CACHE)} entries from WCMS email index")
        except Exception as e:
            # Keep the API usable even if the index file is corrupt.
            print(f"Error loading WCMS email index: {e}")
            _WCMS_EMAIL_INDEX_CACHE = {}
    else:
        _WCMS_EMAIL_INDEX_CACHE = {}

    return _WCMS_EMAIL_INDEX_CACHE
|
|
|
|
|
|
def load_wcms_only_profiles() -> List[dict]:
    """
    Load WCMS profiles that have NO LinkedIn candidates.
    These are candidates for LinkedIn search.
    """
    global _wcms_only_profiles

    if _wcms_only_profiles is not None:
        return _wcms_only_profiles

    # Make sure both underlying caches are populated first.
    load_candidates()
    wcms_index = load_wcms_email_index()

    # Emails already covered by the entity-resolution candidates file.
    known_emails = set(_candidates_by_email.keys()) if _candidates_by_email else set()

    collected = []
    for email, profile_data in wcms_index.items():
        if email.lower().strip() in known_emails:
            continue

        # Prefer full_name; fall back to username, then "Unknown".
        display_name = (
            profile_data.get("full_name", "").strip()
            or profile_data.get("username", "").strip()
            or "Unknown"
        )

        collected.append({
            "email": profile_data.get("email", email),
            "name": display_name,
            "user_id": profile_data.get("user_id"),
            "username": profile_data.get("username"),
            "status": profile_data.get("status"),
            "roles": profile_data.get("roles", []),
            "registered_since": profile_data.get("registered_since"),
            "last_access": profile_data.get("last_access"),
            "abs_id": profile_data.get("abs_id"),
            "crm_id": profile_data.get("crm_id"),
        })

    # Sort alphabetically by display name (case-insensitive).
    collected.sort(key=lambda entry: (entry.get('name') or '').lower())
    _wcms_only_profiles = collected

    print(f"Loaded {len(_wcms_only_profiles)} WCMS-only profiles (no LinkedIn candidates)")
    return _wcms_only_profiles
|
|
|
|
def load_candidates():
    """Load and index candidates from the aggregated candidates file.

    Populates the module-level caches:
      - _candidates_cache: raw parsed JSON ({"metadata": ..., "candidates": [...]})
      - _candidates_by_wcms: wcms_ppid -> profile summary plus its candidate list
      - _candidates_by_linkedin: linkedin_ppid -> LinkedIn profile summary
      - _candidates_by_email: lowercased wcms_email -> wcms_ppid
      - _candidates_list_index: (wcms_ppid, linkedin_ppid) -> index in candidates list

    Best-effort: a missing or unreadable file results in empty caches rather
    than an exception. Returns the (possibly empty) _candidates_cache dict.
    """
    global _candidates_cache, _candidates_by_wcms, _candidates_by_linkedin, _candidates_by_email, _candidates_list_index

    if _candidates_cache is not None:
        return _candidates_cache

    if not CANDIDATES_FILE.exists():
        _candidates_cache = {"metadata": {}, "candidates": []}
        _candidates_by_wcms = {}
        _candidates_by_linkedin = {}
        _candidates_by_email = {}
        _candidates_list_index = {}
        return _candidates_cache

    try:
        with open(CANDIDATES_FILE) as f:
            _candidates_cache = json.load(f)

        # Index candidates by WCMS PPID for fast lookup
        _candidates_by_wcms = {}
        _candidates_by_linkedin = {}
        _candidates_by_email = {}
        _candidates_list_index = {}

        for idx, candidate in enumerate(_candidates_cache.get('candidates', [])):
            wcms_ppid = candidate.get('wcms_ppid')
            linkedin_ppid = candidate.get('linkedin_ppid')

            # BUGFIX: normalize linkedin_name up front so it is available to
            # BOTH index branches below. Previously it was computed inside
            # the wcms_ppid branch only, so a record with a linkedin_ppid but
            # no wcms_ppid would read a stale value from a prior iteration
            # (or raise NameError on the first iteration).
            linkedin_name_raw = candidate.get('linkedin_name')
            if isinstance(linkedin_name_raw, dict):
                linkedin_name = linkedin_name_raw.get('full_name') or linkedin_name_raw.get('display_name', 'Unknown')
            else:
                linkedin_name = linkedin_name_raw or 'Unknown'

            # Index for decision updates
            if wcms_ppid and linkedin_ppid:
                _candidates_list_index[(wcms_ppid, linkedin_ppid)] = idx

            if wcms_ppid:
                if wcms_ppid not in _candidates_by_wcms:
                    _candidates_by_wcms[wcms_ppid] = {
                        'wcms_ppid': wcms_ppid,
                        'wcms_name': candidate.get('wcms_name'),
                        'wcms_email': candidate.get('wcms_email'),
                        'wcms_email_domain': candidate.get('wcms_email_domain'),
                        'candidates': [],
                        'source_urls': []  # Initialize source_urls array
                    }
                # Aggregate source_urls from candidate entries
                candidate_source_urls = candidate.get('source_urls', [])
                if candidate_source_urls:
                    _candidates_by_wcms[wcms_ppid]['source_urls'].extend(candidate_source_urls)

                candidate_data = {
                    'linkedin_ppid': linkedin_ppid,
                    'linkedin_name': linkedin_name,
                    'linkedin_slug': candidate.get('linkedin_slug'),
                    'confidence_score': candidate.get('confidence_score', 0),
                    'match_signals': candidate.get('match_signals', []),
                    'name_match_score': candidate.get('name_match_score'),
                    'email_domain_matches_employer': candidate.get('email_domain_matches_employer', False),
                    'employer_name_overlap': candidate.get('employer_name_overlap', []),
                    'reviewed': candidate.get('reviewed', False),
                    'review_decision': candidate.get('review_decision'),
                    'reviewed_by': candidate.get('reviewed_by'),
                    'reviewed_at': candidate.get('reviewed_at'),
                    'review_notes': candidate.get('review_notes'),
                    # Email semantic fields (map from JSON field names)
                    'email_birth_year': candidate.get('email_probable_birth_year'),
                    'email_birth_year_confidence': candidate.get('email_birth_year_confidence', 0.0),
                    'email_name_components': candidate.get('email_extracted_names', []),
                    'email_name_matches_profile': bool(candidate.get('email_extracted_names')),
                    'email_institution_name': candidate.get('email_institution_name'),
                    'email_institution_type': candidate.get('email_institution_type'),
                    'email_is_institutional': candidate.get('email_is_institutional', False),
                    # Wrong-person detection (confidence scoring v2)
                    'is_likely_wrong_person': candidate.get('is_likely_wrong_person', False),
                    'wrong_person_reason': candidate.get('wrong_person_reason'),
                    'confidence_original': candidate.get('confidence_original'),
                }
                _candidates_by_wcms[wcms_ppid]['candidates'].append(candidate_data)

                # Index by email (for semantic search lookup)
                wcms_email = candidate.get('wcms_email')
                if wcms_email:
                    email_lower = wcms_email.lower().strip()
                    if email_lower not in _candidates_by_email:
                        _candidates_by_email[email_lower] = wcms_ppid

            # Index by LinkedIn PPID too (first occurrence wins)
            if linkedin_ppid and linkedin_ppid not in _candidates_by_linkedin:
                _candidates_by_linkedin[linkedin_ppid] = {
                    'linkedin_ppid': linkedin_ppid,
                    'linkedin_name': linkedin_name,
                    'linkedin_slug': candidate.get('linkedin_slug'),
                    'linkedin_name_raw': linkedin_name_raw
                }

        return _candidates_cache
    except Exception as e:
        # Corrupt file: log and fall back to empty caches so the API stays up.
        print(f"Error loading candidates: {e}")
        _candidates_cache = {"metadata": {}, "candidates": []}
        _candidates_by_wcms = {}
        _candidates_by_linkedin = {}
        _candidates_by_email = {}
        _candidates_list_index = {}
        return _candidates_cache
|
|
|
|
# Router: mounted by the main app; all endpoints live under /api/review.
router = APIRouter(prefix="/api/review", tags=["Entity Resolution Review"])
|
|
|
|
|
|
# ============================================================================
|
|
# Models
|
|
# ============================================================================
|
|
|
|
class ReviewDecision(str, Enum):
    """Reviewer verdict for a WCMS <-> LinkedIn candidate pair."""
    MATCH = "match"
    NOT_MATCH = "not_match"
    UNCERTAIN = "uncertain"
|
|
|
|
|
|
class MatchCandidateResponse(BaseModel):
    """One LinkedIn match candidate for a WCMS profile, including review state."""
    linkedin_ppid: str
    linkedin_name: str
    linkedin_slug: Optional[str] = None
    confidence_score: float
    match_signals: List[str] = []
    reviewed: bool = False
    review_decision: Optional[str] = None
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[str] = None
    review_notes: Optional[str] = None

    # Email semantic analysis fields
    email_birth_year: Optional[int] = None
    email_birth_year_confidence: float = 0.0
    email_name_components: List[str] = []
    email_name_matches_profile: bool = False
    email_institution_name: Optional[str] = None
    email_institution_type: Optional[str] = None
    email_is_institutional: bool = False

    # Additional match context
    name_match_score: Optional[float] = None
    email_domain_matches_employer: bool = False
    employer_name_overlap: List[str] = []

    # Wrong-person detection (from confidence scoring v2)
    is_likely_wrong_person: bool = False
    wrong_person_reason: Optional[str] = None
    confidence_original: Optional[float] = None  # Score before adjustments
|
|
|
|
|
|
class ProfileSummary(BaseModel):
    """Summary row for one WCMS profile in the paginated candidates listing."""
    ppid: str
    name: str
    email: Optional[str]
    email_domain: Optional[str]
    potential_matches: int
    reviewed_count: int
    pending_count: int
|
|
|
|
|
|
class SourceUrlItem(BaseModel):
    """A source URL added manually for a profile"""
    source_id: str
    source_url: str
    source_type: Optional[str] = None
    source_domain: Optional[str] = None
    comment: Optional[str] = None
    added_at: Optional[str] = None
    added_manually: bool = True
|
|
|
|
|
|
class ProfileDetail(BaseModel):
    """Full detail for one WCMS profile: identity, activity, and match candidates."""
    ppid: str
    name: str
    email: Optional[str]
    email_domain: Optional[str]
    wcms_identifiers: Optional[dict]
    wcms_activity: Optional[dict]
    match_candidates: List[MatchCandidateResponse]
    annotation_date: Optional[str]
    source_urls: List[SourceUrlItem] = []
|
|
|
|
|
|
class ReviewRequest(BaseModel):
    """Request body for recording a review decision on one candidate pair."""
    wcms_ppid: str
    linkedin_ppid: str
    decision: ReviewDecision
    notes: Optional[str] = None
    auto_reject_others: bool = False  # Auto-reject other candidates when matching
|
|
|
|
|
|
class ReviewResponse(BaseModel):
    """Outcome of a review decision, including the audit-trail commit."""
    success: bool
    message: str
    git_commit: Optional[str]
    auto_rejected_count: int = 0  # Number of other candidates auto-rejected
|
|
|
|
|
|
class CandidateListResponse(BaseModel):
    """Paginated list of WCMS profiles that have match candidates."""
    total: int
    page: int
    page_size: int
    profiles: List[ProfileSummary]
|
|
|
|
|
|
class LinkupSearchRequest(BaseModel):
    """Request for Linkup web search"""
    wcms_ppid: str
    name: str
    email: Optional[str] = None
    email_domain: Optional[str] = None
    institution: Optional[str] = None  # Detected institution from email
    additional_context: Optional[str] = None  # Any extra search terms
    # New parameters for customizable search
    custom_query: Optional[str] = None  # If set, use this instead of auto-generated query
    num_results: int = 10  # Number of results to return (default 10)
    include_linkedin_keyword: bool = True  # Whether to add "LinkedIn" to query
|
|
|
|
|
|
class LinkupSearchResult(BaseModel):
    """Single result from Linkup search"""
    url: str
    title: str
    content: str
    # Fields extracted from the result when it is a LinkedIn profile page.
    linkedin_slug: Optional[str] = None
    extracted_name: Optional[str] = None
    extracted_headline: Optional[str] = None
|
|
|
|
|
|
class LinkupSearchResponse(BaseModel):
    """Response from Linkup search"""
    success: bool
    query: str
    results: List[LinkupSearchResult]
    error: Optional[str] = None
|
|
|
|
|
|
class AddCandidateRequest(BaseModel):
    """Request to add a LinkedIn candidate manually"""
    wcms_ppid: str
    linkedin_slug: str
    # Optional WCMS data for profiles that aren't in the candidates file (e.g., from Qdrant search)
    wcms_name: Optional[str] = None
    wcms_email: Optional[str] = None
    wcms_email_domain: Optional[str] = None
    wcms_user_id: Optional[str] = None
    wcms_abs_id: Optional[str] = None
    wcms_crm_id: Optional[str] = None
    wcms_status: Optional[str] = None
|
|
|
|
|
|
class AddCandidateResponse(BaseModel):
    """Response from adding a manual candidate"""
    success: bool
    message: str
    linkedin_ppid: Optional[str] = None
|
|
|
|
|
|
class AddSourceUrlRequest(BaseModel):
    """Request to add a generic source URL with comments"""
    wcms_ppid: str
    source_url: str
    comment: Optional[str] = None  # User comment explaining the source, e.g., "De lessen worden gegeven door Mala Sardjoepersad"
    source_type: Optional[str] = None  # Optional type: "webpage", "social_media", "news_article", etc.
    provides_match: bool = False  # When True, this source provides sufficient info to identify the person without LinkedIn
    # Optional WCMS data for profiles that aren't in the candidates file
    wcms_name: Optional[str] = None
    wcms_email: Optional[str] = None
    wcms_email_domain: Optional[str] = None
    wcms_user_id: Optional[str] = None
    wcms_abs_id: Optional[str] = None
    wcms_crm_id: Optional[str] = None
    wcms_status: Optional[str] = None
|
|
|
|
|
|
class AddSourceUrlResponse(BaseModel):
    """Response from adding a source URL"""
    success: bool
    message: str
    source_id: Optional[str] = None
|
|
|
|
|
|
# ============================================================================
|
|
# Authentication (Forgejo OAuth)
|
|
# ============================================================================
|
|
|
|
# For now, use a simple token-based auth that can be replaced with full OAuth
|
|
# In production, implement full Forgejo OAuth2 flow
|
|
|
|
async def get_current_user(token: str = None) -> dict:
    """
    Get current authenticated user.

    TODO: Implement full Forgejo OAuth2 flow:
        1. Redirect to FORGEJO_URL/login/oauth/authorize
        2. Exchange code for token at FORGEJO_URL/login/oauth/access_token
        3. Get user info from FORGEJO_URL/api/v1/user

    For now, accepts a simple auth header and returns reviewer info.
    """
    # Placeholder auth: a present token means a (stub) reviewer session.
    if token:
        # TODO: Call Forgejo API to validate token and get user info
        return {"username": "reviewer", "email": "reviewer@example.com"}

    # No token -> anonymous session.
    return {"username": "anonymous", "email": None}
|
|
|
|
|
|
# ============================================================================
|
|
# Git Operations
|
|
# ============================================================================
|
|
|
|
def git_commit_review(file_path: Path, reviewer: str, decision: str, notes: Optional[str] = None) -> Optional[str]:
    """
    Create a git commit for a review decision.

    Stages *file_path* inside GIT_REPO_DIR, commits it with the reviewer
    recorded as commit author (audit trail), and returns the new HEAD hash.

    Args:
        file_path: File to stage and commit.
        reviewer: Reviewer username; appears in the message and author field.
        decision: Review decision included in the commit message.
        notes: Optional free-text notes appended to the commit message.

    Returns:
        The abbreviated (8-char) commit hash if successful, None otherwise.
    """
    try:
        # Stage the file
        subprocess.run(
            ["git", "add", str(file_path)],
            cwd=GIT_REPO_DIR,
            check=True,
            capture_output=True
        )

        # Create commit message (decision + reviewer, plus optional notes)
        commit_msg = f"review: {decision} by {reviewer}\n\nFile: {file_path.name}"
        if notes:
            commit_msg += f"\nNotes: {notes}"

        # Commit with the reviewer as author so the audit trail names them.
        subprocess.run(
            ["git", "commit", "-m", commit_msg, "--author", f"{reviewer} <{reviewer}@review>"],
            cwd=GIT_REPO_DIR,
            check=True,
            capture_output=True,
            text=True
        )

        # Resolve the new HEAD to return its hash.
        hash_result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=GIT_REPO_DIR,
            check=True,
            capture_output=True,
            text=True
        )

        return hash_result.stdout.strip()[:8]

    except subprocess.CalledProcessError as e:
        # Any failing git step aborts the whole operation.
        print(f"Git error: {e.stderr}")
        return None
|
|
|
|
|
|
# Backup configuration.
# BACKUP_DIR falls back to a "backups" folder next to the candidates file
# when the env var is unset or empty (Path("") is falsy here).
BACKUP_DIR = Path(os.getenv("BACKUP_DIR", "")) or (CANDIDATES_FILE.parent / "backups")
MAX_BACKUPS = int(os.getenv("MAX_BACKUPS", "50"))  # Keep last 50 backups
|
|
|
|
|
|
def create_backup(file_path: Path) -> Optional[Path]:
    """
    Create a timestamped backup of a file before modifying it.

    Returns the backup path if successful, None otherwise (missing source
    file or any failure; backups are best-effort and never raise).
    """
    if not file_path.exists():
        return None

    try:
        # Make sure the backup directory is there.
        target_dir = BACKUP_DIR
        target_dir.mkdir(parents=True, exist_ok=True)

        # e.g. candidates_20240101_120000.json
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        destination = target_dir / f"{file_path.stem}_{stamp}{file_path.suffix}"

        # Copy with metadata preserved.
        shutil.copy2(file_path, destination)
        print(f"Backup created: {destination}")

        # Trim older backups so only the newest MAX_BACKUPS remain.
        rotate_backups(target_dir, file_path.stem, MAX_BACKUPS)

        return destination

    except Exception as e:
        print(f"Backup error: {e}")
        return None
|
|
|
|
|
|
def rotate_backups(backup_dir: Path, file_stem: str, max_backups: int):
    """
    Remove old backups of *file_stem*, keeping only the newest *max_backups*.
    Best-effort: any failure is printed, never raised.
    """
    try:
        # All backups for this file, newest first by modification time.
        matching = sorted(
            backup_dir.glob(f"{file_stem}_*.json"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )

        # Everything past the cutoff is stale and gets deleted.
        for stale in matching[max_backups:]:
            stale.unlink()
            print(f"Rotated old backup: {stale}")

    except Exception as e:
        print(f"Backup rotation error: {e}")
|
|
|
|
|
|
def atomic_write_json(file_path: Path, data: dict, create_backup_first: bool = True) -> bool:
    """
    Atomically write JSON to a file with automatic backup.

    Writes to a temp file first, then renames to prevent corruption.
    Optionally creates a timestamped backup before overwriting.

    Args:
        file_path: Path to write to
        data: Data to write as JSON
        create_backup_first: If True, create a backup before writing (default: True)

    Returns:
        True on success, False on any error (errors are printed, not raised).
    """
    try:
        # Create backup before modifying (if file exists and backup requested)
        if create_backup_first and file_path.exists():
            create_backup(file_path)

        # Write to a temp file in the SAME directory: a same-filesystem
        # rename is atomic, a cross-filesystem move would not be.
        fd, temp_path = tempfile.mkstemp(
            suffix='.json',
            prefix='.tmp_',
            dir=file_path.parent
        )

        # Explicit utf-8: ensure_ascii=False emits non-ASCII characters,
        # which would crash under a non-UTF-8 platform default encoding.
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        # Atomic rename
        shutil.move(temp_path, file_path)
        return True

    except Exception as e:
        print(f"Write error: {e}")
        # Clean up temp file if it exists
        if 'temp_path' in locals() and os.path.exists(temp_path):
            os.unlink(temp_path)
        return False
|
|
|
|
|
|
# ============================================================================
|
|
# API Endpoints
|
|
# ============================================================================
|
|
|
|
@router.get("/candidates", response_model=CandidateListResponse)
|
|
async def list_candidates(
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(50, ge=10, le=200),
|
|
filter_status: Optional[str] = Query(None, description="pending, reviewed, matched, all"),
|
|
min_confidence: float = Query(0.5, ge=0.0, le=1.0),
|
|
min_signals: int = Query(1, ge=1, le=10, description="Minimum number of match signals"),
|
|
signal_types: Optional[str] = Query(None, description="Comma-separated signal types to require (e.g. exact_name_match,employer_match)"),
|
|
sort_by: str = Query("confidence", description="confidence, name, pending_count, signals")
|
|
):
|
|
"""
|
|
List WCMS profiles with match candidates for review.
|
|
|
|
Returns paginated list of profiles that have potential LinkedIn matches.
|
|
Uses pre-aggregated candidates file for performance.
|
|
"""
|
|
# Load candidates from cached file
|
|
load_candidates()
|
|
|
|
if not _candidates_by_wcms:
|
|
return CandidateListResponse(
|
|
total=0,
|
|
page=page,
|
|
page_size=page_size,
|
|
profiles=[]
|
|
)
|
|
|
|
profiles = []
|
|
|
|
# Parse required signal types
|
|
required_signals = set()
|
|
if signal_types:
|
|
required_signals = set(s.strip() for s in signal_types.split(',') if s.strip())
|
|
|
|
for wcms_ppid, wcms_data in _candidates_by_wcms.items():
|
|
candidates = wcms_data.get('candidates', [])
|
|
|
|
# Filter by confidence
|
|
candidates = [c for c in candidates if c.get('confidence_score', 0) >= min_confidence]
|
|
|
|
# Filter by minimum signal count
|
|
candidates = [c for c in candidates if len(c.get('match_signals', [])) >= min_signals]
|
|
|
|
# Filter by required signal types (candidate must have ALL required signals)
|
|
if required_signals:
|
|
candidates = [c for c in candidates
|
|
if required_signals.issubset(set(c.get('match_signals', [])))]
|
|
|
|
if not candidates:
|
|
continue
|
|
|
|
# Count reviewed vs pending
|
|
reviewed = sum(1 for c in candidates if c.get('reviewed', False))
|
|
pending = len(candidates) - reviewed
|
|
|
|
# Apply status filter
|
|
if filter_status == "pending" and pending == 0:
|
|
continue
|
|
if filter_status == "reviewed" and pending > 0:
|
|
continue
|
|
if filter_status == "matched":
|
|
# Only include profiles that have at least one confirmed match
|
|
has_match = any(c.get('review_decision') == 'match' for c in candidates)
|
|
if not has_match:
|
|
continue
|
|
|
|
# Get name from linkedin_name if it's a dict
|
|
wcms_name = wcms_data.get('wcms_name', 'Unknown')
|
|
|
|
# Calculate max signals for sorting
|
|
max_signals = max(len(c.get('match_signals', [])) for c in candidates) if candidates else 0
|
|
|
|
profiles.append({
|
|
"ppid": wcms_ppid,
|
|
"name": wcms_name,
|
|
"email": wcms_data.get('wcms_email'),
|
|
"email_domain": wcms_data.get('wcms_email_domain'),
|
|
"potential_matches": len(candidates),
|
|
"reviewed_count": reviewed,
|
|
"pending_count": pending,
|
|
"max_confidence": max(c.get('confidence_score', 0) for c in candidates),
|
|
"max_signals": max_signals
|
|
})
|
|
|
|
# Sort
|
|
if sort_by == "confidence":
|
|
profiles.sort(key=lambda x: x.get('max_confidence', 0), reverse=True)
|
|
elif sort_by == "name":
|
|
profiles.sort(key=lambda x: (x.get('name') or '').lower())
|
|
elif sort_by == "pending_count":
|
|
profiles.sort(key=lambda x: x.get('pending_count', 0), reverse=True)
|
|
elif sort_by == "signals":
|
|
profiles.sort(key=lambda x: (x.get('max_signals', 0), x.get('max_confidence', 0)), reverse=True)
|
|
|
|
# Paginate
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
page_profiles = profiles[start:end]
|
|
|
|
return CandidateListResponse(
|
|
total=len(profiles),
|
|
page=page,
|
|
page_size=page_size,
|
|
profiles=[ProfileSummary(**p) for p in page_profiles]
|
|
)
|
|
|
|
|
|
class WcmsOnlyProfile(BaseModel):
    """A WCMS profile without LinkedIn candidates - ready for LinkedIn search."""
    email: str
    name: str
    user_id: Optional[str] = None
    username: Optional[str] = None
    status: Optional[str] = None
    roles: List[str] = []
    registered_since: Optional[str] = None
    last_access: Optional[str] = None
    abs_id: Optional[str] = None
    crm_id: Optional[str] = None
|
|
|
|
|
|
class WcmsOnlyListResponse(BaseModel):
    """Response for listing WCMS-only profiles."""
    total: int
    page: int
    page_size: int
    profiles: List[WcmsOnlyProfile]
|
|
|
|
|
|
@router.get("/wcms-only", response_model=WcmsOnlyListResponse)
|
|
async def list_wcms_only_profiles(
|
|
page: int = Query(1, ge=1, description="Page number"),
|
|
page_size: int = Query(50, ge=1, le=200, description="Items per page"),
|
|
search: Optional[str] = Query(None, description="Search by name or email"),
|
|
status: Optional[str] = Query(None, description="Filter by status (Active, Blocked, etc.)"),
|
|
sort_by: str = Query("name", description="Sort by: name, registered_since, last_access"),
|
|
):
|
|
"""
|
|
List WCMS profiles that have NO LinkedIn candidates yet.
|
|
|
|
These profiles can be searched on LinkedIn to find potential matches.
|
|
This enables proactive LinkedIn research for WCMS users.
|
|
"""
|
|
profiles = load_wcms_only_profiles()
|
|
|
|
# Apply filters
|
|
filtered = profiles
|
|
|
|
if search:
|
|
search_lower = search.lower()
|
|
filtered = [p for p in filtered if
|
|
search_lower in (p.get('name') or '').lower() or
|
|
search_lower in (p.get('email') or '').lower()]
|
|
|
|
if status:
|
|
filtered = [p for p in filtered if p.get('status') == status]
|
|
|
|
# Sort
|
|
if sort_by == "name":
|
|
filtered.sort(key=lambda x: (x.get('name') or '').lower())
|
|
elif sort_by == "registered_since":
|
|
filtered.sort(key=lambda x: x.get('registered_since') or '', reverse=True)
|
|
elif sort_by == "last_access":
|
|
filtered.sort(key=lambda x: x.get('last_access') or '', reverse=True)
|
|
|
|
# Paginate
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
page_profiles = filtered[start:end]
|
|
|
|
return WcmsOnlyListResponse(
|
|
total=len(filtered),
|
|
page=page,
|
|
page_size=page_size,
|
|
profiles=[WcmsOnlyProfile(**p) for p in page_profiles]
|
|
)
|
|
|
|
|
|
@router.get("/profile/by-email/{email}")
|
|
async def get_profile_by_email(email: str):
|
|
"""
|
|
Look up a profile by WCMS email address.
|
|
|
|
Used by semantic search to find the wcms_ppid for a given email,
|
|
so the profile can be loaded directly without pagination.
|
|
|
|
Returns:
|
|
- found: Boolean indicating if profile was found
|
|
- wcms_ppid: The WCMS profile identifier (if has LinkedIn candidates)
|
|
- wcms_only: Boolean indicating this is a WCMS-only user (no LinkedIn candidates)
|
|
- wcms_data: WCMS user data for wcms_only profiles
|
|
"""
|
|
global _WCMS_EMAIL_INDEX_CACHE
|
|
|
|
# Load candidates from cached file
|
|
load_candidates()
|
|
|
|
email_lower = email.lower().strip()
|
|
|
|
# First check candidates (users with LinkedIn matches to review)
|
|
if _candidates_by_email and email_lower in _candidates_by_email:
|
|
wcms_ppid = _candidates_by_email[email_lower]
|
|
return {"found": True, "wcms_ppid": wcms_ppid, "wcms_only": False}
|
|
|
|
# Not in candidates - check full WCMS email index
|
|
# Load email index if not cached
|
|
if _WCMS_EMAIL_INDEX_CACHE is None:
|
|
if WCMS_EMAIL_INDEX_FILE.exists():
|
|
try:
|
|
with open(WCMS_EMAIL_INDEX_FILE, 'r') as f:
|
|
_WCMS_EMAIL_INDEX_CACHE = json.load(f)
|
|
except Exception:
|
|
_WCMS_EMAIL_INDEX_CACHE = {}
|
|
else:
|
|
_WCMS_EMAIL_INDEX_CACHE = {}
|
|
|
|
# Check WCMS email index
|
|
if email_lower in _WCMS_EMAIL_INDEX_CACHE:
|
|
wcms_data = _WCMS_EMAIL_INDEX_CACHE[email_lower]
|
|
return {
|
|
"found": True,
|
|
"wcms_ppid": None, # No PPID - not in entity resolution system
|
|
"wcms_only": True, # This is a WCMS-only user
|
|
"wcms_data": {
|
|
"full_name": wcms_data.get("full_name", "Unknown"),
|
|
"email": wcms_data.get("email"),
|
|
"user_id": wcms_data.get("user_id"),
|
|
"username": wcms_data.get("username"),
|
|
"username_url": wcms_data.get("username_url"),
|
|
"status": wcms_data.get("status"),
|
|
"roles": wcms_data.get("roles"),
|
|
"registered_since": wcms_data.get("registered_since"),
|
|
"last_access": wcms_data.get("last_access"),
|
|
"abs_id": wcms_data.get("abs_id"),
|
|
"crm_id": wcms_data.get("crm_id"),
|
|
},
|
|
"message": "WCMS user found but has no LinkedIn candidates for entity resolution"
|
|
}
|
|
|
|
return {"found": False, "wcms_ppid": None, "wcms_only": False, "message": f"Email not found: {email}"}
|
|
|
|
|
|
@router.get("/wcms-only-profile/{email}")
|
|
async def get_wcms_only_profile_detail(email: str):
|
|
"""
|
|
Get full profile detail for a WCMS-only user (no LinkedIn candidates).
|
|
|
|
Returns data compatible with the entity resolution review UI, including:
|
|
- WCMS profile data (user_id, username, status, roles, etc.)
|
|
- Email semantics (inferred birth year, institution, name patterns)
|
|
- Empty match_candidates list (ready for manual LinkedIn addition)
|
|
|
|
This allows WCMS-only profiles to be loaded in the main review UI
|
|
with the "Find more candidates" functionality available.
|
|
"""
|
|
global _WCMS_EMAIL_INDEX_CACHE
|
|
|
|
# Load WCMS email index
|
|
if _WCMS_EMAIL_INDEX_CACHE is None:
|
|
if WCMS_EMAIL_INDEX_FILE.exists():
|
|
try:
|
|
with open(WCMS_EMAIL_INDEX_FILE, 'r') as f:
|
|
_WCMS_EMAIL_INDEX_CACHE = json.load(f)
|
|
except Exception:
|
|
_WCMS_EMAIL_INDEX_CACHE = {}
|
|
else:
|
|
_WCMS_EMAIL_INDEX_CACHE = {}
|
|
|
|
email_lower = email.lower().strip()
|
|
|
|
if email_lower not in _WCMS_EMAIL_INDEX_CACHE:
|
|
raise HTTPException(status_code=404, detail=f"WCMS profile not found for email: {email}")
|
|
|
|
wcms_data = _WCMS_EMAIL_INDEX_CACHE[email_lower]
|
|
|
|
# Get email semantics if available
|
|
email_semantics = None
|
|
if EMAIL_SEMANTICS_AVAILABLE and parse_email_semantics:
|
|
try:
|
|
result = parse_email_semantics(email)
|
|
if result:
|
|
email_semantics = {
|
|
"probable_birth_year": result.probable_birth_year,
|
|
"birth_year_confidence": result.birth_year_confidence,
|
|
"birth_year_position": result.birth_year_position,
|
|
"institution_name": result.institution_name,
|
|
"institution_type": result.institution_type,
|
|
"is_institutional": result.is_institutional_domain,
|
|
"is_consumer_domain": result.is_consumer_domain,
|
|
"extracted_names": result.extracted_names,
|
|
"extracted_first_name": result.extracted_first_name,
|
|
"extracted_last_name": result.extracted_last_name,
|
|
"name_pattern": result.name_pattern,
|
|
"has_dutch_prefix": result.has_dutch_prefix,
|
|
}
|
|
except Exception:
|
|
pass
|
|
|
|
# Extract domain from email
|
|
email_domain = email.split('@')[-1] if '@' in email else None
|
|
|
|
# Use full_name or fall back to username
|
|
full_name = wcms_data.get("full_name", "").strip()
|
|
username = wcms_data.get("username", "").strip()
|
|
name = full_name if full_name else username if username else "Unknown"
|
|
|
|
# Build comprehensive wcms_identifiers
|
|
wcms_identifiers = {
|
|
"user_id": wcms_data.get("user_id"),
|
|
"username": wcms_data.get("username"),
|
|
"username_url": wcms_data.get("username_url"),
|
|
"abs_id": wcms_data.get("abs_id"),
|
|
"crm_id": wcms_data.get("crm_id"),
|
|
}
|
|
|
|
# Build wcms_activity
|
|
wcms_activity = {
|
|
"status": wcms_data.get("status"),
|
|
"roles": wcms_data.get("roles", []),
|
|
"registered_since": wcms_data.get("registered_since"),
|
|
"last_access": wcms_data.get("last_access"),
|
|
}
|
|
|
|
# Build PPID for WCMS-only profiles
|
|
wcms_only_ppid = f"wcms-only-{wcms_data.get('user_id', email_lower)}"
|
|
|
|
# Load candidates to check for source_urls
|
|
# WCMS-only profiles that have had sources added will be in candidates file
|
|
load_candidates()
|
|
source_urls = []
|
|
if _candidates_by_wcms is not None and wcms_only_ppid in _candidates_by_wcms:
|
|
source_urls_raw = _candidates_by_wcms[wcms_only_ppid].get('source_urls', [])
|
|
source_urls = source_urls_raw # Already list of dicts
|
|
|
|
return {
|
|
"ppid": wcms_only_ppid,
|
|
"name": name,
|
|
"email": wcms_data.get("email", email),
|
|
"email_domain": email_domain,
|
|
"wcms_identifiers": wcms_identifiers,
|
|
"wcms_activity": wcms_activity,
|
|
"email_semantics": email_semantics,
|
|
"match_candidates": [], # Empty - ready for manual LinkedIn addition
|
|
"annotation_date": None,
|
|
"is_wcms_only": True, # Flag to indicate this is a WCMS-only profile
|
|
"source_urls": source_urls, # Include any added source URLs
|
|
}
|
|
|
|
|
|
@router.get("/profile/{ppid}", response_model=ProfileDetail)
|
|
async def get_profile(ppid: str):
|
|
"""
|
|
Get detailed profile information for review.
|
|
|
|
Returns WCMS profile with all match candidates and their review status.
|
|
Uses pre-aggregated candidates data.
|
|
|
|
WCMS identifiers are extracted from the first confirmed/high-confidence
|
|
LinkedIn entity file that has merged WCMS data.
|
|
"""
|
|
# Load candidates from cached file
|
|
load_candidates()
|
|
|
|
if _candidates_by_wcms is None or ppid not in _candidates_by_wcms:
|
|
raise HTTPException(status_code=404, detail=f"Profile not found: {ppid}")
|
|
|
|
wcms_data = _candidates_by_wcms[ppid]
|
|
candidates = wcms_data.get('candidates', [])
|
|
|
|
# Try to get WCMS identifiers from a confirmed match's LinkedIn entity file
|
|
wcms_identifiers = None
|
|
wcms_activity = None
|
|
|
|
# ONLY use confirmed matches (review_decision == 'match')
|
|
# NO AUTO-MERGING: All entity resolution requires human review
|
|
for candidate in candidates:
|
|
if candidate.get('review_decision') == 'match':
|
|
linkedin_slug = candidate.get('linkedin_slug')
|
|
if linkedin_slug:
|
|
entity_file = find_entity_file_by_slug(linkedin_slug)
|
|
if entity_file and entity_file.exists():
|
|
try:
|
|
with open(entity_file) as f:
|
|
person_data = json.load(f)
|
|
if person_data.get('wcms_identifiers'):
|
|
wcms_identifiers = person_data.get('wcms_identifiers')
|
|
wcms_activity = person_data.get('wcms_activity')
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
# Get source URLs if any exist
|
|
source_urls_raw = wcms_data.get('source_urls', [])
|
|
source_urls = [SourceUrlItem(**s) for s in source_urls_raw]
|
|
|
|
return ProfileDetail(
|
|
ppid=ppid,
|
|
name=wcms_data.get('wcms_name', 'Unknown'),
|
|
email=wcms_data.get('wcms_email'),
|
|
email_domain=wcms_data.get('wcms_email_domain'),
|
|
wcms_identifiers=wcms_identifiers,
|
|
wcms_activity=wcms_activity,
|
|
match_candidates=[MatchCandidateResponse(**c) for c in candidates],
|
|
annotation_date=None,
|
|
source_urls=source_urls
|
|
)
|
|
|
|
|
|
def find_entity_file_by_slug(linkedin_slug: str) -> Optional[Path]:
    """Locate the entity JSON file for a LinkedIn slug inside ENTITY_DIR.

    Entity files follow the naming scheme ``{linkedin-slug}_{timestamp}.json``.
    When multiple versions exist, the lexicographically last filename — i.e.
    the newest timestamp — is returned.
    """
    if not ENTITY_DIR.exists():
        return None

    # Timestamps sort lexicographically, so sorting filenames puts the
    # most recent version last.
    versions = sorted(ENTITY_DIR.glob(f"{linkedin_slug}_*.json"))
    return versions[-1] if versions else None
|
|
|
|
|
|
@router.get("/linkedin/{ppid}")
|
|
async def get_linkedin_profile(ppid: str):
|
|
"""
|
|
Get LinkedIn profile for comparison during review.
|
|
|
|
Returns data from cached candidates file. If person entity files are available
|
|
in ENTITY_DIR, enriches with additional profile data including WCMS identifiers.
|
|
"""
|
|
# First try to get from cached candidates
|
|
load_candidates()
|
|
|
|
if _candidates_by_linkedin and ppid in _candidates_by_linkedin:
|
|
profile_data = _candidates_by_linkedin[ppid]
|
|
linkedin_slug = profile_data.get('linkedin_slug')
|
|
|
|
# Try to get additional data from person entity file if available
|
|
if linkedin_slug:
|
|
entity_file = find_entity_file_by_slug(linkedin_slug)
|
|
if entity_file and entity_file.exists():
|
|
try:
|
|
with open(entity_file) as f:
|
|
person_data = json.load(f)
|
|
return {
|
|
"ppid": ppid,
|
|
"name": profile_data.get('linkedin_name'),
|
|
"linkedin_slug": linkedin_slug,
|
|
"profile_data": person_data.get('profile_data', {}),
|
|
"affiliations": person_data.get('affiliations', []),
|
|
"web_claims": person_data.get('web_claims', [])[:10],
|
|
"wcms_identifiers": person_data.get('wcms_identifiers'),
|
|
"wcms_activity": person_data.get('wcms_activity'),
|
|
"contact_details": person_data.get('contact_details'),
|
|
"data_sources": person_data.get('data_sources', []),
|
|
"entity_file": entity_file.name,
|
|
}
|
|
except Exception:
|
|
pass # Fall through to cached data
|
|
|
|
# Return what we have from candidates cache
|
|
linkedin_name_raw = profile_data.get('linkedin_name_raw', {})
|
|
return {
|
|
"ppid": ppid,
|
|
"name": profile_data.get('linkedin_name'),
|
|
"linkedin_slug": linkedin_slug,
|
|
"profile_data": {
|
|
"name_tokens": linkedin_name_raw.get('name_tokens', []) if isinstance(linkedin_name_raw, dict) else [],
|
|
"source": linkedin_name_raw.get('source') if isinstance(linkedin_name_raw, dict) else None,
|
|
},
|
|
"affiliations": [],
|
|
"web_claims": [],
|
|
"wcms_identifiers": None,
|
|
"wcms_activity": None,
|
|
"contact_details": None,
|
|
"data_sources": [],
|
|
"note": "Limited data available from candidates file. Full profile not deployed."
|
|
}
|
|
|
|
raise HTTPException(status_code=404, detail=f"LinkedIn profile not found: {ppid}")
|
|
|
|
|
|
def get_wcms_identifiers(wcms_ppid: str, wcms_email: str = '') -> Optional[dict]:
    """
    Load WCMS identifiers from the WCMS email index.

    First tries to look up by email in wcms_email_index.json (the authoritative source).
    Falls back to looking for a separate person file if the email lookup fails.

    Args:
        wcms_ppid: WCMS profile id; used for the person-file fallback path.
        wcms_email: Email for the primary index lookup (case-insensitive);
            an empty string skips the index lookup entirely.

    Returns dict with user_id, username, username_url, abs_id, crm_id or None if not found.
    """
    global _WCMS_EMAIL_INDEX_CACHE

    # Try email index first (authoritative source)
    if wcms_email:
        # Load email index if not cached (lazy, module-level cache)
        if _WCMS_EMAIL_INDEX_CACHE is None:
            if WCMS_EMAIL_INDEX_FILE.exists():
                try:
                    print(f"Loading WCMS email index from {WCMS_EMAIL_INDEX_FILE}...")
                    with open(WCMS_EMAIL_INDEX_FILE, 'r') as f:
                        _WCMS_EMAIL_INDEX_CACHE = json.load(f)
                    print(f"Loaded {len(_WCMS_EMAIL_INDEX_CACHE)} entries from WCMS email index")
                except Exception as e:
                    # Load failure degrades to an empty index so the
                    # person-file fallback below still runs.
                    print(f"Error loading WCMS email index: {e}")
                    _WCMS_EMAIL_INDEX_CACHE = {}
            else:
                print(f"WCMS email index not found: {WCMS_EMAIL_INDEX_FILE}")
                _WCMS_EMAIL_INDEX_CACHE = {}

        # Look up by email
        email_lower = wcms_email.lower().strip()
        if email_lower in _WCMS_EMAIL_INDEX_CACHE:
            data = _WCMS_EMAIL_INDEX_CACHE[email_lower]
            identifiers = {}
            # Copy only identifier fields with truthy values
            for key in ['user_id', 'username', 'username_url', 'abs_id', 'crm_id']:
                if data.get(key):
                    identifiers[key] = data[key]
            if identifiers:
                return identifiers

    # Fall back to separate person file
    wcms_file = PERSON_DATA_DIR / f"{wcms_ppid}.json"
    if not wcms_file.exists():
        print(f"WCMS file not found: {wcms_file}")
        return None

    try:
        with open(wcms_file, 'r') as f:
            data = json.load(f)

        # Check if wcms_identifiers is nested
        if data.get('wcms_identifiers'):
            return data['wcms_identifiers']

        # Build from top-level fields
        identifiers = {}
        for key in ['user_id', 'username', 'username_url', 'abs_id', 'crm_id']:
            if data.get(key):
                identifiers[key] = data[key]

        return identifiers if identifiers else None
    except Exception as e:
        print(f"Error loading WCMS file {wcms_ppid}: {e}")
        return None
|
|
|
|
|
|
def find_entity_file(linkedin_slug: str) -> Optional[Path]:
    """Return the most recently modified entity file for *linkedin_slug*.

    Scans every ``*.json`` file in ENTITY_DIR, strips the trailing
    ``_202...`` timestamp from each stem, and accepts files whose remaining
    slug equals either the raw slug or a normalized variant with
    ``%``/checkmark artifacts removed. Recency is decided by file mtime.
    Returns None for an empty slug or when nothing matches.
    """
    if not linkedin_slug:
        return None

    # Some slugs carry URL-escape / checkmark artifacts; accept both forms.
    stripped = linkedin_slug.replace('%', '').replace('✓', '').replace('✔', '')
    accepted = {linkedin_slug, stripped}

    newest = None
    newest_mtime = None
    for path in ENTITY_DIR.glob('*.json'):
        stem = path.stem
        # Filenames look like "<slug>_20XXMMDD..."; drop the timestamp suffix.
        slug_part = stem.rsplit('_202', 1)[0] if '_202' in stem else stem
        if slug_part in accepted:
            mtime = path.stat().st_mtime
            if newest_mtime is None or mtime > newest_mtime:
                newest = path
                newest_mtime = mtime
    return newest
|
|
|
|
|
|
def _build_source_url_claims(source_urls: List[dict], now: datetime) -> List[dict]:
    """Convert reviewer-added source URLs into web_claim dicts.

    Entries without a 'url' key (or with an empty one) are skipped.
    Shared by both the update-existing and create-new entity paths.
    """
    now_iso = now.isoformat()
    claims = []
    for src in source_urls:
        url = src.get('url', '')
        if url:
            claims.append({
                'claim_type': 'source_url',
                'claim_value': url,
                'source_url': url,
                'retrieved_on': src.get('added_at', now_iso),
                'statement_created_at': now_iso,
                'source_archived_at': src.get('added_at', now_iso),
                'retrieval_agent': 'entity_review_api',
                'notes': src.get('comment', '')
            })
    return claims


async def update_entity_with_wcms_identifiers(
    linkedin_slug: str,
    wcms_ppid: str,
    wcms_name: str = '',
    wcms_email: str = '',
    linkedin_url: str = '',
    source_urls: Optional[List[dict]] = None
) -> Optional[str]:
    """
    Update or create an entity file with WCMS identifiers when a match is confirmed.

    If entity file exists: Updates it with wcms_identifiers.
    If entity file doesn't exist: Fetches LinkedIn profile via Exa and creates entity file.

    Also syncs any source_urls (non-LinkedIn evidence URLs) to the entity's web_claims.

    Args:
        linkedin_slug: LinkedIn profile slug used to locate/name the entity file.
        wcms_ppid: WCMS profile identifier of the confirmed match.
        wcms_name: WCMS display name (fallback profile name when Exa fetch fails).
        wcms_email: WCMS email (used to resolve WCMS identifiers).
        linkedin_url: Full LinkedIn URL; derived from the slug when empty.
        source_urls: Optional list of evidence-URL dicts to record as web_claims.

    Returns a status message string or None if nothing was done.
    """
    now = datetime.now(timezone.utc)

    # Get WCMS identifiers (email index first, person-file fallback)
    wcms_ids = get_wcms_identifiers(wcms_ppid, wcms_email)
    if not wcms_ids:
        print(f"Warning: Could not load WCMS identifiers for {wcms_ppid}")
        return "WCMS identifiers not found"

    # Check if entity file already exists
    existing_file = find_entity_file(linkedin_slug)

    if existing_file:
        # Update existing file in place
        try:
            with open(existing_file, 'r') as f:
                entity_data = json.load(f)

            # Add or update WCMS identifiers
            entity_data['wcms_identifiers'] = wcms_ids

            # Update heritage relevance to mark as confirmed
            if 'heritage_relevance' not in entity_data:
                entity_data['heritage_relevance'] = {}
            entity_data['heritage_relevance']['is_heritage_relevant'] = True
            entity_data['heritage_relevance']['rationale'] = 'Confirmed match via entity resolution review'

            # Add web claim for the match confirmation
            if 'web_claims' not in entity_data:
                entity_data['web_claims'] = []
            entity_data['web_claims'].append({
                'claim_type': 'entity_resolution_match',
                'claim_value': wcms_ppid,
                'source_url': 'entity_resolution_review',
                'retrieved_on': now.isoformat(),
                'statement_created_at': now.isoformat(),
                'source_archived_at': now.isoformat(),
                'retrieval_agent': 'entity_review_api'
            })

            # Sync source_urls (non-LinkedIn evidence URLs) to web_claims
            if source_urls:
                entity_data['web_claims'].extend(_build_source_url_claims(source_urls, now))

            # Write back
            with open(existing_file, 'w', encoding='utf-8') as f:
                json.dump(entity_data, f, indent=2, ensure_ascii=False)

            return f"Entity file updated: {existing_file.name}"
        except Exception as e:
            print(f"Error updating entity file: {e}")
            return f"Error updating entity file: {e}"

    else:
        # Create new entity file - fetch profile first
        if not linkedin_url:
            linkedin_url = f"https://www.linkedin.com/in/{linkedin_slug}"

        # Try to fetch LinkedIn profile via Exa (best-effort)
        profile_data = None
        fetch_response = await fetch_linkedin_profile(linkedin_url)
        if fetch_response:
            profile_data = parse_linkedin_profile_from_exa(fetch_response)

        # Create entity structure
        entity_data = {
            'person_id': linkedin_slug,
            'extraction_metadata': {
                'extraction_agent': 'entity_review_api',
                'extraction_date': now.isoformat(),
                'extraction_source': 'Exa API + entity resolution match',
                'schema_version': '1.0.0',
                'notes': f'Created when match confirmed. WCMS: {wcms_ppid}'
            },
            # Fall back to a skeleton profile when the Exa fetch failed
            'profile_data': profile_data or {
                'name': wcms_name,
                'linkedin_url': linkedin_url,
                'headline': None,
                'location': None,
                'connections': None,
                'about': None,
                'experience': [],
                'education': [],
                'skills': [],
                'languages': [],
                'profile_image_url': None
            },
            'heritage_relevance': {
                'is_heritage_relevant': True,
                'heritage_types': ['O'],
                'rationale': 'Confirmed match via entity resolution review'
            },
            'affiliations': [],
            'web_claims': [
                {
                    'claim_type': 'linkedin_url',
                    'claim_value': linkedin_url,
                    'source_url': 'entity_resolution_review',
                    'retrieved_on': now.isoformat(),
                    'statement_created_at': now.isoformat(),
                    'source_archived_at': now.isoformat(),
                    'retrieval_agent': 'entity_review_api'
                },
                {
                    'claim_type': 'entity_resolution_match',
                    'claim_value': wcms_ppid,
                    'source_url': 'entity_resolution_review',
                    'retrieved_on': now.isoformat(),
                    'statement_created_at': now.isoformat(),
                    'source_archived_at': now.isoformat(),
                    'retrieval_agent': 'entity_review_api'
                }
            ],
            'wcms_identifiers': wcms_ids
        }

        # Add source_urls (non-LinkedIn evidence URLs) to web_claims for new entity
        if source_urls:
            entity_data['web_claims'].extend(_build_source_url_claims(source_urls, now))

        # Write new entity file
        timestamp = now.strftime("%Y%m%dT%H%M%SZ")
        # Normalize filename (strip URL-escape / checkmark artifacts from slug)
        safe_slug = linkedin_slug.replace('%', '').replace('✓', '').replace('✔', '')
        filename = f"{safe_slug}_{timestamp}.json"
        filepath = ENTITY_DIR / filename

        try:
            ENTITY_DIR.mkdir(parents=True, exist_ok=True)
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(entity_data, f, indent=2, ensure_ascii=False)

            status = "Entity file created"
            if profile_data:
                status += " with Exa profile data"
            else:
                status += " (profile fetch unavailable)"
            # Bug fix: report the actual filename (previously a placeholder),
            # mirroring the update branch which reports the file name.
            return f"{status}: {filename}"
        except Exception as e:
            print(f"Error creating entity file: {e}")
            return f"Error creating entity file: {e}"
|
|
|
|
|
|
@router.post("/decision", response_model=ReviewResponse)
|
|
async def save_review_decision(
|
|
request: ReviewRequest,
|
|
# user: dict = Depends(get_current_user) # Enable when OAuth is configured
|
|
):
|
|
"""
|
|
Save a review decision for a match candidate.
|
|
|
|
Updates the candidates file with the review decision.
|
|
DOES NOT MERGE profiles - only records the decision for later processing.
|
|
|
|
If auto_reject_others=True and decision=match, all OTHER candidates for the
|
|
same wcms_ppid will be automatically set to not_match.
|
|
"""
|
|
# Placeholder user until OAuth is configured
|
|
user = {"username": "reviewer"}
|
|
|
|
# Load candidates
|
|
load_candidates()
|
|
|
|
if _candidates_cache is None:
|
|
raise HTTPException(status_code=500, detail="Candidates data not loaded")
|
|
|
|
# Find the candidate using our index
|
|
key = (request.wcms_ppid, request.linkedin_ppid)
|
|
if key not in _candidates_list_index:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Match candidate not found: {request.wcms_ppid} -> {request.linkedin_ppid}"
|
|
)
|
|
|
|
idx = _candidates_list_index[key]
|
|
candidate = _candidates_cache['candidates'][idx]
|
|
|
|
# Update review status
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
candidate['reviewed'] = True
|
|
candidate['review_decision'] = request.decision.value
|
|
candidate['reviewed_by'] = user.get('username', 'unknown')
|
|
candidate['reviewed_at'] = now
|
|
if request.notes:
|
|
candidate['review_notes'] = request.notes
|
|
|
|
# Also update in-memory cache for _candidates_by_wcms
|
|
if _candidates_by_wcms and request.wcms_ppid in _candidates_by_wcms:
|
|
for c in _candidates_by_wcms[request.wcms_ppid]['candidates']:
|
|
if c.get('linkedin_ppid') == request.linkedin_ppid:
|
|
c['reviewed'] = True
|
|
c['review_decision'] = request.decision.value
|
|
c['reviewed_by'] = user.get('username', 'unknown')
|
|
c['reviewed_at'] = now
|
|
if request.notes:
|
|
c['review_notes'] = request.notes
|
|
break
|
|
|
|
# Auto-reject other candidates if decision is 'match' and auto_reject_others is True
|
|
auto_rejected_count = 0
|
|
if request.decision == ReviewDecision.MATCH and request.auto_reject_others:
|
|
# Find all OTHER candidates for this WCMS profile
|
|
for other_key, other_idx in _candidates_list_index.items():
|
|
other_wcms_ppid, other_linkedin_ppid = other_key
|
|
if other_wcms_ppid == request.wcms_ppid and other_linkedin_ppid != request.linkedin_ppid:
|
|
other_candidate = _candidates_cache['candidates'][other_idx]
|
|
# Only auto-reject if not already reviewed
|
|
if not other_candidate.get('reviewed', False):
|
|
other_candidate['reviewed'] = True
|
|
other_candidate['review_decision'] = 'not_match'
|
|
other_candidate['reviewed_by'] = user.get('username', 'unknown')
|
|
other_candidate['reviewed_at'] = now
|
|
other_candidate['review_notes'] = 'Auto-rejected: another candidate was matched'
|
|
auto_rejected_count += 1
|
|
|
|
# Also update in-memory cache
|
|
if _candidates_by_wcms and request.wcms_ppid in _candidates_by_wcms:
|
|
for c in _candidates_by_wcms[request.wcms_ppid]['candidates']:
|
|
if c.get('linkedin_ppid') == other_linkedin_ppid:
|
|
c['reviewed'] = True
|
|
c['review_decision'] = 'not_match'
|
|
c['reviewed_by'] = user.get('username', 'unknown')
|
|
c['reviewed_at'] = now
|
|
c['review_notes'] = 'Auto-rejected: another candidate was matched'
|
|
break
|
|
|
|
# Update metadata
|
|
_candidates_cache['metadata']['last_review_at'] = now
|
|
_candidates_cache['metadata']['last_reviewed_by'] = user.get('username', 'unknown')
|
|
|
|
# Atomic write to candidates file
|
|
if not atomic_write_json(CANDIDATES_FILE, _candidates_cache):
|
|
raise HTTPException(status_code=500, detail="Failed to save review decision")
|
|
|
|
# If decision is MATCH, create or update entity file with WCMS identifiers
|
|
entity_update_result = None
|
|
if request.decision == ReviewDecision.MATCH:
|
|
# Get source_urls from in-memory cache (non-LinkedIn evidence URLs)
|
|
source_urls = None
|
|
if _candidates_by_wcms and request.wcms_ppid in _candidates_by_wcms:
|
|
source_urls = _candidates_by_wcms[request.wcms_ppid].get('source_urls', [])
|
|
|
|
entity_update_result = await update_entity_with_wcms_identifiers(
|
|
linkedin_slug=candidate.get('linkedin_slug'),
|
|
wcms_ppid=request.wcms_ppid,
|
|
wcms_name=candidate.get('wcms_name', ''),
|
|
wcms_email=candidate.get('wcms_email', ''),
|
|
linkedin_url=candidate.get('linkedin_url', f"https://www.linkedin.com/in/{candidate.get('linkedin_slug', '')}"),
|
|
source_urls=source_urls
|
|
)
|
|
|
|
# Try git commit (may fail if not a git repo on server, that's OK)
|
|
commit_hash = None
|
|
try:
|
|
commit_hash = git_commit_review(
|
|
CANDIDATES_FILE,
|
|
user.get('username', 'unknown'),
|
|
request.decision.value,
|
|
request.notes or ""
|
|
)
|
|
except Exception as e:
|
|
print(f"Git commit skipped: {e}")
|
|
|
|
message = f"Review saved: {request.decision.value}"
|
|
if auto_rejected_count > 0:
|
|
message += f" ({auto_rejected_count} other candidate(s) auto-rejected)"
|
|
if entity_update_result:
|
|
message += f" - {entity_update_result}"
|
|
|
|
return ReviewResponse(
|
|
success=True,
|
|
message=message,
|
|
git_commit=commit_hash,
|
|
auto_rejected_count=auto_rejected_count
|
|
)
|
|
|
|
|
|
@router.get("/stats")
|
|
async def get_review_stats():
|
|
"""
|
|
Get overall review statistics.
|
|
Uses pre-aggregated candidates data.
|
|
"""
|
|
# Load candidates from cached file
|
|
load_candidates()
|
|
|
|
if _candidates_cache is None or _candidates_by_wcms is None:
|
|
return {
|
|
"total_profiles": 0,
|
|
"profiles_with_candidates": 0,
|
|
"total_candidates": 0,
|
|
"reviewed_candidates": 0,
|
|
"pending_candidates": 0,
|
|
"review_progress_percent": 0,
|
|
"decisions": {"match": 0, "not_match": 0, "uncertain": 0},
|
|
"likely_wrong_person": 0,
|
|
}
|
|
|
|
metadata = _candidates_cache.get('metadata', {})
|
|
|
|
total_candidates = 0
|
|
reviewed_candidates = 0
|
|
likely_wrong_person = 0
|
|
decisions = {"match": 0, "not_match": 0, "uncertain": 0}
|
|
|
|
for wcms_ppid, wcms_data in _candidates_by_wcms.items():
|
|
candidates = wcms_data.get('candidates', [])
|
|
total_candidates += len(candidates)
|
|
|
|
for c in candidates:
|
|
if c.get('reviewed'):
|
|
reviewed_candidates += 1
|
|
decision = c.get('review_decision')
|
|
if decision in decisions:
|
|
decisions[decision] += 1
|
|
if c.get('is_likely_wrong_person'):
|
|
likely_wrong_person += 1
|
|
|
|
# Get WCMS-only profiles count
|
|
wcms_only_profiles = load_wcms_only_profiles()
|
|
wcms_only_count = len(wcms_only_profiles)
|
|
|
|
# Total WCMS in email index
|
|
wcms_email_index = load_wcms_email_index()
|
|
total_wcms_in_index = len(wcms_email_index)
|
|
|
|
return {
|
|
"total_profiles": metadata.get('wcms_profiles_processed', 0),
|
|
"profiles_with_candidates": len(_candidates_by_wcms),
|
|
"wcms_only_count": wcms_only_count, # WCMS profiles without LinkedIn candidates
|
|
"total_wcms_in_index": total_wcms_in_index, # Total in WCMS email index
|
|
"total_candidates": total_candidates,
|
|
"reviewed_candidates": reviewed_candidates,
|
|
"pending_candidates": total_candidates - reviewed_candidates,
|
|
"review_progress_percent": round(reviewed_candidates / total_candidates * 100, 1) if total_candidates > 0 else 0,
|
|
"decisions": decisions,
|
|
"likely_wrong_person": likely_wrong_person,
|
|
"confidence_scoring_version": metadata.get('confidence_scoring_version', '1.0'),
|
|
}
|
|
|
|
|
|
@router.get("/signal-types")
|
|
async def get_signal_types():
|
|
"""
|
|
Get all available match signal types for filtering.
|
|
"""
|
|
load_candidates()
|
|
|
|
if _candidates_cache is None:
|
|
return {"signal_types": [], "signal_counts": {}}
|
|
|
|
signal_counts: dict = {}
|
|
for candidate in _candidates_cache.get('candidates', []):
|
|
for signal in candidate.get('match_signals', []):
|
|
signal_counts[signal] = signal_counts.get(signal, 0) + 1
|
|
|
|
# Sort by count descending
|
|
sorted_signals = sorted(signal_counts.items(), key=lambda x: x[1], reverse=True)
|
|
|
|
return {
|
|
"signal_types": [s[0] for s in sorted_signals],
|
|
"signal_counts": dict(sorted_signals),
|
|
"descriptions": {
|
|
# Name-based signals
|
|
"exact_name_match": "Names match exactly (strongest signal)",
|
|
"strong_name_match": "Names are very similar (score >= 0.85)",
|
|
"moderate_name_match": "Names are similar (score >= 0.70)",
|
|
|
|
# Employer/domain signals
|
|
"employer_match": "Employer domain matches email domain",
|
|
"email_domain_match": "Email domains match between profiles",
|
|
|
|
# Email semantic signals (birth year)
|
|
"email_indicates_birth_year_1940s": "Email contains year pattern from 1940s",
|
|
"email_indicates_birth_year_1950s": "Email contains year pattern from 1950s",
|
|
"email_indicates_birth_year_1960s": "Email contains year pattern from 1960s",
|
|
"email_indicates_birth_year_1970s": "Email contains year pattern from 1970s",
|
|
"email_indicates_birth_year_1980s": "Email contains year pattern from 1980s",
|
|
"email_indicates_birth_year_1990s": "Email contains year pattern from 1990s",
|
|
|
|
# Email institutional signals
|
|
"institutional_email_museum": "Email from museum domain",
|
|
"institutional_email_archive": "Email from archive domain",
|
|
"institutional_email_library": "Email from library domain",
|
|
"institutional_email_university": "Email from university domain",
|
|
"institutional_email_government": "Email from government domain",
|
|
"institutional_email_research": "Email from research institute domain",
|
|
|
|
# Email name matching
|
|
"email_name_matches_profile": "Names extracted from email match LinkedIn profile",
|
|
"email_lastname_in_linkedin_name": "Last name from email appears in LinkedIn name",
|
|
}
|
|
}
|
|
|
|
|
|
@router.get("/analyze-email/{email:path}")
|
|
async def analyze_email(email: str):
|
|
"""
|
|
Analyze an email address to extract semantic signals.
|
|
|
|
Returns:
|
|
- Probable birth year (if found in email)
|
|
- Institution name and type (if institutional domain)
|
|
- Extracted name components
|
|
- Whether email is from a consumer domain (gmail, outlook, etc.)
|
|
|
|
This endpoint is useful for:
|
|
- Understanding what signals are extracted from a specific email
|
|
- Debugging why certain candidates have specific signals
|
|
- On-demand analysis during review
|
|
"""
|
|
if not EMAIL_SEMANTICS_AVAILABLE:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Email semantics module not available. Install from glam_extractor.entity_resolution"
|
|
)
|
|
|
|
try:
|
|
result = parse_email_semantics(email)
|
|
if result is None:
|
|
return {
|
|
"email": email,
|
|
"error": "Could not parse email address",
|
|
"parsed": False
|
|
}
|
|
|
|
return {
|
|
"email": email,
|
|
"parsed": True,
|
|
"local_part": result.local_part,
|
|
"domain": result.domain,
|
|
"probable_birth_year": result.probable_birth_year,
|
|
"birth_year_confidence": result.birth_year_confidence,
|
|
"birth_year_position": result.birth_year_position,
|
|
"institution_name": result.institution_name,
|
|
"institution_type": result.institution_type,
|
|
"institution_ghcid": result.institution_ghcid,
|
|
"is_institutional": result.is_institutional_domain,
|
|
"is_consumer_domain": result.is_consumer_domain,
|
|
"extracted_names": result.extracted_names,
|
|
"extracted_first_name": result.extracted_first_name,
|
|
"extracted_last_name": result.extracted_last_name,
|
|
"name_pattern": result.name_pattern,
|
|
"has_dutch_prefix": result.has_dutch_prefix,
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"email": email,
|
|
"error": str(e),
|
|
"parsed": False
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# Linkup Web Search Integration
|
|
# ============================================================================
|
|
|
|
import re
|
|
|
|
def extract_linkedin_info(url: str, title: str, content: str) -> dict:
    """Extract LinkedIn profile info from a web-search result.

    Args:
        url: Result URL; a slug is extracted when it is a linkedin.com/in/ URL.
        title: Result title, typically "Name - Headline | LinkedIn".
        content: Result snippet (currently unused; kept for interface stability).

    Returns:
        Dict with 'linkedin_slug', 'extracted_name' and 'extracted_headline'
        keys, each None when not found.
    """
    info = {
        'linkedin_slug': None,
        'extracted_name': None,
        'extracted_headline': None
    }

    # Extract slug from URL (lowercased for stable matching)
    slug_match = re.search(r'linkedin\.com/in/([^/?]+)', url)
    if slug_match:
        info['linkedin_slug'] = slug_match.group(1).lower()

    # Extract name from title (format: "Name - Title | LinkedIn").
    # Bug fix: split on the " - " separator (dash with surrounding spaces)
    # or "|" instead of stopping at the first '-', so hyphenated names
    # such as "Jan-Willem" or "Anne-Marie" are not truncated.
    name_part = re.split(r'\s-\s|\|', title, maxsplit=1)[0].strip()
    if name_part:
        info['extracted_name'] = name_part

    # Extract headline from title (the segment between " - " and "|")
    headline_match = re.search(r' - ([^|]+)', title)
    if headline_match:
        info['extracted_headline'] = headline_match.group(1).strip()

    return info
|
|
|
|
|
|
@router.post("/linkup-search", response_model=LinkupSearchResponse)
|
|
async def linkup_search(request: LinkupSearchRequest):
    """
    Search for LinkedIn profiles using the Linkup API.

    Builds a search query from WCMS profile data (or uses a caller-supplied
    custom query) and returns potential LinkedIn matches for entity resolution.

    Supports customizable search:
    - custom_query: use a completely custom query string instead of auto-generated
    - num_results: control how many results to return (default 10)
    - include_linkedin_keyword: whether to add "LinkedIn" to the query (default True)

    Returns:
        LinkupSearchResponse with the query used and a list of results;
        each result carries the LinkedIn slug if one was found in the URL.
    """
    if not LINKUP_API_KEY:
        return LinkupSearchResponse(
            success=False,
            query="",
            results=[],
            error="Linkup API key not configured. Set LINKUP_API_KEY environment variable."
        )

    def _auto_query() -> str:
        # Assemble query terms from the WCMS profile fields.
        terms = [request.name]
        if request.institution:
            terms.append(request.institution)
        # Use a non-freemail email domain as an employer hint
        # (e.g. "rijksmuseum.nl" -> "rijksmuseum").
        domain = request.email_domain
        if domain and domain not in ['gmail.com', 'outlook.com', 'hotmail.com', 'yahoo.com']:
            pieces = domain.split('.')
            if len(pieces) >= 2 and pieces[0] not in ['mail', 'email', 'info', 'contact']:
                terms.append(pieces[0])
        if request.additional_context:
            terms.append(request.additional_context)
        # Optionally steer the search engine toward LinkedIn pages.
        if request.include_linkedin_keyword:
            terms.append("LinkedIn")
        return ' '.join(terms)

    # Prefer a non-empty custom query; otherwise build one from profile data.
    custom = request.custom_query.strip() if request.custom_query else ""
    query = custom or _auto_query()

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                LINKUP_API_URL,
                headers={
                    "Authorization": f"Bearer {LINKUP_API_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "q": query,
                    "depth": "standard",
                    "outputType": "searchResults"
                },
                timeout=30.0
            )

            if response.status_code != 200:
                return LinkupSearchResponse(
                    success=False,
                    query=query,
                    results=[],
                    error=f"Linkup API returned status {response.status_code}: {response.text[:200]}"
                )

            payload = response.json()
            # Trim to the requested number of hits before post-processing.
            hits = payload.get('results', [])[:request.num_results]

            processed = []
            for hit in hits:
                hit_url = hit.get('url', '')
                hit_title = hit.get('name', '')
                snippet = hit.get('content', '')[:500]  # cap snippet length

                # Pull slug/name/headline out of LinkedIn-looking URLs.
                info = extract_linkedin_info(hit_url, hit_title, snippet)

                processed.append(LinkupSearchResult(
                    url=hit_url,
                    title=hit_title,
                    content=snippet,
                    linkedin_slug=info['linkedin_slug'],
                    extracted_name=info['extracted_name'],
                    extracted_headline=info['extracted_headline']
                ))

            return LinkupSearchResponse(
                success=True,
                query=query,
                results=processed
            )

    except httpx.TimeoutException:
        return LinkupSearchResponse(
            success=False,
            query=query,
            results=[],
            error="Linkup API request timed out"
        )
    except Exception as e:
        # Surface any other failure as a structured error response.
        return LinkupSearchResponse(
            success=False,
            query=query,
            results=[],
            error=f"Linkup API error: {str(e)}"
        )
|
|
|
|
|
|
# ============================================================================
|
|
# LinkedIn Profile Extraction (Exa and Linkup providers)
|
|
# ============================================================================
|
|
|
|
import re as regex_module # Avoid shadowing
|
|
|
|
async def fetch_linkedin_profile_exa(linkedin_url: str) -> Optional[dict]:
    """
    Fetch LinkedIn profile data using the Exa Contents API.

    Returns a dict with the raw result, request id, cost and provider tag,
    or None when the key is missing or extraction fails.
    """
    if not EXA_API_KEY:
        print("Exa API: No API key configured")
        return None

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                EXA_API_URL,
                headers={
                    "Authorization": f"Bearer {EXA_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "ids": [linkedin_url],
                    "text": True,
                    # Fall back to a live crawl when Exa has no cached copy.
                    "livecrawl": "fallback"
                },
                timeout=60.0
            )

            if resp.status_code != 200:
                print(f"Exa API error: HTTP {resp.status_code}")
                return None

            payload = resp.json()
            results = payload.get('results') or []
            if not results:
                print("Exa API: No results returned")
                return None

            return {
                'raw_result': results[0],
                'request_id': payload.get('requestId', ''),
                'cost': payload.get('costDollars', {}).get('total', 0),
                'provider': 'exa'
            }

    except Exception as e:
        # Best-effort fetch: log and let the caller fall back to another provider.
        print(f"Exa API exception: {e}")
        return None
|
|
|
|
|
|
async def fetch_linkedin_profile_linkup(linkedin_url: str) -> Optional[dict]:
    """
    Fetch LinkedIn profile data using the Linkup Fetch API.

    Returns a dict shaped like the Exa provider's response (so the same parser
    can be reused), or None when the key is missing or extraction fails.
    """
    if not LINKUP_API_KEY:
        print("Linkup API: No API key configured")
        return None

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                LINKUP_FETCH_URL,
                headers={
                    "Authorization": f"Bearer {LINKUP_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "url": linkedin_url,
                    "outputType": "markdown"
                },
                timeout=60.0
            )

            if resp.status_code != 200:
                print(f"Linkup API error: HTTP {resp.status_code}")
                return None

            payload = resp.json()
            markdown = payload.get('content', '')
            if not markdown:
                print("Linkup API: No content returned")
                return None

            # Mirror the Exa response shape so downstream parsing is uniform.
            return {
                'raw_result': {
                    'text': markdown,
                    'url': linkedin_url,
                    'title': payload.get('title', ''),
                    'author': '',  # Will be extracted from content
                    'image': ''
                },
                'request_id': '',
                'cost': 0,  # Linkup doesn't report cost per request
                'provider': 'linkup'
            }

    except Exception as e:
        # Best-effort fetch: log and let the caller fall back to another provider.
        print(f"Linkup API exception: {e}")
        return None
|
|
|
|
|
|
async def fetch_linkedin_profile(linkedin_url: str) -> Optional[dict]:
    """
    Fetch a LinkedIn profile using the configured providers with fallback.

    LINKEDIN_FETCH_PROVIDERS is a comma-separated priority list, e.g.:
        "exa"        - use only Exa
        "linkup"     - use only Linkup
        "exa,linkup" - try Exa first, fall back to Linkup
        "linkup,exa" - try Linkup first, fall back to Exa

    Returns the raw provider response (its 'provider' field names which one
    succeeded), or None when every provider fails.
    """
    # Dispatch table: provider key -> (display label, fetch coroutine).
    fetchers = {
        'exa': ('Exa', fetch_linkedin_profile_exa),
        'linkup': ('Linkup', fetch_linkedin_profile_linkup),
    }

    for provider in (p.strip().lower() for p in LINKEDIN_FETCH_PROVIDERS.split(',')):
        print(f"Trying LinkedIn profile fetch with: {provider}")

        entry = fetchers.get(provider)
        if entry is None:
            print(f"Unknown provider: {provider}")
            continue

        label, fetcher = entry
        result = await fetcher(linkedin_url)
        if result:
            print(f"Successfully fetched profile with {label}")
            return result

    print(f"All providers failed for: {linkedin_url}")
    return None
|
|
|
|
|
|
def parse_linkedin_profile_from_exa(raw_data: dict) -> dict:
    """Parse Exa response into structured profile data.

    Accepts the dict produced by fetch_linkedin_profile_exa / _linkup (both
    share the same 'raw_result' shape) and walks the markdown-ish text line
    by line, tracking the current "## ..." section to fill in about,
    experience, education, skills and languages.

    NOTE(review): only the experience/education item that is still open when
    the loop ends is flushed at the bottom — if the text ends in a different
    section (e.g. Skills), the last open item from an earlier section appears
    to be dropped. TODO confirm whether real provider output ever triggers this.
    """
    result = raw_data.get('raw_result', {})
    text = result.get('text', '')

    # Skeleton profile; scalar fields filled as lines are recognized.
    profile_data = {
        "name": result.get('author', ''),
        "linkedin_url": result.get('url', ''),
        "headline": "",
        "location": "",
        "connections": "",
        "about": "",
        "experience": [],
        "education": [],
        "skills": [],
        "languages": [],
        "profile_image_url": result.get('image', '')
    }

    # Parse headline from the page title: "Name | Headline" takes the segment
    # after the first pipe; otherwise strip the name out of the title.
    title = result.get('title', '')
    if '|' in title:
        profile_data['headline'] = title.split('|')[1].strip()
    elif title:
        profile_data['headline'] = title.replace(profile_data['name'], '').strip(' |')

    # Parse sections from text. current_item accumulates the experience or
    # education entry being built; it is flushed when a new "### " item starts.
    lines = text.split('\n')
    current_section = None
    current_item = {}

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Section headers ("## ...") switch parsing mode; unknown sections
        # disable item parsing until the next known header.
        if line.startswith('## About'):
            current_section = 'about'
            continue
        elif line.startswith('## Experience'):
            current_section = 'experience'
            continue
        elif line.startswith('## Education'):
            current_section = 'education'
            continue
        elif line.startswith('## Skills'):
            current_section = 'skills'
            continue
        elif line.startswith('## Languages'):
            current_section = 'languages'
            continue
        elif line.startswith('## '):
            current_section = None
            continue

        # Parse location and connections regardless of the current section.
        if 'connections' in line.lower() and 'followers' in line.lower():
            profile_data['connections'] = line
            continue
        # Lines like "Amsterdam, North Holland (NL)" are treated as location.
        if regex_module.match(r'^[A-Za-z\s,]+\s*\([A-Z]{2}\)$', line):
            profile_data['location'] = line
            continue

        # Parse content based on section
        if current_section == 'about':
            # Concatenate all about-section lines into one paragraph.
            if not profile_data['about']:
                profile_data['about'] = line
            else:
                profile_data['about'] += ' ' + line

        elif current_section == 'experience':
            if line.startswith('### '):
                # New item header: flush the previous one if it was populated.
                if current_item and current_item.get('title'):
                    profile_data['experience'].append(current_item)

                # "### Title at [Company](url)" or "### Title at Company".
                exp_text = line[4:]
                if ' at ' in exp_text:
                    parts = exp_text.split(' at ', 1)
                    exp_title = parts[0].strip()
                    company_part = parts[1].strip()
                    # Prefer the link label if the company is a markdown link.
                    company_match = regex_module.search(r'\[([^\]]+)\]', company_part)
                    company = company_match.group(1) if company_match else company_part
                    current_item = {'title': exp_title, 'company': company}
                else:
                    current_item = {'title': exp_text}
            elif current_item and ' - ' in line and ('Present' in line or regex_module.search(r'\d{4}', line)):
                # Date range line, e.g. "Jan 2020 - Present".
                current_item['date_range'] = line
            elif current_item and line and not line.startswith('Company:') and not line.startswith('Department:'):
                # First plain-word line is taken as location, subsequent text
                # as the description (each captured at most once).
                if 'location' not in current_item and regex_module.match(r'^[A-Za-z\s,\-]+$', line):
                    current_item['location'] = line
                elif 'description' not in current_item:
                    current_item['description'] = line

        elif current_section == 'education':
            if line.startswith('### '):
                # New item header: flush the previous one if it was populated.
                if current_item and current_item.get('institution'):
                    profile_data['education'].append(current_item)

                # "### Degree at [Institution]" or just "### Institution".
                edu_text = line[4:]
                if ' at ' in edu_text:
                    parts = edu_text.split(' at ', 1)
                    degree = parts[0].strip()
                    inst_part = parts[1].strip()
                    inst_match = regex_module.search(r'\[([^\]]+)\]', inst_part)
                    institution = inst_match.group(1) if inst_match else inst_part
                    current_item = {'degree': degree, 'institution': institution}
                else:
                    inst_match = regex_module.search(r'\[([^\]]+)\]', edu_text)
                    current_item = {'institution': inst_match.group(1) if inst_match else edu_text}
            elif current_item and regex_module.match(r'^\d{4}\s*-\s*\d{4}', line):
                # Year range, e.g. "2015 - 2019".
                current_item['date_range'] = line

        elif current_section == 'skills':
            # Skills may be bullet- or comma-separated on a single line.
            skills = [s.strip() for s in regex_module.split(r'[•,]', line) if s.strip()]
            profile_data['skills'].extend(skills)

        elif current_section == 'languages':
            # Lines like "Dutch - Native or bilingual proficiency".
            lang_match = regex_module.match(r'^([A-Za-z\s]+)\s*-\s*(.+)$', line)
            if lang_match:
                profile_data['languages'].append({
                    'language': lang_match.group(1).strip(),
                    'proficiency': lang_match.group(2).strip()
                })

    # Save last items (only flushed when the loop ends inside that section).
    if current_section == 'experience' and current_item and current_item.get('title'):
        profile_data['experience'].append(current_item)
    if current_section == 'education' and current_item and current_item.get('institution'):
        profile_data['education'].append(current_item)

    return profile_data
|
|
|
|
|
|
def normalize_name_for_ppid(name: str) -> str:
    """Convert name to PPID format: FIRST-LAST (uppercase, hyphen-separated).

    Diacritics are stripped (NFD decomposition, combining marks dropped),
    punctuation becomes a word separator, and the result is uppercased.
    Returns "UNKNOWN" for empty or all-punctuation input.
    """
    import unicodedata

    if not name:
        return "UNKNOWN"

    # Decompose accented characters and drop the combining marks ("é" -> "e").
    decomposed = unicodedata.normalize('NFD', name)
    ascii_like = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    # Treat anything that is not alphanumeric/whitespace as a separator.
    separated = ''.join(
        ch if ch.isalnum() or ch.isspace() else ' '
        for ch in ascii_like
    )

    tokens = separated.upper().split()
    return '-'.join(tokens) if tokens else "UNKNOWN"
|
|
|
|
|
|
def generate_ppid_from_name(name: str, linkedin_slug: str) -> str:
    """Generate a placeholder PPID for a LinkedIn profile based on name.

    Uses the normalized person name as the trailing token; for anonymous
    profiles ("LinkedIn Member") or missing names, falls back to the
    uppercased LinkedIn slug (which is already hyphen-separated).

    The location/date segments are "XX"/"XXXX" placeholders to be enriched
    later.
    """
    if name and name != 'LinkedIn Member':
        name_token = normalize_name_for_ppid(name)
    else:
        # Use LinkedIn slug as fallback. (The original code also ran a no-op
        # .replace('-', '-') here, which has been removed.)
        name_token = linkedin_slug.upper()

    return f"ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_{name_token}"
|
|
|
|
|
|
async def save_entity_profile(
    linkedin_slug: str,
    profile_data: dict,
    raw_response: dict,
    source_info: dict
) -> Optional[str]:
    """
    Save extracted LinkedIn profile as a FULL PPID profile in data/person/.
    Also saves a copy to entity directory for backward compatibility.

    Args:
        linkedin_slug: normalized LinkedIn /in/ slug.
        profile_data: structured profile as produced by
            parse_linkedin_profile_from_exa.
        raw_response: provider response (used for provider name, cost,
            request id).
        source_info: optional context — keys 'linkedin_url', 'wcms_name',
            'wcms_institution' are read if present.

    Returns the PPID filepath if saved, None if failed (errors are printed,
    never raised).
    """
    import uuid

    try:
        # Ensure directories exist
        ENTITY_DIR.mkdir(parents=True, exist_ok=True)
        PERSON_DATA_DIR.mkdir(parents=True, exist_ok=True)

        # Compact timestamp for the entity filename; ISO form for JSON fields.
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        timestamp_iso = datetime.now(timezone.utc).isoformat()

        # Extract name for PPID generation (WCMS name is the fallback).
        name = profile_data.get('name', '') or source_info.get('wcms_name', '') or 'Unknown'

        # Generate PPID
        ppid = generate_ppid_from_name(name, linkedin_slug)

        # Check for PPID collision in data/person/
        ppid_filename = f"{ppid}.json"
        ppid_filepath = PERSON_DATA_DIR / ppid_filename

        if ppid_filepath.exists():
            # Add UUID suffix to avoid collision (keeps both profiles on disk).
            short_uuid = str(uuid.uuid4())[:8]
            ppid = f"{ppid}-{short_uuid}"
            ppid_filename = f"{ppid}.json"
            ppid_filepath = PERSON_DATA_DIR / ppid_filename

        # Determine if anonymous profile
        is_anonymous = (name == 'LinkedIn Member')

        # Generate name tokens for PPID components. For anonymous profiles the
        # trailing PPID segment (derived from the slug) is reused as tokens.
        if is_anonymous:
            name_tokens = ppid.split('_')[-1].split('-')
        else:
            name_tokens = normalize_name_for_ppid(name).split('-')

        # Create FULL PPID profile structure (matching migrate_entity_to_ppid_v5.py format)
        ppid_profile = {
            "ppid": ppid,
            "ppid_type": "ID",
            "ppid_components": {
                "type": "ID",
                # Placeholder location/date segments, to be enriched later.
                "first_location": "XX-XX-XXX",
                "first_date": "XXXX",
                "last_location": "XX-XX-XXX",
                "last_date": "XXXX",
                "name_tokens": name_tokens
            },
            "name": name,
            "linkedin_slug": linkedin_slug,
            "birth_date": {
                "edtf": "XXXX",
                "precision": "unknown",
                "note": "Not yet enriched - requires manual research"
            },
            "is_living": True,
            "is_anonymous": is_anonymous,
            "profile_classification": {
                "primary_classification": "human",
                "confidence": 0.8,
                "indicators": [
                    {"type": "personal_linkedin_url", "reason": "Has personal LinkedIn /in/ URL"}
                ],
                "reasoning": "Manually added LinkedIn profile with personal /in/ URL"
            },
            "heritage_relevance": {
                "is_heritage_relevant": None,  # To be determined during review
                "heritage_types": [],
                "rationale": "Manually added candidate - heritage relevance to be determined"
            },
            "affiliations": [],
            "profile_data": profile_data,
            # Claims always include the LinkedIn URL and full name; headline and
            # location claims are appended below only when present.
            "web_claims": [
                {
                    "claim_type": "linkedin_url",
                    "claim_value": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                    "source_url": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                    "retrieved_on": timestamp_iso,
                    "statement_created_at": timestamp_iso,
                    "source_archived_at": timestamp_iso,
                    "retrieval_agent": "entity_review_api"
                },
                {
                    "claim_type": "full_name",
                    "claim_value": name,
                    "source_url": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                    "retrieved_on": timestamp_iso,
                    "statement_created_at": timestamp_iso,
                    "source_archived_at": timestamp_iso,
                    "retrieval_agent": "entity_review_api"
                }
            ],
            "source_observations": [
                {
                    "source_file": "manual_add_candidate",
                    "observed_on": timestamp_iso,
                    "extraction_agent": "entity_review_api"
                }
            ],
            "extraction_metadata": {
                "source_file": "manual_add_candidate",
                "staff_id": f"manual_add_{linkedin_slug}",
                "extraction_date": timestamp_iso,
                "extraction_method": f"{raw_response.get('provider', 'unknown')}_contents",
                "extraction_agent": "entity_review_api",
                "linkedin_url": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                "cost_usd": raw_response.get('cost', 0),
                "request_id": raw_response.get('request_id', ''),
                "schema_version": "1.0.0",
                "notes": "Created via manual add-candidate API endpoint"
            },
            "migration_metadata": {
                "original_entity_file": None,
                "original_person_id": linkedin_slug,
                "original_linkedin_slug": linkedin_slug,
                "migrated_at": timestamp_iso,
                "migration_script": "entity_review_api.save_entity_profile",
                "migration_version": "1.0"
            }
        }

        # Add headline claim if available
        if profile_data.get('headline'):
            ppid_profile["web_claims"].append({
                "claim_type": "headline",
                "claim_value": profile_data['headline'],
                "source_url": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                "retrieved_on": timestamp_iso,
                "statement_created_at": timestamp_iso,
                "source_archived_at": timestamp_iso,
                "retrieval_agent": "entity_review_api"
            })

        # Add location claim if available
        if profile_data.get('location'):
            ppid_profile["web_claims"].append({
                "claim_type": "location",
                "claim_value": profile_data['location'],
                "source_url": source_info.get('linkedin_url', f"https://www.linkedin.com/in/{linkedin_slug}"),
                "retrieved_on": timestamp_iso,
                "statement_created_at": timestamp_iso,
                "source_archived_at": timestamp_iso,
                "retrieval_agent": "entity_review_api"
            })

        # Save PPID profile to data/person/ (primary location).
        with open(ppid_filepath, 'w', encoding='utf-8') as f:
            json.dump(ppid_profile, f, indent=2, ensure_ascii=False)

        print(f"Saved PPID profile: {ppid_filepath}")

        # Also save to entity directory for backward compatibility
        entity_filename = f"{linkedin_slug}_{timestamp}.json"
        entity_filepath = ENTITY_DIR / entity_filename

        # Create simplified entity format (backward compat)
        entity_data = {
            "person_id": linkedin_slug,
            "ppid": ppid,
            "ppid_file": str(ppid_filepath),
            "extraction_metadata": ppid_profile["extraction_metadata"],
            "source_staff_info": {
                "name": source_info.get('wcms_name', ''),
                "headline": profile_data.get('headline', ''),
                "heritage_type": None,
                "custodian": source_info.get('wcms_institution', '')
            },
            "profile_data": profile_data,
            "heritage_relevance": ppid_profile["heritage_relevance"],
            "linkedin_slug": linkedin_slug
        }

        with open(entity_filepath, 'w', encoding='utf-8') as f:
            json.dump(entity_data, f, indent=2, ensure_ascii=False)

        print(f"Saved entity file: {entity_filepath}")

        # Return the PPID filepath (primary location)
        return str(ppid_filepath)

    except Exception as e:
        # Best-effort save: callers treat None as "profile not persisted".
        print(f"Error saving entity profile: {e}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
|
@router.post("/add-candidate", response_model=AddCandidateResponse)
async def add_manual_candidate(request: AddCandidateRequest):
    """
    Add a LinkedIn candidate manually by providing a LinkedIn slug.
    This creates a match candidate entry for the given WCMS profile.

    The candidate is added to the entity_resolution_candidates.json file
    which is the single source of truth for entity resolution.

    Raises:
        HTTPException: 400 for an invalid slug or a missing WCMS profile
            without enough data to create one; 500 when the candidates file
            cannot be loaded or saved.
    """
    import re

    # Normalize the LinkedIn slug (remove any URL parts)
    linkedin_slug = request.linkedin_slug.strip().lower()
    # Remove any remaining URL fragments
    linkedin_slug = re.sub(r'^https?://.*linkedin\.com/in/', '', linkedin_slug)
    linkedin_slug = linkedin_slug.rstrip('/')

    if not linkedin_slug:
        raise HTTPException(status_code=400, detail="Invalid LinkedIn slug")

    wcms_ppid = request.wcms_ppid

    # Load candidates from the aggregated file
    load_candidates()

    if _candidates_cache is None:
        raise HTTPException(status_code=500, detail="Candidates data not loaded")

    # Check if the WCMS profile exists in our candidates file
    profile_exists_in_candidates = _candidates_by_wcms is not None and wcms_ppid in _candidates_by_wcms

    if profile_exists_in_candidates:
        wcms_data = _candidates_by_wcms[wcms_ppid]
        existing_candidates = wcms_data.get('candidates', [])
    else:
        # Profile not in candidates file - this happens for WCMS-only profiles found via Qdrant search
        # We need WCMS data from the request to create a new entry
        if not request.wcms_name and not request.wcms_email:
            raise HTTPException(
                status_code=400,
                detail=f"WCMS profile {wcms_ppid} not found in candidates. Provide wcms_name and wcms_email to create new entry."
            )

        # Create wcms_data from request
        wcms_data = {
            'wcms_ppid': wcms_ppid,
            'wcms_name': request.wcms_name or 'Unknown',
            'wcms_email': request.wcms_email,
            'wcms_email_domain': request.wcms_email_domain or (request.wcms_email.split('@')[1] if request.wcms_email and '@' in request.wcms_email else None),
            'wcms_user_id': request.wcms_user_id,
            'wcms_abs_id': request.wcms_abs_id,
            'wcms_crm_id': request.wcms_crm_id,
            'wcms_status': request.wcms_status,
            'candidates': []
        }
        existing_candidates = []

        # Add the new WCMS profile to the cache so it can be found later
        if _candidates_by_wcms is not None:
            _candidates_by_wcms[wcms_ppid] = wcms_data

    # Check if this LinkedIn slug already exists as a candidate
    for candidate in existing_candidates:
        if candidate.get('linkedin_slug') == linkedin_slug:
            return AddCandidateResponse(
                success=False,
                message=f"Candidate {linkedin_slug} already exists for this profile"
            )

    # Fetch LinkedIn profile using configured providers (exa, linkup, or both)
    linkedin_url = f"https://www.linkedin.com/in/{linkedin_slug}"
    profile_data = None
    entity_filepath = None
    provider_used = None

    fetch_response = await fetch_linkedin_profile(linkedin_url)
    if fetch_response:
        provider_used = fetch_response.get('provider', 'unknown')
        profile_data = parse_linkedin_profile_from_exa(fetch_response)  # Parser works for both providers
        # Save entity profile file
        source_info = {
            'linkedin_url': linkedin_url,
            'wcms_name': wcms_data.get('wcms_name', ''),
            'wcms_institution': wcms_data.get('wcms_email_domain', '')
        }
        entity_filepath = await save_entity_profile(
            linkedin_slug, profile_data, fetch_response, source_info
        )

    # Generate a PPID for the LinkedIn candidate
    # Format: ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_NAME-SLUG
    # Strip the LinkedIn hash suffix (e.g. "jane-doe-09b799200" -> "jane-doe")
    # when the last hyphen-separated token looks like a hash.
    name_parts = linkedin_slug.rsplit('-', 1)
    if len(name_parts) == 2 and len(name_parts[1]) >= 6 and name_parts[1].isalnum():
        name_for_ppid = name_parts[0].upper()
    else:
        name_for_ppid = linkedin_slug.upper()

    linkedin_ppid = f"ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_{name_for_ppid}"

    # Derive a human-readable name from the slug or use fetched name
    if profile_data and profile_data.get('name'):
        display_name = profile_data['name']
    else:
        # Remove the hash suffix if present (e.g., "marijn-cornelis-meijer-09b799200" -> "Marijn Cornelis Meijer")
        name_parts = linkedin_slug.rsplit('-', 1)
        if len(name_parts) == 2 and len(name_parts[1]) >= 6 and name_parts[1].isalnum():
            display_name = name_parts[0].replace('-', ' ').title()
        else:
            display_name = linkedin_slug.replace('-', ' ').title()

    # Get headline from fetched profile
    headline = profile_data.get('headline', '') if profile_data else ''
    location = profile_data.get('location', '') if profile_data else ''

    # Shared scoring values so the raw entry and the cache entry can never
    # diverge (the cache previously hardcoded "exa_profile_fetched" even when
    # Linkup was the provider that succeeded).
    confidence_score = 0.6 if profile_data else 0.5  # Higher if we got profile data
    match_signals = ["manual_entry", f"{provider_used}_profile_fetched"] if profile_data else ["manual_entry"]
    added_at = datetime.now(timezone.utc).isoformat()

    # Create the new candidate entry matching the existing format
    new_candidate = {
        "wcms_ppid": wcms_ppid,
        "wcms_name": wcms_data.get('wcms_name', ''),
        "wcms_email": wcms_data.get('wcms_email'),
        "wcms_email_domain": wcms_data.get('wcms_email_domain'),
        "linkedin_ppid": linkedin_ppid,
        "linkedin_name": {
            "full_name": display_name,
            "display_name": display_name,
            "name_romanized": None,
            "name_tokens": [t.upper() for t in display_name.split()],
            "source": f"{provider_used}_extraction" if profile_data else "manual_entry"
        },
        "linkedin_slug": linkedin_slug,
        "linkedin_headline": headline,
        "linkedin_location": location,
        "linkedin_url": linkedin_url,
        "entity_profile_path": entity_filepath,
        "name_match_score": 0.5,  # Neutral score for manual entries
        "email_domain_matches_employer": False,
        "employer_name_overlap": [],
        "confidence_score": confidence_score,
        "match_signals": match_signals,
        "requires_review": True,
        "reviewed": False,
        "review_decision": None,
        "reviewed_by": None,
        "reviewed_at": None,
        "review_notes": None,
        "email_probable_birth_year": None,
        "email_birth_year_confidence": 0.0,
        "email_institution_name": None,
        "email_institution_type": None,
        "email_institution_ghcid": None,
        "email_extracted_names": [],
        "email_extracted_first_name": None,
        "email_extracted_last_name": None,
        "email_has_dutch_prefix": False,
        "email_is_institutional": False,
        "is_likely_wrong_person": False,
        "confidence_adjustments": [],
        "added_manually": True,
        "added_at": added_at
    }

    # Add to the candidates list (raw format for file storage)
    _candidates_cache['candidates'].append(new_candidate)

    # Update the index
    idx = len(_candidates_cache['candidates']) - 1
    _candidates_list_index[(wcms_ppid, linkedin_ppid)] = idx

    # Create the transformed format for the cache (matching load_candidates transform)
    candidate_for_cache = {
        'linkedin_ppid': linkedin_ppid,
        'linkedin_name': display_name,  # String format for API responses
        'linkedin_slug': linkedin_slug,
        'linkedin_headline': headline,
        'linkedin_location': location,
        'linkedin_url': linkedin_url,
        'entity_profile_path': entity_filepath,
        'confidence_score': confidence_score,
        'match_signals': match_signals,
        'name_match_score': 0.5,
        'email_domain_matches_employer': False,
        'employer_name_overlap': [],
        'reviewed': False,
        'review_decision': None,
        'reviewed_by': None,
        'reviewed_at': None,
        'review_notes': None,
        'email_birth_year': None,
        'email_birth_year_confidence': 0.0,
        'email_name_components': [],
        'email_name_matches_profile': False,
        'email_institution_name': None,
        'email_institution_type': None,
        'email_is_institutional': False,
        'is_likely_wrong_person': False,
        'wrong_person_reason': None,
        'confidence_original': None,
    }

    # Also add to wcms lookup (transformed format)
    if wcms_ppid in _candidates_by_wcms:
        _candidates_by_wcms[wcms_ppid]['candidates'].append(candidate_for_cache)

    # Add to linkedin lookup
    if _candidates_by_linkedin is not None:
        _candidates_by_linkedin[linkedin_ppid] = {
            'linkedin_ppid': linkedin_ppid,
            'linkedin_name': display_name,
            'linkedin_slug': linkedin_slug,
            'linkedin_headline': headline,
            'linkedin_name_raw': new_candidate['linkedin_name']
        }

    # Update metadata
    if 'metadata' in _candidates_cache:
        _candidates_cache['metadata']['total_candidates'] = len(_candidates_cache['candidates'])
        _candidates_cache['metadata']['last_manual_add_at'] = datetime.now(timezone.utc).isoformat()

    # Save using atomic write with backup
    if not atomic_write_json(CANDIDATES_FILE, _candidates_cache):
        # Invalidate cache on failure so stale in-memory state is not served.
        invalidate_cache()
        raise HTTPException(status_code=500, detail="Failed to save candidate")

    # Build response message
    if profile_data:
        message = f"Added {linkedin_slug} with profile data ({headline[:50]}...) as a candidate for {wcms_data.get('wcms_name', wcms_ppid)}"
    else:
        message = f"Added {linkedin_slug} as a candidate for {wcms_data.get('wcms_name', wcms_ppid)} (profile fetch unavailable)"

    return AddCandidateResponse(
        success=True,
        message=message,
        linkedin_ppid=linkedin_ppid
    )
|
|
|
|
|
|
@router.post("/add-source-url", response_model=AddSourceUrlResponse)
async def add_source_url(request: AddSourceUrlRequest):
    """
    Add a generic source URL with optional comments to a WCMS profile.

    This allows adding any URL that provides evidence of a person's identity,
    such as:
    - Social directories (socialekaartdenhaag.nl)
    - News articles mentioning the person
    - Company websites with staff pages
    - Academic profiles
    - Any other webpage that helps identify the person

    The URL and comment are stored as a source_observation in the candidates file.
    The observation also records provides_match, a flag set by the reviewer
    when the source alone provides enough information to create a person
    profile without LinkedIn.

    Raises:
        HTTPException: 400 for a missing/invalid URL or unknown WCMS profile
            without enough data to create one; 500 on load/save failure.
    """
    from urllib.parse import urlparse
    import hashlib

    source_url = request.source_url.strip()
    if not source_url:
        raise HTTPException(status_code=400, detail="Source URL is required")

    # Validate URL format; scheme-less input gets an https:// prefix.
    try:
        parsed = urlparse(source_url)
        if not parsed.scheme:
            source_url = "https://" + source_url
            parsed = urlparse(source_url)
        if not parsed.netloc:
            raise ValueError("Invalid URL")
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid URL format")

    wcms_ppid = request.wcms_ppid
    comment = request.comment.strip() if request.comment else None
    source_type = request.source_type or "webpage"
    # Reviewer toggle: this source suffices to build a profile on its own.
    provides_match = request.provides_match

    # Load candidates from the aggregated file
    load_candidates()

    if _candidates_cache is None:
        raise HTTPException(status_code=500, detail="Candidates data not loaded")

    # Check if the WCMS profile exists in our candidates file
    profile_exists = _candidates_by_wcms is not None and wcms_ppid in _candidates_by_wcms

    if profile_exists:
        wcms_data = _candidates_by_wcms[wcms_ppid]
    else:
        # Profile not in candidates file - create entry from request
        if not request.wcms_name and not request.wcms_email:
            raise HTTPException(
                status_code=400,
                detail=f"WCMS profile {wcms_ppid} not found. Provide wcms_name and wcms_email to create new entry."
            )

        wcms_data = {
            'wcms_ppid': wcms_ppid,
            'wcms_name': request.wcms_name or 'Unknown',
            'wcms_email': request.wcms_email,
            'wcms_email_domain': request.wcms_email_domain or (request.wcms_email.split('@')[1] if request.wcms_email and '@' in request.wcms_email else None),
            'wcms_user_id': request.wcms_user_id,
            'wcms_abs_id': request.wcms_abs_id,
            'wcms_crm_id': request.wcms_crm_id,
            'wcms_status': request.wcms_status,
            'candidates': [],
            'source_urls': []
        }

        # Add to cache
        if _candidates_by_wcms is not None:
            _candidates_by_wcms[wcms_ppid] = wcms_data

    # Generate a deterministic short ID for this (profile, URL) pair so the
    # same source re-added produces the same source_id.
    source_id = hashlib.sha256(f"{wcms_ppid}:{source_url}".encode()).hexdigest()[:12]

    # Create the source observation entry
    source_observation = {
        "source_id": source_id,
        "source_url": source_url,
        "source_type": source_type,
        "source_domain": parsed.netloc,
        "comment": comment,
        "provides_match": provides_match,
        "added_at": datetime.now(timezone.utc).isoformat(),
        "added_manually": True
    }

    # Find the raw entry in candidates list and add source_urls.
    # NOTE(review): entries in _candidates_cache['candidates'] may be the same
    # dict objects referenced by _candidates_by_wcms — the duplicate checks
    # below guard against double-appending through that aliasing.
    for candidate_entry in _candidates_cache.get('candidates', []):
        if candidate_entry.get('wcms_ppid') == wcms_ppid:
            # Add source_urls array if it doesn't exist
            if 'source_urls' not in candidate_entry:
                candidate_entry['source_urls'] = []

            # Check for duplicate
            existing_urls = [s.get('source_url') for s in candidate_entry.get('source_urls', [])]
            if source_url in existing_urls:
                return AddSourceUrlResponse(
                    success=False,
                    message=f"Source URL already exists for this profile",
                    source_id=None
                )

            candidate_entry['source_urls'].append(source_observation)
            break
    else:
        # No existing entry found - create a new WCMS-only entry
        new_entry = {
            'wcms_ppid': wcms_ppid,
            'wcms_name': wcms_data.get('wcms_name', 'Unknown'),
            'wcms_email': wcms_data.get('wcms_email'),
            'wcms_email_domain': wcms_data.get('wcms_email_domain'),
            'candidates': [],
            'source_urls': [source_observation]
        }
        _candidates_cache['candidates'].append(new_entry)

        # Also add to the in-memory lookup cache so it can be found on subsequent requests
        if _candidates_by_wcms is not None:
            _candidates_by_wcms[wcms_ppid] = new_entry

    # Also update the wcms lookup cache (for existing entries)
    if _candidates_by_wcms is not None and wcms_ppid in _candidates_by_wcms:
        if 'source_urls' not in _candidates_by_wcms[wcms_ppid]:
            _candidates_by_wcms[wcms_ppid]['source_urls'] = []

        # Check for duplicate in cache (no-op when the cache entry aliases
        # the raw entry appended above).
        existing_in_cache = [s.get('source_url') for s in _candidates_by_wcms[wcms_ppid].get('source_urls', [])]
        if source_url not in existing_in_cache:
            _candidates_by_wcms[wcms_ppid]['source_urls'].append(source_observation)

    # Update metadata
    if 'metadata' in _candidates_cache:
        _candidates_cache['metadata']['last_source_url_add_at'] = datetime.now(timezone.utc).isoformat()

    # Save using atomic write with backup
    if not atomic_write_json(CANDIDATES_FILE, _candidates_cache):
        invalidate_cache()
        raise HTTPException(status_code=500, detail="Failed to save source URL")

    # Build response message
    if comment:
        message = f"Added source URL from {parsed.netloc} with comment for {wcms_data.get('wcms_name', wcms_ppid)}"
    else:
        message = f"Added source URL from {parsed.netloc} for {wcms_data.get('wcms_name', wcms_ppid)}"

    return AddSourceUrlResponse(
        success=True,
        message=message,
        source_id=source_id
    )
|
|
|
|
|
|
# ============================================================================
|
|
# Review Authentication (Separate from main app auth)
|
|
# ============================================================================
|
|
|
|
# Simple token-based auth for review page
# SECURITY NOTE: these defaults are placeholders ("...-change-me") — set real
# values via the environment variables in any non-local deployment.
REVIEW_AUTH_TOKEN = os.getenv("REVIEW_AUTH_TOKEN", "review-secret-token-change-me")
REVIEW_USERNAME = os.getenv("REVIEW_USERNAME", "reviewer")
REVIEW_PASSWORD = os.getenv("REVIEW_PASSWORD", "review-password-change-me")
|
|
|
|
|
|
@router.post("/auth/login")
async def review_login(username: str = Form(""), password: str = Form("")):
    """
    Simple authentication for the review page.

    Returns a session token if credentials are valid.

    Raises:
        HTTPException: 401 when the username/password pair does not match.
    """
    import hashlib
    import hmac
    import logging
    import time

    # Log only the attempted username — never the expected credentials
    # (the previous version logged REVIEW_USERNAME at INFO level).
    logging.info(f"Login attempt: username={repr(username)}")

    # Constant-time comparison avoids leaking credential length/prefix
    # information through response timing.
    user_ok = hmac.compare_digest(username.encode('utf-8'), REVIEW_USERNAME.encode('utf-8'))
    pass_ok = hmac.compare_digest(password.encode('utf-8'), REVIEW_PASSWORD.encode('utf-8'))

    if user_ok and pass_ok:
        # Generate a simple session token (in production, use proper JWT)
        token = hashlib.sha256(f"{username}:{time.time()}:{REVIEW_AUTH_TOKEN}".encode()).hexdigest()[:32]
        return {
            "success": True,
            "token": token,
            "username": username
        }

    raise HTTPException(status_code=401, detail="Invalid credentials")
|
|
|
|
|
|
@router.get("/auth/verify")
async def verify_review_token(token: str = ""):
    """
    Verify a review session token.

    Raises:
        HTTPException: 401 when the token is empty or too short.
    """
    # Guard clause: reject empty/short tokens up front.
    # (For now any token of >= 16 chars is accepted; in production, validate properly.)
    if not token or len(token) < 16:
        raise HTTPException(status_code=401, detail="Invalid token")

    return {"valid": True, "username": REVIEW_USERNAME}
|