#!/usr/bin/env python3
|
|
"""Final LinkedIn Batch Processing - Extracts ALL Data
|
|
|
|
This script extracts ALL data from LinkedIn HTML files:
|
|
1. Full institution names from HTML H1 tags (fixes name extraction bug)
|
|
2. Complete staff data (names, URLs, job titles, heritage analysis) using parse_linkedin_html.py
|
|
3. Cleans filenames properly (removes macOS resource forks, periods, parentheses)
|
|
4. Creates custodian YAML files with full staff lists
|
|
|
|
Usage:
|
|
python scripts/linkedin_batch_final.py --input-dir /path/to/html/files --output-dir data/custodian/person/bu_final --custodian-dir data/custodian/
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import Counter
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
except ImportError:
|
|
print("Error: beautifulsoup4 not installed. Run: pip install beautifulsoup4", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
print("Error: yaml not installed. Run: pip install pyyaml", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Add scripts directory to path
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
# Import existing parser
|
|
from parse_linkedin_html import parse_html_file, generate_staff_id
|
|
|
|
# Global custodian lookup cache (populated in main())
|
|
custodian_lookup_cache = {}
|
|
|
|
|
|
def clean_filename_to_slug(filename):
    """Turn a saved LinkedIn People-page HTML filename into a URL-safe slug."""
    # Drop the LinkedIn page suffix and any .html extension.
    base = filename.replace(' People _ LinkedIn.html', '').replace('.html', '')
    # macOS AppleDouble resource forks are prefixed with "._".
    if base.startswith('._'):
        base = base[2:]
    # Strip leading "(N)" duplicate-download counters (optionally preceded
    # by a stray dot and/or underscores).
    for counter_pattern in (r'^\.?\_?\(\d+\)\s*', r'^\._*\(\d+\)\s*'):
        base = re.sub(counter_pattern, '', base)
    # Trim stray underscores/spaces and collapse whitespace runs.
    base = re.sub(r'\s+', ' ', base.strip('_ '))
    # Lowercase, turn every non-alphanumeric run into a hyphen, dedupe
    # hyphens, and trim hyphens at the ends.
    slug = re.sub(r'-+', '-', re.sub(r'[^a-z0-9]+', '-', base.lower()))
    return slug.strip('-')
|
|
|
|
|
|
def extract_h1_name_from_html(html_content):
    """Extract the institution name from the page's first H1 tag.

    LinkedIn H1s typically read "Institution Name | ..."; only the part
    before the first '|' is the institution name.

    Returns the cleaned name, or None when no H1 is present or the H1
    text is empty.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    h1 = soup.find('h1')
    if h1 is None:
        return None
    # Keep only the text before the first '|' (split() handles the no-'|'
    # case too, returning the whole string).  The original code also ran a
    # re.sub replacing '|' afterwards, which could never match once the
    # string had been split — that dead step is removed.
    name = h1.get_text().split('|')[0].strip()
    # Collapse internal whitespace runs to single spaces.
    name = re.sub(r'\s+', ' ', name)
    return name or None
|
|
|
|
|
|
def extract_basic_metadata(html_content):
    """Extract page-level metadata from raw LinkedIn HTML.

    Returns a dict with:
      follower_count: raw follower string (e.g. '3K', '12,345'); '' if absent
      associated_members: int count of associated members; 0 if absent
      profile_cards_detected: number of profile-card markers in the HTML
    """
    follower_count = ''
    associated_members = 0

    # Accept thousands separators and K/M/B suffixes ('1,234', '12.5K').
    # BUG FIX: the previous pattern (\d+K?) silently truncated counts with
    # separators — '12,345 followers' matched only '345'.
    follower_match = re.search(r'(\d[\d,.]*[KMB]?)\s+followers?', html_content, re.IGNORECASE)
    if follower_match:
        follower_count = follower_match.group(1)

    member_match = re.search(r'(\d+)\s+associated\s+members?', html_content, re.IGNORECASE)
    if member_match:
        associated_members = int(member_match.group(1))

    # Rough proxy for how many staff cards the saved page contains.
    profile_count = html_content.count('org-people-profile-card')

    return {
        'follower_count': follower_count,
        'associated_members': associated_members,
        'profile_cards_detected': profile_count,
    }
|
|
|
|
|
|
def find_existing_custodian(custodian_name, custodian_dir):
    """Return the YAML path for an existing custodian, or None.

    The lookup is case-insensitive and served entirely from the
    module-level cache built in main(); custodian_dir is kept only for
    interface compatibility with callers.
    """
    return custodian_lookup_cache.get(custodian_name.lower())
|
|
|
|
|
|
def _fallback_name_from_filename(filename):
    """Derive an institution name from the HTML filename (no H1 found).

    Mirrors the cleanup steps of clean_filename_to_slug but returns a
    human-readable name rather than a slug.
    """
    name = filename.replace(' People _ LinkedIn.html', '').replace('.html', '')
    # macOS AppleDouble resource-fork prefix.
    if name.startswith('._'):
        name = name[2:]
    # Leading "(N)" duplicate-download counters.
    name = re.sub(r'^\.?\_?\(\d+\)\s*', '', name)
    name = re.sub(r'^\._*\(\d+\)\s*', '', name)
    return re.sub(r'\s+', ' ', name).strip()


def _empty_staff_result(h1_name, slug, source_file):
    """Build the empty-staff result skeleton used when the full parser fails."""
    return {
        'custodian_metadata': {
            'custodian_name': h1_name,
            'custodian_slug': slug,
            'name': h1_name,
        },
        'source_metadata': {
            'source_type': 'linkedin_company_people_page_html',
            'source_file': source_file,
            'registered_timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'registration_method': 'html_parsing_with_full_staff_data',
            'staff_extracted': 0,
        },
        'staff': [],
        'staff_analysis': {
            'total_staff_extracted': 0,
            'with_linkedin_url': 0,
            'with_alternate_profiles': 0,
            'anonymous_members': 0,
            'heritage_relevant_count': 0,
            'staff_by_heritage_type': {},
        },
    }


def process_single_file(html_path, output_dir, custodian_dir):
    """Process one LinkedIn People-page HTML file and extract ALL data.

    Writes a timestamped staff JSON file into output_dir and returns a
    summary dict (status, slug, custodian name, staff count, parser flags,
    and the full parse result).  custodian_dir is unused here but kept for
    interface compatibility.
    """
    # errors='replace' guards against stray bytes in saved LinkedIn pages.
    with open(html_path, 'r', encoding='utf-8', errors='replace') as f:
        html_content = f.read()

    # Prefer the page's H1 for the institution name; fall back to the
    # cleaned filename when the H1 is missing or empty.
    h1_name = extract_h1_name_from_html(html_content)
    if not h1_name:
        h1_name = _fallback_name_from_filename(html_path.name)

    slug = clean_filename_to_slug(html_path.name)
    basic_metadata = extract_basic_metadata(html_content)

    # Full staff extraction via parse_linkedin_html; on any failure fall
    # back to an empty-staff skeleton so the batch run can continue.
    try:
        staff_result = parse_html_file(html_path, h1_name, slug)
        use_full_parser = True
        parse_error = None
    except Exception as e:
        use_full_parser = False
        parse_error = str(e)
        staff_result = _empty_staff_result(h1_name, slug, html_path.name)

    # Merge page-level metadata into the parse result.
    staff_result['custodian_metadata']['follower_count'] = basic_metadata.get('follower_count', '')
    staff_result['custodian_metadata']['associated_members'] = basic_metadata.get('associated_members', 0)
    if 'profile_cards_detected' in basic_metadata:
        staff_result['custodian_metadata']['profile_cards_detected'] = basic_metadata['profile_cards_detected']

    # Persist the staff JSON with a UTC timestamp in the filename.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    staff_filename = output_dir / f"{slug}_staff_{timestamp}.json"
    with open(staff_filename, 'w', encoding='utf-8') as f:
        json.dump(staff_result, f, indent=2, ensure_ascii=False)

    return {
        'status': 'success',
        'slug': slug,
        'filename': html_path.name,
        'custodian_name': h1_name,
        'staff_count': staff_result.get('staff_analysis', {}).get('total_staff_extracted', 0),
        'use_full_parser': use_full_parser,
        'parse_error': parse_error,
        'result': staff_result,
    }
|
|
|
|
|
|
def create_or_update_custodian(custodian_name, result, custodian_dir):
    """Create or update a custodian YAML file with extracted staff data.

    Returns (path, is_new).  Returns (None, False) when no staff member
    has a LinkedIn profile URL — nothing worth persisting.
    """
    result_data = result.get('result', {})
    staff_list = result_data.get('staff', [])
    staff_with_profiles = [s for s in staff_list if s.get('linkedin_profile_url')]

    if not staff_with_profiles:
        return (None, False)

    # Provenance of the staff section.
    provenance = {
        'source_type': 'linkedin_company_people_page_html',
        'registered_timestamp': result_data.get('source_metadata', {}).get('registered_timestamp', ''),
        'registration_method': 'html_parsing_with_full_staff_data',
        'total_staff_extracted': len(staff_with_profiles),
    }

    # Normalized staff entries for the YAML file.
    staff_list_data = []
    for s in staff_with_profiles:
        staff_entry = {
            'staff_id': s.get('staff_id'),
            'person_name': s.get('name'),
            'person_profile_path': f"data/custodian/person/entity/{s.get('linkedin_slug', '')}_*.json",
            'role_title': s.get('headline', ''),
            'heritage_relevant': s.get('heritage_relevant', False),
            'heritage_type': s.get('heritage_type'),
            'linkedin_profile_url': s.get('linkedin_profile_url'),
            'linkedin_slug': s.get('linkedin_slug'),
        }
        staff_list_data.append(staff_entry)

    # Look for an existing custodian file (served from the module cache).
    existing_file = find_existing_custodian(custodian_name, custodian_dir)

    if existing_file:
        with open(existing_file, 'r', encoding='utf-8') as f:
            custodian_data = yaml.safe_load(f) or {}

        custodian_data['custodian_name'] = custodian_name
        custodian_data['staff'] = {'provenance': provenance, 'staff_list': staff_list_data}

        with open(existing_file, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        return (existing_file, False)
    else:
        # Create a new custodian file.  Infer the institution type from the
        # most common heritage type among extracted staff.
        heritage_types = result_data.get('staff_analysis', {}).get('staff_by_heritage_type', {})
        institution_type = 'MUSEUM'  # Default

        if heritage_types:
            most_common = Counter(heritage_types).most_common(1)
            if most_common:
                type_code = most_common[0][0]
                type_map = {
                    'M': 'MUSEUM',
                    'L': 'LIBRARY',
                    'A': 'ARCHIVE',
                    'G': 'GALLERY',
                    'R': 'RESEARCH_CENTER',
                    'E': 'EDUCATION_PROVIDER',
                    'S': 'COLLECTING_SOCIETY',
                    'D': 'DIGITAL_PLATFORM',
                }
                institution_type = type_map.get(type_code, 'MUSEUM')

        # Derive slug from custodian name for the placeholder GHCID.
        slug_for_ghcid = clean_filename_to_slug(f"{custodian_name}.html")
        placeholder_ghcid = f"NL-XX-XXX-PENDING-{slug_for_ghcid.upper()}"

        custodian_data = {
            'ghcid_current': placeholder_ghcid,
            # BUG FIX: the original dict literal declared 'custodian_name'
            # twice, so the plain-string name was silently replaced by the
            # nested name record.  Keep the string under 'custodian_name'
            # (matching the update path above) and store the detail record
            # under its own key.
            'custodian_name': custodian_name,
            'custodian_name_details': {
                'emic_name': custodian_name,
                'english_name': None,
                'name_verified': True,
                'name_source': 'linkedin_html_h1',
            },
            'institution_type': institution_type,
            'staff': {'provenance': provenance, 'staff_list': staff_list_data},
            'linkedin_enrichment': {
                'source_file': result_data.get('source_metadata', {}).get('source_file', ''),
                'extraction_date': result_data.get('source_metadata', {}).get('registered_timestamp', ''),
                'follower_count': result_data.get('custodian_metadata', {}).get('follower_count', ''),
                'associated_members': result_data.get('custodian_metadata', {}).get('associated_members', 0),
                'profile_cards_detected': result_data.get('custodian_metadata', {}).get('profile_cards_detected', 0),
                'source_type': 'linkedin_company_people_page_html',
                'extraction_method': 'html_parsing_with_full_staff_data',
            },
            'provenance': {
                'data_source': 'LINKEDIN_HTML_PEOPLE_PAGE',
                'data_tier': 'TIER_4_INFERRED',
                'extraction_date': datetime.now(timezone.utc).isoformat(),
                'extraction_method': 'Comprehensive batch processing with H1 name extraction and full staff data',
                'confidence_score': 0.90,
                'notes': f'Staff extracted from LinkedIn company People page. H1 name used: {custodian_name}. Total staff: {len(staff_with_profiles)}. Location research needed for GHCID.',
            }
        }

        custodian_file = custodian_dir / f"{placeholder_ghcid}.yaml"
        with open(custodian_file, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        # BUG FIX: register the new file in the lookup cache so later input
        # files for the same institution update it instead of re-creating
        # (and double-counting) it.
        custodian_lookup_cache[custodian_name.lower()] = custodian_file

        return (custodian_file, True)
|
|
|
|
|
|
def main():
    """CLI entry point: batch-process LinkedIn People-page HTML files.

    Builds a custodian-name -> YAML-path index (unless --skip-index),
    processes every *.html file in --input-dir (staff JSON output plus
    custodian YAML create/update), prints statistics, writes a JSON
    report, and returns 0.
    """
    parser = argparse.ArgumentParser(
        description='Final LinkedIn batch processing - extracts ALL data (H1 names + staff data)'
    )
    parser.add_argument('--input-dir', type=Path, required=True,
                        help='Directory containing LinkedIn HTML files')
    parser.add_argument('--output-dir', type=Path, required=True,
                        help='Output directory for staff JSON files')
    parser.add_argument('--custodian-dir', type=Path, required=True,
                        help='Directory containing custodian YAML files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit processing to first N files (0 = all)')
    parser.add_argument('--skip-index', action='store_true',
                        help='Skip custodian index building (faster, always creates new files)')

    args = parser.parse_args()

    if not args.input_dir.exists():
        print("Error: Input directory not found: " + str(args.input_dir), file=sys.stderr)
        sys.exit(1)

    args.output_dir.mkdir(parents=True, exist_ok=True)
    args.custodian_dir.mkdir(parents=True, exist_ok=True)

    # Build the custodian lookup cache for fast name matching (unless skipped).
    custodian_lookup = {}
    if not args.skip_index:
        yaml_files = list(args.custodian_dir.glob('*.yaml'))
        print("Building custodian index from " + str(len(yaml_files)) + " files...")

        for i, custodian_file in enumerate(sorted(yaml_files), 1):
            try:
                with open(custodian_file, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)
                if data and data.get('custodian_name'):
                    custodian_lookup[data['custodian_name'].lower()] = custodian_file
            except Exception:
                # BUG FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.  Unreadable or malformed
                # YAML files are simply skipped.
                continue

            if i % 5000 == 0:
                print(f" Indexed {i}/{len(yaml_files)} files...")

        print("Custodian index built: " + str(len(custodian_lookup)) + " entries")
    else:
        print("SKIP: Custodian index building (--skip-index enabled)")

    # Publish the cache for find_existing_custodian().
    global custodian_lookup_cache
    custodian_lookup_cache = custodian_lookup

    html_files = sorted(args.input_dir.glob('*.html'))

    if args.limit > 0:
        html_files = html_files[:args.limit]

    print("Processing " + str(len(html_files)) + " HTML files...")
    print("Input directory: " + str(args.input_dir))
    print("Output directory: " + str(args.output_dir))
    print("Custodian directory: " + str(args.custodian_dir))
    print("Extracting: H1 institution names + Complete staff data (names, URLs, job titles, heritage analysis)")

    stats = {
        'total': len(html_files),
        'success': 0,
        'errors': 0,
        'with_staff': 0,
        'total_staff': 0,
        'custodians_created': 0,
        'custodians_updated': 0,
        'name_from_h1': 0,
        'full_parser_success': 0,
        'full_parser_failed': 0,
    }

    for i, html_path in enumerate(html_files, 1):
        try:
            if i % 50 == 0:
                # Progress marker every 50 files.  BUG FIX: was printed
                # with end='', so markers ran together with later output.
                print("[{0:3d}/{1}]".format(i, len(html_files)))

            result = process_single_file(html_path, args.output_dir, args.custodian_dir)

            if result['status'] == 'success':
                stats['success'] += 1
                staff_count = result.get('staff_count', 0)
                stats['total_staff'] += staff_count

                if result.get('use_full_parser'):
                    stats['full_parser_success'] += 1
                else:
                    stats['full_parser_failed'] += 1

                if staff_count > 0:
                    stats['with_staff'] += 1

                if result.get('custodian_name', ''):
                    stats['name_from_h1'] += 1

                # Create or update the custodian YAML file.
                custodian_name = result.get('custodian_name', '')
                if custodian_name:
                    custodian_file, is_new = create_or_update_custodian(custodian_name, result, args.custodian_dir)
                    # BUG FIX: a (None, False) return means nothing was
                    # persisted; it used to be counted as an "update".
                    if custodian_file is not None:
                        if is_new:
                            stats['custodians_created'] += 1
                        else:
                            stats['custodians_updated'] += 1

            elif result['status'] == 'error':
                stats['errors'] += 1
                print("Error: " + result['filename'] + ": " + result.get('parse_error', ''), file=sys.stderr)

        except Exception as e:
            stats['errors'] += 1
            print("Exception: " + html_path.name + ": " + str(e), file=sys.stderr)

    print("\nProcessing complete!")

    print("\n" + "=" * 60)
    print("PROCESSING COMPLETE")
    print("=" * 60)
    print("\nStatistics:")
    print(" Total HTML files: " + str(stats['total']))
    print(" Successfully processed: " + str(stats['success']))
    print(" Errors: " + str(stats['errors']))
    print(" Institutions with staff: " + str(stats['with_staff']))
    print(" Total staff extracted: " + str(stats['total_staff']))
    print(" Custodians created: " + str(stats['custodians_created']))
    print(" Custodians updated: " + str(stats['custodians_updated']))
    print(" Names from H1: " + str(stats['name_from_h1']))
    print(" Full parser successful: " + str(stats['full_parser_success']))
    print(" Full parser failed: " + str(stats['full_parser_failed']))
    print("\nOutput directories:")
    print(" Staff JSON files: " + str(args.output_dir))
    print(" Custodian YAML files: " + str(args.custodian_dir))

    # Machine-readable run summary.
    report = {
        'processing_date': datetime.now(timezone.utc).isoformat(),
        'input_directory': str(args.input_dir),
        'output_directory': str(args.output_dir),
        'custodian_directory': str(args.custodian_dir),
        'statistics': stats,
    }

    report_file = Path('reports/linkedin_batch_final_report.json')
    report_file.parent.mkdir(parents=True, exist_ok=True)
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print("\nReport saved to: " + str(report_file))

    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return code.
if __name__ == '__main__':
    sys.exit(main())
|