#!/usr/bin/env python3
"""
Fetch LinkedIn profile data using Exa API for people in staff files.

Uses threading for efficiency and prevents duplicate entries.
"""

import hashlib
import json
import os
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import httpx
from tqdm import tqdm


class LinkedInProfileFetcher:
    """Fetch LinkedIn profiles with duplicate prevention.

    NOTE(review): despite the "Exa" naming in this file, profiles are fetched
    by posting a chat-completion request to the z.ai GLM API and asking the
    model to extract the profile -- confirm this indirection is intended.
    """

    def __init__(self, max_workers: int = 5, delay_seconds: float = 1.0):
        """
        Args:
            max_workers: size of the thread pool used by run().
            delay_seconds: per-worker sleep after each successful fetch,
                acting as crude rate limiting.
        """
        self.max_workers = max_workers
        self.delay_seconds = delay_seconds
        self.session = httpx.Client(timeout=60.0)
        self.existing_profiles: Set[str] = set()
        self.processed_urls: Set[str] = set()
        # Guards existing_profiles / processed_urls, which are checked and
        # mutated concurrently by worker threads in run().
        self._state_lock = threading.Lock()
        self.entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

        # Load existing profiles to avoid duplicates
        self._load_existing_profiles()

    def close(self) -> None:
        """Release the underlying HTTP connection pool."""
        self.session.close()

    def _load_existing_profiles(self) -> None:
        """Load existing profile filenames to avoid duplicates.

        Saved files are named "<slug>_YYYYMMDDTHHMMSSZ.json"; the slug part
        is collected into self.existing_profiles.
        """
        if not self.entity_dir.exists():
            self.entity_dir.mkdir(parents=True, exist_ok=True)
            return

        for file_path in self.entity_dir.glob("*.json"):
            # Extract LinkedIn slug from filename
            match = re.match(r"([a-zA-Z0-9\-]+)_\d{8}T\d{6}Z\.json", file_path.name)
            if match:
                self.existing_profiles.add(match.group(1))

    def _extract_linkedin_urls(self, staff_data: Dict) -> List[str]:
        """Extract LinkedIn URLs from one parsed staff file.

        Accepts both 'linkedin_url' and 'linkedin_profile_url' field names.
        Returns de-duplicated URLs (order is not preserved).
        """
        urls = []

        if 'staff' in staff_data:
            for person in staff_data['staff']:
                # Check both possible field names
                url = person.get('linkedin_url') or person.get('linkedin_profile_url')
                if url:
                    urls.append(url)

        return list(set(urls))  # Remove duplicates

    def _extract_slug_from_url(self, url: str) -> Optional[str]:
        """Return the LinkedIn slug from *url*, or None when unrecognized."""
        # Handle various LinkedIn URL formats
        patterns = [
            r"linkedin\.com/in/([a-zA-Z0-9\-]+)",
            r"linkedin\.com/company/([a-zA-Z0-9\-]+)",  # Just in case
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)

        return None

    def _generate_filename(self, slug: str) -> str:
        """Generate a UTC-timestamped filename for the profile data."""
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        return f"{slug}_{timestamp}.json"

    def _fetch_profile_with_exa(self, url: str) -> Optional[Dict]:
        """Fetch profile data by asking a GLM model to extract it.

        Despite the method name this does NOT call the Exa contents API: it
        posts a chat-completion request to api.z.ai (model glm-4.6). Returns
        the parsed profile dict, a {"raw_content": ...} wrapper when the model
        reply is not valid JSON, or None on HTTP/network failure.
        """
        try:
            response = self.session.post(
                "https://api.z.ai/api/coding/paas/v4/chat/completions",
                headers={
                    "Authorization": f"Bearer {os.environ.get('ZAI_API_TOKEN')}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": "glm-4.6",
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are extracting LinkedIn profile data. Return the complete profile information including name, headline, location, about section, experience, education, skills, languages, and any other relevant information. Structure the response as JSON with clear field names."
                        },
                        {
                            "role": "user",
                            "content": f"Extract all profile information from this LinkedIn URL: {url}\n\nPlease return a JSON object with these fields:\n- name (full name)\n- headline (current position/title)\n- location (city, country)\n- about (summary/bio)\n- experience (list of jobs with title, company, dates)\n- education (list of schools with degree, dates)\n- skills (list of skills)\n- languages (list of languages)\n- profile_image_url (if available)"
                        }
                    ],
                    "temperature": 0.1,
                    "max_tokens": 4000
                }
            )

            if response.status_code == 200:
                result = response.json()
                content = result['choices'][0]['message']['content']

                # Try to parse as JSON, if fails wrap it
                try:
                    # Look for JSON in the response (model may add prose around it)
                    json_match = re.search(r'\{.*\}', content, re.DOTALL)
                    if json_match:
                        profile_data = json.loads(json_match.group())
                    else:
                        profile_data = json.loads(content)
                except json.JSONDecodeError:
                    profile_data = {
                        "raw_content": content,
                        "source_url": url,
                        "extraction_method": "glm-4.6-chat"
                    }

                return profile_data
            else:
                print(f"Error fetching {url}: {response.status_code}")
                return None

        except Exception as e:
            # Best-effort: log and skip this URL rather than kill the pool.
            print(f"Exception fetching {url}: {e}")
            return None

    def _save_profile(self, slug: str, profile_data: Dict, source_url: str):
        """Save profile data to the entity directory as structured JSON."""
        filename = self._generate_filename(slug)
        filepath = self.entity_dir / filename

        # Structure the data according to the schema
        structured_data = {
            "extraction_metadata": {
                "source_file": "staff_parsing",
                "staff_id": f"{slug}_profile",
                "extraction_date": datetime.now(timezone.utc).isoformat(),
                "extraction_method": "exa_crawling_glm46",
                "extraction_agent": "claude-opus-4.5",
                "linkedin_url": source_url,
                "cost_usd": 0,
                "request_id": hashlib.md5(source_url.encode()).hexdigest()
            },
            "profile_data": {
                "name": profile_data.get("name", ""),
                "linkedin_url": source_url,
                "headline": profile_data.get("headline", ""),
                "location": profile_data.get("location", ""),
                "connections": profile_data.get("connections", ""),
                "about": profile_data.get("about", ""),
                "experience": profile_data.get("experience", []),
                "education": profile_data.get("education", []),
                "skills": profile_data.get("skills", []),
                "languages": profile_data.get("languages", []),
                "profile_image_url": profile_data.get("profile_image_url", "")
            }
        }

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(structured_data, f, indent=2, ensure_ascii=False)

        # Fixed: previously printed a corrupted placeholder instead of the name.
        print(f"Saved profile: {filename}")

    def process_person(self, linkedin_url: str) -> Tuple[str, bool]:
        """Process a single person's LinkedIn profile.

        Returns (url, fetched) where fetched is True only when a new profile
        was downloaded and saved. Thread-safe: the duplicate check and the
        slug reservation happen atomically under the state lock.
        """
        slug = self._extract_slug_from_url(linkedin_url)

        with self._state_lock:
            if linkedin_url in self.processed_urls:
                return linkedin_url, False

            if not slug:
                return linkedin_url, False

            # Check if we already have this profile
            if slug in self.existing_profiles:
                print(f"Profile already exists: {slug}")
                self.processed_urls.add(linkedin_url)
                return linkedin_url, False

            # Reserve the slug and URL now so a concurrent worker holding a
            # different URL form of the same profile cannot fetch it twice.
            self.existing_profiles.add(slug)
            self.processed_urls.add(linkedin_url)

        # Fetch the profile (outside the lock -- network I/O)
        profile_data = self._fetch_profile_with_exa(linkedin_url)
        if profile_data:
            self._save_profile(slug, profile_data, linkedin_url)
            time.sleep(self.delay_seconds)  # Rate limiting
            return linkedin_url, True

        # Fetch failed: release the slug reservation so a later run can retry.
        with self._state_lock:
            self.existing_profiles.discard(slug)
        return linkedin_url, False

    def load_staff_files(self, directory: Path) -> List[str]:
        """Load all staff JSON files in *directory* and extract LinkedIn URLs."""
        all_urls = []

        for file_path in directory.glob("*.json"):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                urls = self._extract_linkedin_urls(data)
                all_urls.extend(urls)
            except Exception as e:
                # Skip unreadable/invalid files but keep processing the rest.
                print(f"Error loading {file_path}: {e}")

        # Remove duplicates while preserving order
        seen = set()
        unique_urls = []
        for url in all_urls:
            if url not in seen:
                seen.add(url)
                unique_urls.append(url)

        return unique_urls

    def run(self, staff_directory: str):
        """Main execution method: gather URLs and fetch them concurrently."""
        print(f"Loading staff files from: {staff_directory}")
        urls = self.load_staff_files(Path(staff_directory))

        print(f"\nFound {len(urls)} unique LinkedIn URLs to process")
        print(f"Already have {len(self.existing_profiles)} profiles in entity directory")

        if not urls:
            print("No LinkedIn URLs found to process.")
            return

        # Process with threading
        success_count = 0
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.process_person, url): url
                for url in urls
            }

            # Process with progress bar
            with tqdm(total=len(urls), desc="Fetching profiles") as pbar:
                for future in as_completed(future_to_url):
                    url, success = future.result()
                    if success:
                        success_count += 1
                    pbar.update(1)

        print(f"\nCompleted! Successfully fetched {success_count}/{len(urls)} profiles")
        print(f"Profiles saved to: {self.entity_dir}")
def main():
    """Command-line entry point: validate args and environment, then fetch."""
    if len(sys.argv) != 2:
        print("Usage: python fetch_linkedin_profiles_exa.py <staff_directory>")
        print("\nExample:")
        print(" python fetch_linkedin_profiles_exa.py /Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed")
        sys.exit(1)

    staff_directory = sys.argv[1]

    # Fail fast when the API credential is missing.
    token = os.environ.get('ZAI_API_TOKEN')
    if not token:
        print("Error: ZAI_API_TOKEN environment variable not set")
        print("Please set it in your environment or .env file")
        sys.exit(1)

    # Conservative settings: 3 workers with a 2-second pause per fetch.
    LinkedInProfileFetcher(max_workers=3, delay_seconds=2.0).run(staff_directory)
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()