#!/usr/bin/env python3
|
|
"""
|
|
Batch extract LinkedIn profile data using Exa API.
|
|
|
|
This script processes the missing_entity_profiles.json file and extracts
|
|
full LinkedIn profile data using the Exa crawling API.
|
|
|
|
Usage:
|
|
python batch_extract_linkedin_exa.py [--limit N] [--offset N] [--dry-run]
|
|
|
|
Options:
|
|
--limit N Only process N profiles (default: all)
|
|
--offset N Start from profile N (default: 0)
|
|
--dry-run Don't save files, just show what would be extracted
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from tqdm import tqdm
|
|
|
|
|
|
# Configuration
|
|
EXA_API_URL = "https://api.exa.ai/contents"
|
|
ENTITY_DIR = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")
|
|
MISSING_PROFILES_FILE = Path("/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed/missing_entity_profiles.json")
|
|
RATE_LIMIT_DELAY = 0.5 # seconds between requests
|
|
|
|
|
|
def get_exa_api_key() -> str:
    """Read the Exa API key from the EXA_API_KEY environment variable.

    Returns:
        The API key string.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    api_key = os.environ.get("EXA_API_KEY")
    if api_key:
        return api_key
    raise ValueError(
        "EXA_API_KEY environment variable not set.\n"
        "Please set it: export EXA_API_KEY='your-key-here'"
    )
|
|
|
|
|
|
def load_missing_profiles() -> list[dict]:
    """Return the profiles that still need extraction.

    Reads MISSING_PROFILES_FILE and returns its
    'missing_heritage_profiles' list (empty list if the key is absent).
    """
    payload = json.loads(MISSING_PROFILES_FILE.read_text(encoding='utf-8'))
    return payload.get('missing_heritage_profiles', [])
|
|
|
|
|
|
def get_existing_slugs() -> set[str]:
    """Collect slugs of entity profiles already saved on disk.

    Entity files are named ``<slug>_<YYYYMMDDTHHMMSSZ>.json``; the slug
    portion is extracted from every matching filename. If ENTITY_DIR does
    not exist yet it is created (with parents) and an empty set returned.
    """
    if not ENTITY_DIR.exists():
        ENTITY_DIR.mkdir(parents=True, exist_ok=True)
        return set()

    slug_pattern = re.compile(r"([a-zA-Z0-9_\-]+)_\d{8}T\d{6}Z\.json")
    return {
        m.group(1)
        for path in ENTITY_DIR.glob("*.json")
        if (m := slug_pattern.match(path.name))
    }
|
|
|
|
|
|
def fetch_profile_exa(url: str, api_key: str, client: httpx.Client) -> Optional[dict]:
    """Fetch a LinkedIn profile through the Exa /contents API.

    Args:
        url: LinkedIn profile URL to crawl.
        api_key: Exa API bearer token.
        client: Shared httpx client (reused across the batch for
            connection pooling).

    Returns:
        A dict with 'result' (first Exa result object), 'request_id' and
        'cost' (dollars), or None on any failure.
    """
    try:
        response = client.post(
            EXA_API_URL,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "ids": [url],
                "text": True,
                # "fallback": serve cached content when a live crawl fails
                "livecrawl": "fallback"
            },
            timeout=60.0
        )

        if response.status_code == 200:
            data = response.json()
            if data.get('results') and len(data['results']) > 0:
                return {
                    'result': data['results'][0],
                    'request_id': data.get('requestId', ''),
                    'cost': data.get('costDollars', {}).get('total', 0)
                }
            # Fix: a 200 response with an empty result list previously
            # returned None silently, unlike every other failure path.
            print(f"\nNo results returned for {url}")
            return None
        else:
            print(f"\nError fetching {url}: HTTP {response.status_code}")
            return None

    except Exception as e:
        # Best-effort batch job: log the failure and let the caller
        # continue with the next profile instead of aborting the run.
        print(f"\nException fetching {url}: {e}")
        return None
|
|
|
|
|
|
def parse_linkedin_profile(raw_data: dict) -> dict:
    """Parse an Exa contents response into structured profile data.

    The Exa result's markdown text is scanned line by line: '## ' headers
    select the current section (About / Experience / Education / Skills /
    Languages) and '### ' headers start a new experience or education entry.

    Args:
        raw_data: Dict as returned by fetch_profile_exa, containing the
            raw Exa 'result' payload.

    Returns:
        Dict with name, linkedin_url, headline, location, connections,
        about, experience (list of dicts), education (list of dicts),
        skills (list of str), languages (list of dicts) and
        profile_image_url.
    """
    result = raw_data.get('result', {})
    text = result.get('text', '')

    # Skeleton populated below from the markdown text.
    profile_data = {
        "name": result.get('author', ''),
        "linkedin_url": result.get('url', ''),
        "headline": "",
        "location": "",
        "connections": "",
        "about": "",
        "experience": [],
        "education": [],
        "skills": [],
        "languages": [],
        "profile_image_url": result.get('image', '')
    }

    # Headline usually follows '|' in the page title; otherwise strip the
    # person's name out of the title and use whatever remains.
    title = result.get('title', '')
    if '|' in title:
        profile_data['headline'] = title.split('|')[1].strip()
    elif title:
        profile_data['headline'] = title.replace(profile_data['name'], '').strip(' |')

    # Line-by-line state machine over the markdown text.
    lines = text.split('\n')
    current_section = None
    current_item = {}

    def flush_item():
        # Commit the in-progress entry for the section it was started in.
        if current_section == 'experience' and current_item.get('title'):
            profile_data['experience'].append(current_item)
        elif current_section == 'education' and current_item.get('institution'):
            profile_data['education'].append(current_item)

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Section headers. Fix: flush the pending entry before switching
        # sections — previously the last experience/education entry was
        # silently dropped when the next '## ' header arrived, and the
        # stale item could keep absorbing lines from later sections.
        if line.startswith('## '):
            flush_item()
            current_item = {}
            if line.startswith('## About'):
                current_section = 'about'
            elif line.startswith('## Experience'):
                current_section = 'experience'
            elif line.startswith('## Education'):
                current_section = 'education'
            elif line.startswith('## Skills'):
                current_section = 'skills'
            elif line.startswith('## Languages'):
                current_section = 'languages'
            else:
                current_section = None
            continue

        # Summary line mentioning both connections and followers.
        if 'connections' in line.lower() and 'followers' in line.lower():
            profile_data['connections'] = line
            continue
        # Location formatted as "City, Region (XX)" with a country code.
        if re.match(r'^[A-Za-z\s,]+\s*\([A-Z]{2}\)$', line):
            profile_data['location'] = line
            continue

        # Parse content according to the active section.
        if current_section == 'about':
            if not profile_data['about']:
                profile_data['about'] = line
            else:
                profile_data['about'] += ' ' + line

        elif current_section == 'experience':
            if line.startswith('### '):
                # Close the previous entry and start a new one.
                flush_item()

                exp_text = line[4:]  # Remove '### '
                if ' at ' in exp_text:
                    parts = exp_text.split(' at ', 1)
                    title = parts[0].strip()
                    company_part = parts[1].strip()
                    # Company may be a markdown link: [Name](url)
                    company_match = re.search(r'\[([^\]]+)\]', company_part)
                    company = company_match.group(1) if company_match else company_part
                    current_item = {'title': title, 'company': company}
                else:
                    current_item = {'title': exp_text}
            elif current_item and ' - ' in line and ('Present' in line or re.search(r'\d{4}', line)):
                # Date range line, e.g. "Jan 2020 - Present".
                current_item['date_range'] = line
            elif current_item and line and not line.startswith('Company:') and not line.startswith('Department:'):
                # Plain line: letters/commas only → location; the first
                # remaining line becomes the description.
                if 'location' not in current_item and re.match(r'^[A-Za-z\s,\-]+$', line):
                    current_item['location'] = line
                elif 'description' not in current_item:
                    current_item['description'] = line

        elif current_section == 'education':
            if line.startswith('### '):
                # Close the previous entry and start a new one.
                flush_item()

                edu_text = line[4:]
                if ' at ' in edu_text:
                    parts = edu_text.split(' at ', 1)
                    degree = parts[0].strip()
                    inst_part = parts[1].strip()
                    inst_match = re.search(r'\[([^\]]+)\]', inst_part)
                    institution = inst_match.group(1) if inst_match else inst_part
                    current_item = {'degree': degree, 'institution': institution}
                else:
                    # Header holds just the institution name (maybe linked).
                    inst_match = re.search(r'\[([^\]]+)\]', edu_text)
                    current_item = {'institution': inst_match.group(1) if inst_match else edu_text}
            elif current_item and re.match(r'^\d{4}\s*-\s*\d{4}', line):
                current_item['date_range'] = line

        elif current_section == 'skills':
            # Skills arrive comma-separated or as bullet points.
            skills = [s.strip() for s in re.split(r'[•,]', line) if s.strip()]
            profile_data['skills'].extend(skills)

        elif current_section == 'languages':
            # "Language - proficiency" pairs.
            lang_match = re.match(r'^([A-Za-z\s]+)\s*-\s*(.+)$', line)
            if lang_match:
                profile_data['languages'].append({
                    'language': lang_match.group(1).strip(),
                    'proficiency': lang_match.group(2).strip()
                })

    # Commit whatever entry was still open when the text ended.
    flush_item()

    return profile_data
|
|
|
|
|
|
def save_entity_profile(
    slug: str,
    profile_data: dict,
    raw_response: dict,
    source_info: dict,
    dry_run: bool = False
) -> str:
    """Persist one extracted profile as a timestamped entity JSON file.

    Args:
        slug: Identifier used in the output filename.
        profile_data: Structured profile from parse_linkedin_profile.
        raw_response: Exa response wrapper (for cost / request-id metadata).
        source_info: Source row from the missing-profiles list.
        dry_run: When True, skip writing; the filename is still returned.

    Returns:
        The generated filename (not the full path), of the form
        ``<slug>_<UTC timestamp>.json``.
    """
    now = datetime.now(timezone.utc)
    filename = f"{slug}_{now.strftime('%Y%m%dT%H%M%SZ')}.json"
    filepath = ENTITY_DIR / filename

    heritage_type = source_info.get('heritage_type')

    extraction_metadata = {
        "source_file": str(MISSING_PROFILES_FILE),
        "staff_id": f"{source_info.get('custodian', 'unknown').lower().replace(' ', '-')}_staff_{slug}",
        "extraction_date": now.isoformat(),
        "extraction_method": "exa_contents",
        "extraction_agent": "claude-sonnet-4-20250514",
        "linkedin_url": source_info.get('linkedin_url', ''),
        "cost_usd": raw_response.get('cost', 0),
        "request_id": raw_response.get('request_id', '')
    }
    source_staff_info = {
        "name": source_info.get('name', ''),
        "headline": source_info.get('headline', ''),
        "heritage_type": heritage_type,
        "custodian": source_info.get('custodian', '')
    }
    heritage_relevance = {
        "is_heritage_relevant": heritage_type is not None,
        "heritage_types": [heritage_type] if heritage_type else [],
        "rationale": f"Identified as heritage staff at {source_info.get('custodian', 'unknown institution')}"
    }

    entity_data = {
        "extraction_metadata": extraction_metadata,
        "source_staff_info": source_staff_info,
        "profile_data": profile_data,
        "heritage_relevance": heritage_relevance
    }

    if not dry_run:
        filepath.write_text(
            json.dumps(entity_data, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )

    return filename
|
|
|
|
|
|
def main():
    """CLI entry point: batch-extract all missing LinkedIn profiles via Exa."""
    parser = argparse.ArgumentParser(description='Batch extract LinkedIn profiles using Exa API')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of profiles to process')
    parser.add_argument('--offset', type=int, default=0, help='Start from profile N')
    parser.add_argument('--dry-run', action='store_true', help='Do not save files')
    args = parser.parse_args()

    # Fail fast when the API key is missing.
    try:
        api_key = get_exa_api_key()
    except ValueError as err:
        print(str(err))
        sys.exit(1)

    print("Loading missing profiles...")
    all_profiles = load_missing_profiles()
    print(f"Total profiles in list: {len(all_profiles)}")

    print("Checking existing entity profiles...")
    existing_slugs = get_existing_slugs()
    print(f"Existing entity profiles: {len(existing_slugs)}")

    # Keep only entries that have a slug and no profile on disk yet.
    pending = [
        entry for entry in all_profiles
        if entry.get('slug') and entry['slug'] not in existing_slugs
    ]
    print(f"Profiles needing extraction: {len(pending)}")

    # Apply --offset then --limit to the work list.
    if args.offset > 0:
        pending = pending[args.offset:]
        print(f"After offset {args.offset}: {len(pending)}")

    if args.limit:
        pending = pending[:args.limit]
        print(f"After limit {args.limit}: {len(pending)}")

    if not pending:
        print("No profiles to extract!")
        return

    if args.dry_run:
        print("\n*** DRY RUN - No files will be saved ***\n")

    # Running totals for the summary and the extraction log.
    succeeded = 0
    failed = 0
    skipped = 0
    total_cost = 0.0

    print(f"\nExtracting {len(pending)} profiles...")

    with httpx.Client() as client:
        for entry in tqdm(pending, desc="Extracting profiles"):
            slug = entry.get('slug')
            url = entry.get('linkedin_url')

            # Skip malformed rows, plus anything written to disk earlier
            # in this very run.
            if not slug or not url or slug in existing_slugs:
                skipped += 1
                continue

            raw_response = fetch_profile_exa(url, api_key, client)

            if raw_response is None:
                failed += 1
            else:
                profile_data = parse_linkedin_profile(raw_response)
                save_entity_profile(
                    slug=slug,
                    profile_data=profile_data,
                    raw_response=raw_response,
                    source_info=entry,
                    dry_run=args.dry_run
                )
                succeeded += 1
                total_cost += raw_response.get('cost', 0)
                existing_slugs.add(slug)  # avoid re-fetching within this run

            # Stay under the API rate limit.
            time.sleep(RATE_LIMIT_DELAY)

    # Human-readable summary.
    print("\n" + "=" * 60)
    print("EXTRACTION COMPLETE")
    print("=" * 60)
    print(f"Successful: {succeeded}")
    print(f"Failed: {failed}")
    print(f"Skipped: {skipped}")
    print(f"Total cost: ${total_cost:.4f}")
    print(f"Entity directory: {ENTITY_DIR}")

    # Machine-readable extraction log (skipped on dry runs).
    if not args.dry_run:
        log_file = ENTITY_DIR.parent / f"extraction_log_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.json"
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "profiles_attempted": len(pending),
            "successful": succeeded,
            "failed": failed,
            "skipped": skipped,
            "total_cost_usd": total_cost,
            "offset": args.offset,
            "limit": args.limit
        }
        with open(log_file, 'w') as f:
            json.dump(log_data, f, indent=2)
        print(f"Log saved to: {log_file}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|