#!/usr/bin/env python3
"""
Enrich ALL LinkedIn profiles for Eye Filmmuseum using the Unipile API.
This is the final corrected enrichment script.
"""

import asyncio
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import yaml
from dotenv import load_dotenv

# Load environment variables (main() reads UNIPILE_API_KEY and UNIPILE_DSN)
# from a local .env file before anything looks them up.
load_dotenv()


class LinkedInUltimateEnricher:
    """Ultimate LinkedIn profile enricher using Unipile API.

    The enricher issues authenticated GET requests against a Unipile DSN and
    annotates profile dicts with the outcome of each lookup.
    """

    def __init__(self, api_key: str, dsn: str = "api1.unipile.com:13111"):
        """Store credentials and derive the base URL and request headers.

        Args:
            api_key: Unipile API key, sent in the ``X-API-KEY`` header.
            dsn: ``host:port`` of the Unipile instance to talk to.
        """
        self.api_key = api_key
        self.dsn = dsn
        self.base_url = f"https://{dsn}/api/v1"
        self.headers = {
            "accept": "application/json",
            "X-API-KEY": api_key,
        }

    async def enrich_profile(self, identifier: str, name: str = "Unknown") -> Optional[Dict[str, Any]]:
        """Enrich a single LinkedIn profile.

        Candidate endpoints are tried in order. The first HTTP 200 response is
        parsed as JSON and returned; a 404 moves on to the next candidate; any
        other status aborts the lookup.

        Args:
            identifier: LinkedIn identifier interpolated into the endpoint URL.
            name: Human-readable name, used only in progress messages.

        Returns:
            The decoded JSON payload, or ``None`` when no endpoint produced a
            usable response.
        """
        # NOTE(review): Unipile's profile endpoint appears to expect an
        # `account_id` query parameter; none is sent here - confirm against
        # the API reference before relying on these calls.
        endpoints = [
            f"{self.base_url}/users/{identifier}",
            f"{self.base_url}/linkedin/profile/{identifier}",
            f"{self.base_url}/profile/{identifier}",
        ]

        # One client (and connection pool) per profile instead of one per
        # endpoint attempt.
        async with httpx.AsyncClient(timeout=60.0, headers=self.headers) as client:
            for endpoint in endpoints:
                try:
                    response = await client.get(endpoint)
                    if response.status_code == 200:
                        data = response.json()
                        print(f"✓ Successfully enriched: {name}")
                        return data
                    if response.status_code == 404:
                        continue  # Try next endpoint
                    print(f"✗ Error enriching {name} ({identifier}): {response.status_code}")
                    return None
                except Exception as e:
                    # Deliberately best-effort: a transport or decoding error
                    # on one endpoint must not stop the remaining candidates.
                    print(f"✗ Exception enriching {name}: {e}")
                    continue

        print(f"✗ Could not fetch data for {name} ({identifier})")
        return None

    async def enrich_all_profiles(
        self,
        profiles: List[Dict[str, Any]],
        *,
        batch_size: int = 5,
        batch_delay: float = 3.0,
    ) -> List[Dict[str, Any]]:
        """Enrich all LinkedIn profiles with rate limiting.

        Profiles are processed in batches; lookups inside a batch run
        concurrently and a pause is inserted between batches.

        Args:
            profiles: Profile dicts; ``linkedin_identifier`` selects the
                profile to fetch and ``name`` is used for progress output.
            batch_size: Number of profiles looked up concurrently.
            batch_delay: Seconds to wait between consecutive batches.

        Returns:
            One copy per input profile, in input order, each carrying an
            ``enrichment_status`` of ``'success'``, ``'not_found'`` or
            ``'failed'`` plus the matching data or error fields. The input
            dicts are not modified.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        enriched: List[Dict[str, Any]] = []
        total_batches = (len(profiles) + batch_size - 1) // batch_size

        for batch_start in range(0, len(profiles), batch_size):
            batch_end = min(batch_start + batch_size, len(profiles))
            batch = profiles[batch_start:batch_end]
            batch_num = batch_start // batch_size + 1

            print(f"\nProcessing batch {batch_num}/{total_batches}...")

            # Only profiles with an identifier can be looked up. Remember each
            # one's position so results are matched to the right profile even
            # when some entries of the batch are skipped.
            pending = [
                (position, profile)
                for position, profile in enumerate(batch)
                if profile.get('linkedin_identifier')
            ]
            results = await asyncio.gather(
                *(
                    self.enrich_profile(
                        profile['linkedin_identifier'],
                        profile.get('name', 'Unknown'),
                    )
                    for _, profile in pending
                ),
                return_exceptions=True,
            )
            outcomes = {
                position: result
                for (position, _), result in zip(pending, results)
            }

            for position, profile in enumerate(batch):
                enriched_profile = dict(profile)  # Copy; never mutate the input
                display_name = profile.get('name', 'Unknown')

                if position not in outcomes:
                    print(f"✗ Skipped: {display_name} - missing linkedin_identifier")
                    enriched_profile['enrichment_status'] = 'failed'
                    enriched_profile['enrichment_error'] = 'Missing linkedin_identifier'
                    enriched.append(enriched_profile)
                    continue

                result = outcomes[position]
                # BaseException also covers CancelledError, which gather()
                # hands back as a result when return_exceptions=True.
                if isinstance(result, BaseException):
                    print(f"✗ Failed: {display_name} - {result}")
                    enriched_profile['enrichment_status'] = 'failed'
                    enriched_profile['enrichment_error'] = str(result)
                elif result:
                    enriched_profile['enrichment_status'] = 'success'
                    enriched_profile['enriched_data'] = result
                    # Timezone-aware UTC so the trailing 'Z' is truthful.
                    enriched_profile['enrichment_timestamp'] = (
                        datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
                    )
                else:
                    enriched_profile['enrichment_status'] = 'not_found'
                    enriched_profile['enrichment_error'] = 'Profile not found or inaccessible'
                enriched.append(enriched_profile)

            # Rate limiting between batches
            if batch_end < len(profiles):
                print(f"Waiting {batch_delay:g} seconds before next batch...")
                await asyncio.sleep(batch_delay)

        return enriched

    def extract_profile_details(self, api_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract comprehensive profile details from API response.

        Args:
            api_data: Decoded JSON payload of a successful profile lookup.

        Returns:
            A flat dict of selected fields. Scalar fields are ``None`` when
            absent, list fields default to ``[]``, and the untouched payload
            is kept under ``api_response_raw``.
        """
        details: Dict[str, Any] = {}

        # Basic info
        first_name = api_data.get('first_name')
        last_name = api_data.get('last_name')
        details['first_name'] = first_name
        details['last_name'] = last_name
        # Build the fallback from the non-empty parts only, so an explicit
        # null in the payload cannot leak into the name as the text "None".
        details['full_name'] = api_data.get('full_name') or ' '.join(
            str(part) for part in (first_name, last_name) if part
        ).strip()
        details['headline'] = api_data.get('headline')
        details['summary'] = api_data.get('summary') or api_data.get('about')

        # Location and industry
        details['location'] = api_data.get('location')
        details['industry'] = api_data.get('industry')
        details['country'] = api_data.get('country')

        # Counts
        details['connections_count'] = api_data.get('connections_count')
        details['followers_count'] = api_data.get('followers_count')

        # URLs
        details['profile_url'] = api_data.get('profile_url')
        details['profile_image_url'] = api_data.get('profile_image_url')

        # Professional info
        details['company'] = api_data.get('company')
        details['job_title'] = api_data.get('job_title')
        # `or []` also normalises keys that are present but null.
        details['experience'] = api_data.get('experience') or []
        details['education'] = api_data.get('education') or []
        details['skills'] = api_data.get('skills') or []
        details['languages'] = api_data.get('languages') or []

        # Additional metadata
        details['enrichment_source'] = 'unipile_api'
        details['api_response_raw'] = api_data  # Keep raw data for debugging

        return details


def _utc_timestamp() -> str:
    """Return the current UTC time in ISO-8601 form with a 'Z' suffix."""
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')


def _attach_enrichment(node: Any, enrichment_map: Dict[str, Dict[str, Any]], timestamp: str) -> None:
    """Recursively attach enriched data next to every known ``linkedin_url``.

    Any dict holding a ``linkedin_url`` string that is a key of
    ``enrichment_map`` gains ``linkedin_enriched_data``,
    ``enrichment_timestamp`` and ``enrichment_source`` entries.
    """
    if isinstance(node, dict):
        # Iterate over a snapshot: the loop adds keys to `node`, and growing a
        # dict while iterating its live view raises RuntimeError.
        for key, value in list(node.items()):
            if key == 'linkedin_url' and isinstance(value, str) and value in enrichment_map:
                node['linkedin_enriched_data'] = enrichment_map[value]
                node['enrichment_timestamp'] = timestamp
                node['enrichment_source'] = 'unipile_api'
            elif isinstance(value, (dict, list)):
                _attach_enrichment(value, enrichment_map, timestamp)
    elif isinstance(node, list):
        for item in node:
            _attach_enrichment(item, enrichment_map, timestamp)


async def main():
    """Main enrichment function.

    Reads the extracted profile list, enriches every profile through the
    Unipile API, merges the results into the Eye Filmmuseum YAML record and
    writes an enriched YAML copy plus a JSON report next to it. Both input
    files are validated and loaded before any API call is made, so a missing
    or malformed file cannot discard a finished enrichment run.
    """
    banner = "=" * 80

    # Check for API credentials
    api_key = os.getenv('UNIPILE_API_KEY')
    if not api_key:
        print("\n" + banner)
        print("LINKEDIN ULTIMATE ENRICHER - UNIPILE API REQUIRED")
        print(banner)
        print("\n❌ ERROR: UNIPILE_API_KEY environment variable not set")
        print("\n📋 TO ENRICH PROFILES:")
        print("1. Sign up for Unipile: https://dashboard.unipile.com/signup")
        print("2. Connect LinkedIn account via Hosted Auth")
        print("3. Get API key from dashboard")
        print("4. Set environment variable:")
        print(" export UNIPILE_API_KEY=your_api_key_here")
        print("\n⚡ After setting credentials, run this script again.")
        print(banner)
        return

    # Initialize enricher
    dsn = os.getenv('UNIPILE_DSN', 'api1.unipile.com:13111')
    enricher = LinkedInUltimateEnricher(api_key, dsn)

    profiles_file = "/Users/kempersc/apps/glam/data/custodian/NL-NH-AMS-U-EFM-eye_filmmuseum_linkedin_ultimate_all_profiles.json"
    eye_file = "/Users/kempersc/apps/glam/data/custodian/NL-NH-AMS-U-EFM-eye_filmmuseum.yaml"

    if not os.path.exists(profiles_file):
        print(f"\n❌ ERROR: Profiles file not found: {profiles_file}")
        print("Please run linkedin_ultimate_extraction.py first to extract profiles.")
        return

    # Fail fast: the merge target is needed after all API calls are spent.
    if not os.path.exists(eye_file):
        print(f"\n❌ ERROR: Eye Filmmuseum file not found: {eye_file}")
        return

    print("\n" + banner)
    print("LOADING LINKEDIN PROFILES FOR ENRICHMENT")
    print(banner)

    with open(profiles_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    profiles = data.get('profiles', [])
    if not profiles:
        print("\n❌ No profiles found in extracted data!")
        return

    print(f"\n📊 Found {len(profiles)} LinkedIn profiles to enrich")

    # Show sample of profiles to be enriched
    print("\n📋 Sample profiles to enrich:")
    for i, profile in enumerate(profiles[:5], start=1):
        print(f" {i}. {profile.get('name', 'Unknown')}")
        print(f" URL: {profile.get('linkedin_url', 'N/A')}")

    if len(profiles) > 5:
        print(f" ... and {len(profiles) - 5} more")

    # Load main Eye Filmmuseum file (before enrichment, see docstring)
    print("\n📁 Loading main Eye Filmmuseum file...")
    with open(eye_file, 'r', encoding='utf-8') as f:
        eye_data = yaml.safe_load(f)
    if eye_data is None:
        eye_data = {}  # safe_load returns None for an empty document
    if not isinstance(eye_data, dict):
        print(f"\n❌ ERROR: Unexpected top-level structure in: {eye_file}")
        return

    print("\n" + banner)
    print("STARTING ENRICHMENT")
    print(banner)

    # Enrich all profiles
    enriched_profiles = await enricher.enrich_all_profiles(profiles)

    # Calculate statistics
    successful = sum(1 for p in enriched_profiles if p.get('enrichment_status') == 'success')
    failed = sum(1 for p in enriched_profiles if p.get('enrichment_status') == 'failed')
    not_found = sum(1 for p in enriched_profiles if p.get('enrichment_status') == 'not_found')
    success_rate = f"{successful/len(profiles)*100:.1f}%"
    timestamp = _utc_timestamp()  # one consistent, genuinely-UTC stamp per run

    print("\n" + banner)
    print("ENRICHMENT COMPLETE!")
    print(banner)
    print(f"✅ Successful: {successful}")
    print(f"❌ Failed: {failed}")
    print(f"🔍 Not found: {not_found}")
    print(f"📊 Success rate: {success_rate}")

    # Create enrichment mapping: linkedin_url -> extracted details
    enrichment_map = {}
    for profile in enriched_profiles:
        if profile.get('enrichment_status') != 'success':
            continue
        linkedin_url = profile.get('linkedin_url')
        if linkedin_url:
            enrichment_map[linkedin_url] = enricher.extract_profile_details(profile['enriched_data'])

    # Update Eye Filmmuseum data with enriched information
    print("\n🔄 Integrating enriched data into Eye Filmmuseum file...")
    _attach_enrichment(eye_data, enrichment_map, timestamp)

    # Add enrichment metadata
    eye_data.setdefault('linkedin_enrichment', {})['api_enrichment_ultimate'] = {
        'enrichment_timestamp': timestamp,
        'api_source': 'unipile',
        'total_profiles_processed': len(profiles),
        'successful_enrichments': successful,
        'failed_enrichments': failed,
        'not_found_profiles': not_found,
        'success_rate': success_rate,
        'api_endpoints_tried': [
            "/api/v1/users/{identifier}",
            "/api/v1/linkedin/profile/{identifier}",
            "/api/v1/profile/{identifier}",
        ],
        'notes': [
            f"Ultimate API enrichment completed on {timestamp}",
            f"Successfully enriched {successful} out of {len(profiles)} profiles",
            f"Failed enrichments: {failed}, Not found: {not_found}",
            "Enriched data includes: profile details, experience, education, skills",
            "Rate limiting: 5 profiles per batch with 3 second delays",
        ],
    }

    # Update provenance
    eye_data.setdefault('provenance', {}).setdefault('notes', []).append(
        f"Ultimate LinkedIn API enrichment on {timestamp} "
        f"({successful}/{len(profiles)} profiles successfully enriched)"
    )

    # Save enriched data
    output_file = eye_file.replace('.yaml', '_linkedin_ultimate_enriched.yaml')
    report_file = output_file.replace('.yaml', '_enrichment_report.json')
    print(f"\n💾 Saving enriched data to: {output_file}")

    with open(output_file, 'w', encoding='utf-8') as f:
        yaml.dump(eye_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    # Create enrichment report
    report = {
        'enrichment_timestamp': timestamp,
        'api_source': 'unipile',
        'method': 'ultimate_api_enrichment',
        'statistics': {
            'total_profiles': len(profiles),
            'successful': successful,
            'failed': failed,
            'not_found': not_found,
            'success_rate': success_rate,
        },
        'output_files': {
            'main_enriched_yaml': output_file,
            'enrichment_report': report_file,
        },
    }

    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)

    print("\n" + banner)
    print("✅ ENRICHMENT COMPLETE!")
    print(banner)
    print(f"📄 Main enriched file: {output_file}")
    print(f"📊 Enrichment report: {report_file}")
    print(f"✅ Success rate: {success_rate}")

    # Show sample of enriched data
    if successful > 0:
        print("\n📋 Sample enriched profiles:")
        samples = [p for p in enriched_profiles if p.get('enrichment_status') == 'success'][:3]
        for number, profile in enumerate(samples, start=1):
            details = enricher.extract_profile_details(profile['enriched_data'])
            print(f"\n {number}. {profile.get('name', 'Unknown')}")
            print(f" Headline: {details.get('headline', 'N/A')}")
            print(f" Location: {details.get('location', 'N/A')}")
            print(f" Connections: {details.get('connections_count', 'N/A')}")


# Script entry point: run the async enrichment pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())