#!/usr/bin/env python3
"""Optimized LinkedIn batch processing (fast, parallel).

This is an optimized version that:
1. Uses parallel processing for speed
2. Skips already-processed files (only when --skip-existing is given)
3. Extracts full institution names from HTML H1 tags
4. Creates custodian YAML files with staff lists

Usage:
    python scripts/linkedin_batch_fast.py \
        --input-dir /path/to/html/files \
        --output-dir data/custodian/person/bu_fixed \
        --custodian-dir data/custodian/
"""
import argparse
import json
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
try:
from bs4 import BeautifulSoup
except ImportError:
print("Error: beautifulsoup4 not installed. Run: pip install beautifulsoup4", file=sys.stderr)
sys.exit(1)
try:
import yaml
except ImportError:
print("Error: yaml not installed. Run: pip install pyyaml", file=sys.stderr)
sys.exit(1)
# Add scripts directory to path
sys.path.insert(0, str(Path(__file__).parent))
# Try to import existing parser
try:
from parse_linkedin_html import parse_html_file, generate_staff_id
except ImportError:
parse_html_file = None
generate_staff_id = None
print("Warning: parse_linkedin_html not found, using simplified parsing", file=sys.stderr)
def clean_filename_to_slug(filename):
    """Turn a saved LinkedIn "People" page filename into a URL-safe slug.

    Args:
        filename: Bare filename of the saved HTML page.

    Returns:
        Lowercase slug made of ``a-z``, ``0-9`` and single hyphens. Every
        other character (including non-ASCII letters) becomes a hyphen.
        May be empty when nothing usable remains.
    """
    # Drop the browser "save as" suffix and any ".html" occurrence.
    stem = filename.replace(' People _ LinkedIn.html', '').replace('.html', '')

    # macOS resource-fork copies are prefixed with "._".
    if stem.startswith('._'):
        stem = stem[2:]

    # Strip a leading notification counter such as "(15) ", ".(15) " or "_(15) ".
    for counter_prefix in (r'^\.?\_?\(\d+\)\s*', r'^\._*\(\d+\)\s*'):
        stem = re.sub(counter_prefix, '', stem)

    lowered = stem.strip('_ ').lower()
    hyphenated = re.sub(r'[^a-z0-9]+', '-', lowered)
    return re.sub(r'-+', '-', hyphenated).strip('-')
def extract_institution_name_from_html_fast(html_content):
    """Extract the institution name from the page's H1 heading using a regex.

    A regex is used instead of BeautifulSoup because only one heading is
    needed. The heading is expected to look like
    ``<h1 ...>Institution Name | LinkedIn</h1>``.

    Args:
        html_content: Full HTML document as a string.

    Returns:
        The institution name with runs of whitespace collapsed to single
        spaces, or ``None`` when the input is empty, no matching H1 exists,
        or the captured name is blank.
    """
    if not html_content:
        return None

    # NOTE(review): the pattern literal was damaged in the source (the "<h1"
    # opener and "</h1>" closer were missing); it is reconstructed here from
    # the surviving fragment and the documented H1 intent - confirm against
    # a real saved page.
    h1_match = re.search(
        r'<h1[^>]*>([^<|]+?)\s*\|\s*LinkedIn\s*</h1>',
        html_content,
        re.IGNORECASE | re.DOTALL,
    )
    if not h1_match:
        return None

    # Headings may be wrapped over several lines; normalise to one line.
    name = re.sub(r'\s+', ' ', h1_match.group(1)).strip()
    return name or None
def find_existing_custodian(custodian_name, custodian_dir):
    """Find an existing custodian YAML file by name (case-insensitive).

    The ``custodian_name`` field of a file may be either a plain string or a
    mapping carrying the name under ``emic_name`` (the form this script
    writes for newly created files); both are matched.

    Args:
        custodian_name: Institution name to look for.
        custodian_dir: ``pathlib.Path`` of the directory holding ``*.yaml``
            custodian files.

    Returns:
        Path of the first matching file in sorted filename order, or ``None``
        when no file matches.
    """
    wanted = custodian_name.lower()

    for custodian_file in sorted(custodian_dir.glob('*.yaml')):
        try:
            with open(custodian_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception:
            # Deliberately best-effort: one unreadable or malformed file must
            # not abort the lookup across the whole directory.
            continue

        if not isinstance(data, dict):
            # Empty documents and top-level lists/scalars carry no name.
            continue

        stored_name = data.get('custodian_name')
        if isinstance(stored_name, dict):
            stored_name = stored_name.get('emic_name')

        if isinstance(stored_name, str) and stored_name.lower() == wanted:
            return custodian_file

    return None
def _institution_name_from_filename(filename):
    """Derive a display name from the HTML filename (used when no H1 name is found)."""
    name = filename.replace(' People _ LinkedIn.html', '')
    name = name.replace('.html', '')
    # macOS resource-fork copies are prefixed with "._".
    if name.startswith('._'):
        name = name[2:]
    # Strip a leading notification counter such as "(15) " or ".(15) ".
    name = re.sub(r'^\.?\_?\(\d+\)\s*', '', name)
    name = re.sub(r'^\._*\(\d+\)\s*', '', name)
    return re.sub(r'\s+', ' ', name).strip()


def _empty_staff_result(html_name, slug, source_file):
    """Build the placeholder result used when the real HTML parser is unavailable."""
    return {
        'custodian_metadata': {
            'custodian_name': html_name,
            'custodian_slug': slug,
            'name': html_name,
        },
        'source_metadata': {
            'source_type': 'linkedin_company_people_page_html',
            'source_file': source_file,
            'registered_timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'registration_method': 'html_parsing',
            'staff_extracted': 0,
        },
        'staff': [],
        'staff_analysis': {
            'total_staff_extracted': 0,
            'with_linkedin_url': 0,
            'with_alternate_profiles': 0,
            'anonymous_members': 0,
            'heritage_relevant_count': 0,
            'staff_by_heritage_type': {},
        },
    }


def process_single_file(html_path, input_dir, output_dir, skip_existing):
    """Process one saved LinkedIn HTML page and write its staff JSON file.

    Args:
        html_path: ``pathlib.Path`` of the HTML file to process.
        input_dir: Accepted for interface compatibility; not used here.
        output_dir: ``pathlib.Path`` of the directory receiving
            ``<slug>_staff_<timestamp>.json``.
        skip_existing: When true, return a ``'skipped'`` result if a staff
            JSON file for this slug already exists in ``output_dir``.

    Returns:
        A dict whose ``'status'`` is ``'skipped'``, ``'success'`` or
        ``'error'``. Every variant carries ``'slug'`` and ``'filename'``;
        success adds ``'custodian_name'``, ``'staff_count'`` and the full
        ``'result'`` payload; error adds the exception text under ``'error'``.
    """
    slug = clean_filename_to_slug(html_path.name)

    # Only scan the output directory when skipping was requested; any() stops
    # at the first hit instead of listing every match.
    if skip_existing and any(output_dir.glob(slug + '_staff_*.json')):
        return {
            'status': 'skipped',
            'slug': slug,
            'filename': html_path.name,
            'reason': 'already_processed',
        }

    try:
        # errors='replace' keeps badly encoded pages processable.
        with open(html_path, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()

        html_name = extract_institution_name_from_html_fast(html_content)
        if not html_name:
            html_name = _institution_name_from_filename(html_path.name)

        if parse_html_file:
            # presumably returns a dict shaped like _empty_staff_result();
            # confirm against parse_linkedin_html.
            result = parse_html_file(html_path, html_name, slug)
        else:
            result = _empty_staff_result(html_name, slug, html_path.name)

        # The name and filename determined here override the parser's values.
        if 'custodian_metadata' in result:
            result['custodian_metadata']['custodian_name'] = html_name
            result['custodian_metadata']['name'] = html_name
        if 'source_metadata' in result:
            result['source_metadata']['source_file'] = html_path.name

        timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        staff_filename = output_dir / f"{slug}_staff_{timestamp}.json"
        with open(staff_filename, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)

        return {
            'status': 'success',
            'slug': slug,
            'filename': html_path.name,
            'custodian_name': html_name,
            'staff_count': result.get('staff_analysis', {}).get('total_staff_extracted', 0),
            'result': result,
        }
    except Exception as e:
        # Top-level worker boundary: report the failure as data so one bad
        # file does not take down the batch.
        return {
            'status': 'error',
            'slug': slug,
            'filename': html_path.name,
            'error': str(e),
        }
def _staff_entries(staff_with_profiles):
    """Map parsed staff records to the staff_list entries stored in the custodian YAML."""
    return [
        {
            'staff_id': s.get('staff_id'),
            'person_name': s.get('name'),
            'person_profile_path': f"data/custodian/person/entity/{s.get('linkedin_slug', '')}_*.json",
            'role_title': s.get('headline', ''),
            'heritage_relevant': s.get('heritage_relevant', False),
            'heritage_type': s.get('heritage_type'),
        }
        for s in staff_with_profiles
    ]


def _infer_institution_type(heritage_types):
    """Pick the institution type from the most frequent staff heritage code (default MUSEUM)."""
    if not heritage_types:
        return 'MUSEUM'
    type_map = {
        'M': 'MUSEUM',
        'L': 'LIBRARY',
        'A': 'ARCHIVE',
        'G': 'GALLERY',
        'R': 'RESEARCH_CENTER',
        'E': 'EDUCATION_PROVIDER',
        'S': 'COLLECTING_SOCIETY',
        'D': 'DIGITAL_PLATFORM',
    }
    type_code = Counter(heritage_types).most_common(1)[0][0]
    return type_map.get(type_code, 'MUSEUM')


def create_custodian_yaml(custodian_name, result, custodian_file, is_new):
    """Create or update a custodian YAML file with staff data.

    Only staff records that have a ``linkedin_slug`` are written. When there
    are none, the function returns without touching ``custodian_file``.

    Args:
        custodian_name: Institution name to store.
        result: Parsed page payload; ``'staff'``, ``'source_metadata'`` and
            ``'staff_analysis'`` are read from it.
        custodian_file: Path of the YAML file to write.
        is_new: True to write a fresh file with a placeholder GHCID; False to
            load the existing file and replace its ``staff`` section.

    Returns:
        None.
    """
    staff_with_profiles = [s for s in result.get('staff', []) if s.get('linkedin_slug')]
    if not staff_with_profiles:
        return

    staff_section = {
        'provenance': {
            'source_type': 'linkedin_company_people_page_html',
            'registered_timestamp': result.get('source_metadata', {}).get('registered_timestamp', ''),
            'registration_method': 'html_parsing',
            'total_staff_extracted': len(staff_with_profiles),
        },
        'staff_list': _staff_entries(staff_with_profiles),
    }

    if is_new:
        heritage_types = result.get('staff_analysis', {}).get('staff_by_heritage_type', {})
        institution_type = _infer_institution_type(heritage_types)

        # Placeholder identifier until the real location is researched.
        slug = clean_filename_to_slug(f"{custodian_name}.html")
        placeholder_ghcid = f"NL-XX-XXX-PENDING-{slug.upper()}"

        # 'custodian_name' used to appear twice in this literal (a string and
        # then this mapping); the later mapping always won, so only it is kept.
        custodian_data = {
            'ghcid_current': placeholder_ghcid,
            'custodian_name': {
                'emic_name': custodian_name,
                'english_name': None,
                'name_verified': True,
                'name_source': 'linkedin_html_h1',
            },
            'institution_type': institution_type,
            'staff': staff_section,
            'provenance': {
                'data_source': 'LINKEDIN_HTML_PEOPLE_PAGE',
                'data_tier': 'TIER_4_INFERRED',
                'extraction_date': datetime.now(timezone.utc).isoformat(),
                'extraction_method': 'Optimized batch processing with HTML H1 name extraction',
                'confidence_score': 0.85,
                'notes': f'Staff extracted from LinkedIn company People page. Location research needed for GHCID. Total staff: {len(staff_with_profiles)}',
            },
        }
    else:
        with open(custodian_file, 'r', encoding='utf-8') as f:
            custodian_data = yaml.safe_load(f) or {}

        custodian_data['staff'] = staff_section

        # Keep a structured name mapping intact instead of flattening it to a
        # bare string; plain-string (or missing) names are set as before.
        existing_name = custodian_data.get('custodian_name')
        if isinstance(existing_name, dict):
            existing_name['emic_name'] = custodian_name
        else:
            custodian_data['custodian_name'] = custodian_name

    with open(custodian_file, 'w', encoding='utf-8') as f:
        yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
def process_wrapper_func(f, input_dir_val, output_dir_val, skip_existing_val):
    """Forward one file to process_single_file.

    Defined at module level because it is used with multiprocessing.
    Returns whatever process_single_file returns.
    """
    outcome = process_single_file(
        html_path=f,
        input_dir=input_dir_val,
        output_dir=output_dir_val,
        skip_existing=skip_existing_val,
    )
    return outcome
def main():
    """Run the batch: parse arguments, process HTML files in parallel, write custodian YAMLs.

    Returns:
        0 once the summary has been printed. A missing input directory exits
        with status 1; an invalid ``--workers`` value exits via argparse.
    """
    parser = argparse.ArgumentParser(
        description='Optimized LinkedIn batch processing - parallel and fast'
    )
    parser.add_argument('--input-dir', type=Path, required=True,
                        help='Directory containing LinkedIn HTML files')
    parser.add_argument('--output-dir', type=Path, required=True,
                        help='Output directory for staff JSON files')
    parser.add_argument('--custodian-dir', type=Path, required=True,
                        help='Directory containing custodian YAML files')
    parser.add_argument('--workers', type=int, default=8,
                        help='Number of parallel workers (default: 8)')
    parser.add_argument('--skip-existing', action='store_true',
                        help='Skip already-processed files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit processing to first N files (0 = all)')
    args = parser.parse_args()

    if args.workers < 1:
        # ProcessPoolExecutor rejects max_workers <= 0 with a ValueError;
        # report it as a usage error instead of a traceback.
        parser.error('--workers must be at least 1')

    if not args.input_dir.exists():
        print(f"Error: Input directory not found: {args.input_dir}", file=sys.stderr)
        sys.exit(1)

    # Create output directories
    args.output_dir.mkdir(parents=True, exist_ok=True)
    args.custodian_dir.mkdir(parents=True, exist_ok=True)

    # Get all HTML files
    html_files = sorted(args.input_dir.glob('*.html'))
    if args.limit > 0:
        html_files = html_files[:args.limit]

    print(f"Processing {len(html_files)} HTML files with {args.workers} workers...")
    print(f"Input directory: {args.input_dir}")
    print(f"Output directory: {args.output_dir}")
    print(f"Custodian directory: {args.custodian_dir}")
    if args.skip_existing:
        print("Skipping already-processed files")

    # Bind the per-run arguments so each task only needs the file path.
    process_func = partial(
        process_wrapper_func,
        input_dir_val=args.input_dir,
        output_dir_val=args.output_dir,
        skip_existing_val=args.skip_existing,
    )

    # Process files in parallel
    results = []
    failed_futures = 0
    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        futures = {executor.submit(process_func, f): f for f in html_files}
        completed = 0
        for future in as_completed(futures):
            completed += 1
            if completed % 100 == 0:
                print(f"Progress: {completed}/{len(html_files)} processed", end='\r')
            try:
                results.append(future.result())
            except Exception as e:
                # The worker produced no result dict; count the failure here
                # so the summary still accounts for every file.
                failed_futures += 1
                print(f"\nError getting result: {e}", file=sys.stderr)

    print(f"\nProcessing complete: {completed} files")

    # Statistics
    stats = {
        'total': len(html_files),
        'success': 0,
        'errors': failed_futures,
        'skipped': 0,
        'with_staff': 0,
        'total_staff': 0,
        'custodians_created': 0,
        'custodians_updated': 0,
        'empty_staff': 0,
    }

    # Process results and create custodian YAMLs
    for result in results:
        status = result['status']
        if status == 'skipped':
            stats['skipped'] += 1
            continue
        if status == 'error':
            stats['errors'] += 1
            print(f"Error processing {result['filename']}: {result['error']}", file=sys.stderr)
            continue
        if status != 'success':
            continue

        stats['success'] += 1
        stats['total_staff'] += result.get('staff_count', 0)
        if result['staff_count'] == 0:
            stats['empty_staff'] += 1
        else:
            stats['with_staff'] += 1

        custodian_name = result.get('custodian_name')
        if not custodian_name:
            continue

        # create_custodian_yaml writes nothing unless at least one staff
        # record has a linkedin_slug, so skip the directory scan and do not
        # count a created/updated custodian in that case.
        payload = result['result']
        if not any(member.get('linkedin_slug') for member in payload.get('staff', [])):
            continue

        existing_file = find_existing_custodian(custodian_name, args.custodian_dir)
        if existing_file:
            # Update existing custodian
            create_custodian_yaml(custodian_name, payload, existing_file, is_new=False)
            stats['custodians_updated'] += 1
        else:
            # Create new custodian under its placeholder GHCID filename
            slug = clean_filename_to_slug(f"{custodian_name}.html")
            placeholder_ghcid = f"NL-XX-XXX-PENDING-{slug.upper()}"
            custodian_file = args.custodian_dir / f"{placeholder_ghcid}.yaml"
            create_custodian_yaml(custodian_name, payload, custodian_file, is_new=True)
            stats['custodians_created'] += 1

    # Print summary
    print("\n" + "=" * 60)
    print("PROCESSING COMPLETE")
    print("=" * 60)
    print("\nStatistics:")
    print(f" Total HTML files: {stats['total']}")
    print(f" Successfully processed: {stats['success']}")
    print(f" Skipped (already processed): {stats['skipped']}")
    print(f" Errors: {stats['errors']}")
    print(f" Institutions with staff: {stats['with_staff']}")
    print(f" Institutions with empty staff: {stats['empty_staff']}")
    print(f" Total staff extracted: {stats['total_staff']}")
    print(f" Custodians created: {stats['custodians_created']}")
    print(f" Custodians updated: {stats['custodians_updated']}")
    print("\nOutput directories:")
    print(f" Staff JSON files: {args.output_dir}")
    print(f" Custodian YAML files: {args.custodian_dir}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())