# File: glam/scripts/batch_extract_linkedin_exa.py
# Snapshot: 2025-12-14 17:09:55 +01:00 — 420 lines, 15 KiB, Python, executable
#!/usr/bin/env python3
"""
Batch extract LinkedIn profile data using Exa API.
This script processes the missing_entity_profiles.json file and extracts
full LinkedIn profile data using the Exa crawling API.
Usage:
python batch_extract_linkedin_exa.py [--limit N] [--offset N] [--dry-run]
Options:
--limit N Only process N profiles (default: all)
--offset N Start from profile N (default: 0)
--dry-run Don't save files, just show what would be extracted
"""
import json
import os
import re
import sys
import time
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import httpx
from tqdm import tqdm
# Configuration
EXA_API_URL = "https://api.exa.ai/contents"
ENTITY_DIR = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")
MISSING_PROFILES_FILE = Path("/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed/missing_entity_profiles.json")
RATE_LIMIT_DELAY = 0.5 # seconds between requests
def get_exa_api_key() -> str:
    """Return the Exa API key from the ``EXA_API_KEY`` environment variable.

    Raises:
        ValueError: if the variable is unset or empty, with instructions
            on how to export it.
    """
    api_key = os.environ.get("EXA_API_KEY")
    if api_key:
        return api_key
    raise ValueError(
        "EXA_API_KEY environment variable not set.\n"
        "Please set it: export EXA_API_KEY='your-key-here'"
    )
def load_missing_profiles() -> list[dict]:
    """Load the profiles still awaiting extraction.

    Reads MISSING_PROFILES_FILE and returns its
    'missing_heritage_profiles' list (empty list if the key is absent).
    """
    raw = MISSING_PROFILES_FILE.read_text(encoding='utf-8')
    payload = json.loads(raw)
    return payload.get('missing_heritage_profiles', [])
def get_existing_slugs() -> set[str]:
    """Collect slugs of entity profiles already present on disk.

    Entity files are named ``<slug>_<YYYYMMDD>T<HHMMSS>Z.json``; the slug
    portion of each matching filename is returned. Creates ENTITY_DIR
    (and parents) if it does not exist yet, returning an empty set.
    """
    if not ENTITY_DIR.exists():
        ENTITY_DIR.mkdir(parents=True, exist_ok=True)
        return set()
    # Anchored at the start of the name; the timestamp suffix is mandatory.
    slug_pattern = re.compile(r"([a-zA-Z0-9_\-]+)_\d{8}T\d{6}Z\.json")
    slugs: set[str] = set()
    for entry in ENTITY_DIR.glob("*.json"):
        hit = slug_pattern.match(entry.name)
        if hit is not None:
            slugs.add(hit.group(1))
    return slugs
def fetch_profile_exa(url: str, api_key: str, client: httpx.Client) -> Optional[dict]:
    """Fetch one LinkedIn profile through the Exa ``/contents`` API.

    Returns a dict with keys 'result' (first Exa result), 'request_id'
    and 'cost' on success; returns None on any HTTP error, empty result
    set, or exception (errors are printed, not raised).
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "ids": [url],
        "text": True,
        # Fall back to a live crawl when Exa has no cached copy.
        "livecrawl": "fallback",
    }
    try:
        response = client.post(
            EXA_API_URL,
            headers=request_headers,
            json=payload,
            timeout=60.0,
        )
        if response.status_code != 200:
            print(f"\nError fetching {url}: HTTP {response.status_code}")
            return None
        body = response.json()
        results = body.get('results') or []
        if results:
            return {
                'result': results[0],
                'request_id': body.get('requestId', ''),
                'cost': body.get('costDollars', {}).get('total', 0),
            }
        # 200 response but no results: treat as a miss.
        return None
    except Exception as e:
        # Best-effort batch script: report and move on to the next profile.
        print(f"\nException fetching {url}: {e}")
        return None
def parse_linkedin_profile(raw_data: dict) -> dict:
    """Parse an Exa ``/contents`` response into a structured profile dict.

    Walks the markdown-style ``text`` payload line by line, tracking the
    current ``## `` section and accumulating experience/education items
    introduced by ``### `` headings.

    Args:
        raw_data: dict as returned by ``fetch_profile_exa`` (must contain
            a 'result' key; missing keys degrade to empty fields).

    Returns:
        dict with keys: name, linkedin_url, headline, location,
        connections, about, experience, education, skills, languages,
        profile_image_url.

    Bug fix: a pending experience/education item is now flushed whenever a
    new ``## `` section header is reached. Previously the LAST item of a
    section was silently dropped whenever another section followed it
    (e.g. the final Experience entry vanished when ``## Education`` came
    next), and the stale item could absorb lines from the next section.
    """
    result = raw_data.get('result', {})
    text = result.get('text', '')
    profile_data = {
        "name": result.get('author', ''),
        "linkedin_url": result.get('url', ''),
        "headline": "",
        "location": "",
        "connections": "",
        "about": "",
        "experience": [],
        "education": [],
        "skills": [],
        "languages": [],
        "profile_image_url": result.get('image', '')
    }
    # Headline comes from the page title: the part after '|' when present,
    # otherwise the title with the person's name stripped off.
    title = result.get('title', '')
    if '|' in title:
        profile_data['headline'] = title.split('|')[1].strip()
    elif title:
        profile_data['headline'] = title.replace(profile_data['name'], '').strip(' |')

    current_section = None
    current_item: dict = {}

    def flush_item() -> None:
        """Append the pending experience/education item (if complete) and reset it."""
        nonlocal current_item
        if current_section == 'experience' and current_item.get('title'):
            profile_data['experience'].append(current_item)
        elif current_section == 'education' and current_item.get('institution'):
            profile_data['education'].append(current_item)
        current_item = {}

    # '## ' header prefixes mapped to section keys; any other '## ' header
    # simply ends the current section.
    section_prefixes = (
        ('## About', 'about'),
        ('## Experience', 'experience'),
        ('## Education', 'education'),
        ('## Skills', 'skills'),
        ('## Languages', 'languages'),
    )

    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # Section headers: flush any pending item BEFORE switching sections
        # (this is the fix for losing the last item of a section).
        if line.startswith('## '):
            flush_item()
            current_section = None
            for prefix, section in section_prefixes:
                if line.startswith(prefix):
                    current_section = section
                    break
            continue
        # Connections/followers summary line (may appear outside sections).
        if 'connections' in line.lower() and 'followers' in line.lower():
            profile_data['connections'] = line
            continue
        # Location line shaped like "City, Country (XX)".
        if re.match(r'^[A-Za-z\s,]+\s*\([A-Z]{2}\)$', line):
            profile_data['location'] = line
            continue
        if current_section == 'about':
            # Join all about-paragraph lines with single spaces.
            if not profile_data['about']:
                profile_data['about'] = line
            else:
                profile_data['about'] += ' ' + line
        elif current_section == 'experience':
            if line.startswith('### '):
                flush_item()  # save the previous item, if any
                exp_text = line[4:]  # drop '### '
                if ' at ' in exp_text:
                    job_title, _, company_part = exp_text.partition(' at ')
                    job_title = job_title.strip()
                    company_part = company_part.strip()
                    # Prefer the link text of a markdown link, if present.
                    company_match = re.search(r'\[([^\]]+)\]', company_part)
                    company = company_match.group(1) if company_match else company_part
                    current_item = {'title': job_title, 'company': company}
                else:
                    current_item = {'title': exp_text}
            elif current_item and ' - ' in line and ('Present' in line or re.search(r'\d{4}', line)):
                # Date range, e.g. "Jan 2020 - Present".
                current_item['date_range'] = line
            elif current_item and line and not line.startswith(('Company:', 'Department:')):
                # Letters/commas/hyphens only -> treat as a location;
                # otherwise the first free-text line becomes the description.
                if 'location' not in current_item and re.match(r'^[A-Za-z\s,\-]+$', line):
                    current_item['location'] = line
                elif 'description' not in current_item:
                    current_item['description'] = line
        elif current_section == 'education':
            if line.startswith('### '):
                flush_item()  # save the previous item, if any
                edu_text = line[4:]
                if ' at ' in edu_text:
                    degree, _, inst_part = edu_text.partition(' at ')
                    degree = degree.strip()
                    inst_part = inst_part.strip()
                    inst_match = re.search(r'\[([^\]]+)\]', inst_part)
                    institution = inst_match.group(1) if inst_match else inst_part
                    current_item = {'degree': degree, 'institution': institution}
                else:
                    # Heading is just the institution name (maybe a markdown link).
                    inst_match = re.search(r'\[([^\]]+)\]', edu_text)
                    current_item = {'institution': inst_match.group(1) if inst_match else edu_text}
            elif current_item and re.match(r'^\d{4}\s*-\s*\d{4}', line):
                current_item['date_range'] = line
        elif current_section == 'skills':
            # Skills arrive comma- or bullet-separated on one line.
            profile_data['skills'].extend(
                s.strip() for s in re.split(r'[•,]', line) if s.strip()
            )
        elif current_section == 'languages':
            # "Language - Proficiency" pairs.
            lang_match = re.match(r'^([A-Za-z\s]+)\s*-\s*(.+)$', line)
            if lang_match:
                profile_data['languages'].append({
                    'language': lang_match.group(1).strip(),
                    'proficiency': lang_match.group(2).strip()
                })
    # Flush whatever item was still open when the text ended.
    flush_item()
    return profile_data
def save_entity_profile(
    slug: str,
    profile_data: dict,
    raw_response: dict,
    source_info: dict,
    dry_run: bool = False
) -> str:
    """Serialize one extracted profile to ENTITY_DIR as ``<slug>_<ts>.json``.

    Args:
        slug: filename-safe identifier of the person.
        profile_data: structured profile from parse_linkedin_profile.
        raw_response: dict from fetch_profile_exa (cost / request id).
        source_info: the original staff record from the missing-profiles list.
        dry_run: when True, nothing is written to disk.

    Returns:
        The filename that was (or would have been) written.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    filename = f"{slug}_{timestamp}.json"
    custodian = source_info.get('custodian', 'unknown')
    heritage_type = source_info.get('heritage_type')
    entity_data = {
        "extraction_metadata": {
            "source_file": str(MISSING_PROFILES_FILE),
            # staff id is derived from the custodian name, slugified.
            "staff_id": f"{custodian.lower().replace(' ', '-')}_staff_{slug}",
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_method": "exa_contents",
            "extraction_agent": "claude-sonnet-4-20250514",
            "linkedin_url": source_info.get('linkedin_url', ''),
            "cost_usd": raw_response.get('cost', 0),
            "request_id": raw_response.get('request_id', '')
        },
        "source_staff_info": {
            "name": source_info.get('name', ''),
            "headline": source_info.get('headline', ''),
            "heritage_type": heritage_type,
            "custodian": source_info.get('custodian', '')
        },
        "profile_data": profile_data,
        "heritage_relevance": {
            "is_heritage_relevant": heritage_type is not None,
            "heritage_types": [heritage_type] if heritage_type else [],
            "rationale": f"Identified as heritage staff at {source_info.get('custodian', 'unknown institution')}"
        }
    }
    if not dry_run:
        (ENTITY_DIR / filename).write_text(
            json.dumps(entity_data, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )
    return filename
def main():
    """CLI entry point: extract all missing LinkedIn profiles via Exa.

    Loads the missing-profiles list, skips anything already on disk,
    applies --offset/--limit, fetches + parses + saves each remaining
    profile with rate limiting, then prints a summary and writes a
    JSON extraction log (unless --dry-run).
    """
    parser = argparse.ArgumentParser(description='Batch extract LinkedIn profiles using Exa API')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--offset', type=int, default=0, help='Start from profile N')
    parser.add_argument('--dry-run', action='store_true', help='Do not save files')
    args = parser.parse_args()

    # Fail fast when the API key is missing.
    try:
        api_key = get_exa_api_key()
    except ValueError as err:
        print(str(err))
        sys.exit(1)

    print("Loading missing profiles...")
    candidates = load_missing_profiles()
    print(f"Total profiles in list: {len(candidates)}")

    print("Checking existing entity profiles...")
    existing_slugs = get_existing_slugs()
    print(f"Existing entity profiles: {len(existing_slugs)}")

    # Keep only records that carry a slug and have no entity file yet.
    pending = [
        rec for rec in candidates
        if rec.get('slug') and rec['slug'] not in existing_slugs
    ]
    print(f"Profiles needing extraction: {len(pending)}")

    if args.offset > 0:
        pending = pending[args.offset:]
        print(f"After offset {args.offset}: {len(pending)}")
    if args.limit:
        pending = pending[:args.limit]
        print(f"After limit {args.limit}: {len(pending)}")

    if not pending:
        print("No profiles to extract!")
        return
    if args.dry_run:
        print("\n*** DRY RUN - No files will be saved ***\n")

    # Run statistics.
    succeeded = 0
    failed = 0
    skipped = 0
    spent_usd = 0.0

    print(f"\nExtracting {len(pending)} profiles...")
    with httpx.Client() as client:
        for record in tqdm(pending, desc="Extracting profiles"):
            slug = record.get('slug')
            url = record.get('linkedin_url')
            # Skip records missing identifiers, or already handled
            # earlier in this same run.
            if not slug or not url or slug in existing_slugs:
                skipped += 1
                continue
            raw_response = fetch_profile_exa(url, api_key, client)
            if raw_response is None:
                failed += 1
            else:
                parsed = parse_linkedin_profile(raw_response)
                save_entity_profile(
                    slug=slug,
                    profile_data=parsed,
                    raw_response=raw_response,
                    source_info=record,
                    dry_run=args.dry_run
                )
                succeeded += 1
                spent_usd += raw_response.get('cost', 0)
                existing_slugs.add(slug)  # guard against duplicates in this run
            # Throttle between API calls (skipped records don't sleep).
            time.sleep(RATE_LIMIT_DELAY)

    print("\n" + "=" * 60)
    print("EXTRACTION COMPLETE")
    print("=" * 60)
    print(f"Successful: {succeeded}")
    print(f"Failed: {failed}")
    print(f"Skipped: {skipped}")
    print(f"Total cost: ${spent_usd:.4f}")
    print(f"Entity directory: {ENTITY_DIR}")

    # Persist a machine-readable record of this run.
    if not args.dry_run:
        stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        log_file = ENTITY_DIR.parent / f"extraction_log_{stamp}.json"
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "profiles_attempted": len(pending),
            "successful": succeeded,
            "failed": failed,
            "skipped": skipped,
            "total_cost_usd": spent_usd,
            "offset": args.offset,
            "limit": args.limit
        }
        with open(log_file, 'w') as f:
            json.dump(log_data, f, indent=2)
        print(f"Log saved to: {log_file}")
# Entry-point guard: run the batch extraction only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()