glam/scripts/linkedin_batch_simple.py
2025-12-30 03:43:31 +01:00

547 lines
20 KiB
Python

#!/usr/bin/env python3
"""Simplified LinkedIn Batch Processing - Sequential & Reliable
This script processes LinkedIn HTML files sequentially to avoid multiprocessing issues:
1. Extracts full institution names from HTML H1 tags (not from filename)
2. Properly cleans filenames (removes macOS resource forks, periods, parentheses)
3. Creates staff JSON files and custodian YAML files
4. Generates processing summary
Usage:
python scripts/linkedin_batch_simple.py \
--input-dir /path/to/html/files \
--output-dir data/custodian/person/bu_final \
--custodian-dir data/custodian/
"""
import argparse
import json
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
try:
from bs4 import BeautifulSoup
except ImportError:
print("Error: beautifulsoup4 not installed. Run: pip install beautifulsoup4", file=sys.stderr)
sys.exit(1)
try:
import yaml
except ImportError:
print("Error: yaml not installed. Run: pip install pyyaml", file=sys.stderr)
sys.exit(1)
def clean_filename_to_slug(filename: str) -> str:
"""
Clean HTML filename to generate URL-safe slug.
Handles:
- macOS resource fork prefixes (._)
- Periods before numbers (._(15))
- Numbers in parentheses (15), (7)
- Extra spaces and underscores
- " People _ LinkedIn.html" suffix
Examples:
"._(15) Gemeente Enkhuizen_ People _ LinkedIn.html"
-> "gemeente-enkhuizen"
"(7) ADVN _ archief voor nationale bewegingen_ People _ LinkedIn.html"
-> "advn-archief-voor-nationale-bewegingen"
"""
# Remove " People _ LinkedIn.html" suffix
name = filename.replace(' People _ LinkedIn.html', '')
name = name.replace('.html', '')
# Remove macOS resource fork prefix (._)
if name.startswith('._'):
name = name[2:]
# Remove leading period followed by numbers/parentheses: ._(15), .(15), _(15)
name = re.sub(r'^\.?\_?\(\d+\)\s*', '', name)
name = re.sub(r'^\._*\(\d+\)\s*', '', name)
# Remove trailing spaces and underscores
name = name.strip('_ ')
# Convert to URL-safe slug
slug = re.sub(r'[^a-z0-9]+', '-', name.lower())
slug = re.sub(r'-+', '-', slug).strip('-')
return slug
def extract_institution_name_from_html(html_content: str) -> Optional[str]:
"""
Extract full institution name from HTML H1 tag.
LinkedIn H1 format: "Organization Name | LinkedIn"
We extract the part before the pipe.
Returns None if H1 not found.
"""
soup = BeautifulSoup(html_content, 'html.parser')
h1 = soup.find('h1')
if h1:
h1_text = h1.get_text().strip()
# Remove " | LinkedIn" suffix
if ' | ' in h1_text:
name = h1_text.split(' | ')[0].strip()
else:
name = h1_text
# Clean up extra pipes or separators
name = re.sub(r'\s*\|\s*', ' ', name)
name = re.sub(r'\s+', ' ', name)
return name if name else None
return None
def clean_filename_to_name(filename: str) -> str:
"""
Clean HTML filename to extract institution name.
This is a fallback when HTML H1 extraction fails.
"""
# Remove " People _ LinkedIn.html" suffix
name = filename.replace(' People _ LinkedIn.html', '')
name = name.replace('.html', '')
# Remove macOS resource fork prefix (._)
if name.startswith('._'):
name = name[2:]
# Remove leading period followed by numbers/parentheses: ._(15), .(15), _(15)
name = re.sub(r'^\.?\_?\(\d+\)\s*', '', name)
name = re.sub(r'^\._*\(\d+\)\s*', '', name)
# Remove trailing spaces and underscores
name = name.strip('_ ')
# Replace multiple spaces with single space
name = re.sub(r'\s+', ' ', name)
return name.strip()
def find_existing_custodian(custodian_name: str, custodian_dir: Path) -> Optional[Path]:
"""
Find existing custodian YAML file by name (case-insensitive).
"""
custodian_lower = custodian_name.lower()
for custodian_file in sorted(custodian_dir.glob('*.yaml')):
try:
with open(custodian_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if data and data.get('custodian_name', '').lower() == custodian_lower:
return custodian_file
except Exception:
continue
return None
def process_single_file(html_path: Path, output_dir: Path, custodian_dir: Path) -> dict:
"""
Process a single HTML file.
Returns processing result dictionary with all data.
"""
# Generate slug
slug = clean_filename_to_slug(html_path.name)
try:
# Read HTML
with open(html_path, 'r', encoding='utf-8', errors='replace') as f:
html_content = f.read()
# Extract name from HTML H1
html_name = extract_institution_name_from_html(html_content)
if not html_name:
# Fallback: extract from filename
html_name = clean_filename_to_name(html_path.name)
# Simple staff extraction using regex
staff = []
h1_match = re.search(r'<h1[^>]*>([^<]+)</h1>', html_content, re.IGNORECASE | re.DOTALL)
if h1_match:
institution_name = h1_match.group(1).strip()
else:
institution_name = html_name
# Extract staff cards using regex pattern
# Look for profile cards with LinkedIn profile URLs
profile_pattern = r'org-people-profile-card__profile-image-\d+.*?href="(linkedin\.com/in/[^"]+)"'
for match in re.finditer(profile_pattern, html_content):
linkedin_url = match.group(1)
# Extract slug from URL
slug_match = re.search(r'linkedin\.com/in/([^/?]+)', linkedin_url)
if not slug_match:
continue
linkedin_slug = slug_match.group(1)
# Try to find name near the profile URL
# Look for name in nearby text
context_start = max(0, match.start() - 200)
context_end = match.start() + 200
context = html_content[context_start:context_end]
# Try to find name (look for alt text or text in title section)
alt_match = re.search(r'alt="([^"]+)"', context)
name = alt_match.group(1) if alt_match else None
# If name looks like "is open to work", remove it
if name:
for phrase in [' is open to work', ' is hiring']:
if phrase in name:
name = name.split(phrase)[0].strip()
break
# Default to "LinkedIn Member" if no name found
if not name or name.startswith('photo'):
name = 'LinkedIn Member'
# Detect heritage type from context
headline = ''
is_heritage = False
heritage_type = None
# Look for job title/headline
headline_match = re.search(r'artdeco-entity-lockup__subtitle[^>]*>([^<]+)</', context)
if headline_match:
headline = headline_match.group(1).strip()
# Simple heritage detection keywords
heritage_keywords = {
'M': ['museum', 'curator', 'conservator', 'collection', 'archief', 'archive'],
'L': ['library', 'bibliotheek', 'bibliothek'],
'A': ['archive', 'archief'],
'G': ['gallery', 'galerie', 'kunsthal'],
'R': ['research', 'onderzoek', 'researcher'],
'E': ['universiteit', 'university', 'educatie', 'teacher', 'professor'],
'S': ['vereniging', 'society', 'genootschap'],
}
text_lower = (headline + ' ' + name).lower()
for type_code, keywords in heritage_keywords.items():
for kw in keywords:
if kw in text_lower:
is_heritage = True
heritage_type = type_code
break
# Default to museum if it's a heritage institution
if is_heritage and not heritage_type:
heritage_type = 'M'
staff_entry = {
'staff_id': f"{slug}_staff_{len(staff):04d}_{linkedin_slug}",
'name': name,
'name_type': 'full' if name != 'LinkedIn Member' else 'anonymous',
'linkedin_slug': linkedin_slug,
'linkedin_profile_url': f"https://www.linkedin.com/in/{linkedin_slug}",
'headline': headline,
'heritage_relevant': is_heritage,
'heritage_type': heritage_type,
}
staff.append(staff_entry)
# Calculate staff analysis
total_staff = len(staff)
with_linkedin = sum(1 for s in staff if s.get('linkedin_profile_url'))
heritage_relevant = sum(1 for s in staff if s.get('heritage_relevant'))
heritage_type_counts = Counter([s.get('heritage_type') for s in staff if s.get('heritage_type')])
# Build result
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
result = {
'custodian_metadata': {
'custodian_name': html_name,
'custodian_slug': slug,
'name': institution_name,
},
'source_metadata': {
'source_type': 'linkedin_company_people_page_html',
'source_file': html_path.name,
'registered_timestamp': timestamp,
'registration_method': 'html_parsing_simple_regex',
'staff_extracted': total_staff,
},
'staff': staff,
'staff_analysis': {
'total_staff_extracted': total_staff,
'with_linkedin_url': with_linkedin,
'heritage_relevant_count': heritage_relevant,
'staff_by_heritage_type': dict(heritage_type_counts),
},
}
return {
'status': 'success',
'slug': slug,
'filename': html_path.name,
'custodian_name': html_name,
'staff_count': total_staff,
'result': result,
}
except Exception as e:
return {
'status': 'error',
'slug': slug,
'filename': html_path.name,
'error': str(e),
}
def create_custodian_yaml(custodian_name: str, result: dict, custodian_file: Optional[Path], is_new: bool) -> None:
"""
Create or update custodian YAML file with staff data.
"""
staff_list = result.get('staff', [])
staff_with_profiles = [s for s in staff_list if s.get('linkedin_profile_url')]
if not staff_with_profiles:
return
# Provenance data
provenance = {
'source_type': 'linkedin_company_people_page_html',
'registered_timestamp': result['source_metadata'].get('registered_timestamp', ''),
'registration_method': 'html_parsing_simple_regex',
'total_staff_extracted': len(staff_with_profiles),
}
# Staff list with references to entity files
staff_list_data = []
for s in staff_with_profiles:
staff_entry = {
'staff_id': s.get('staff_id'),
'person_name': s.get('name'),
'person_profile_path': f"data/custodian/person/entity/{s.get('linkedin_slug', '')}_*.json",
'role_title': s.get('headline', ''),
'heritage_relevant': s.get('heritage_relevant', False),
'heritage_type': s.get('heritage_type'),
}
staff_list_data.append(staff_entry)
if is_new:
# Create new custodian file
# Determine institution type based on staff heritage analysis
heritage_types = result['staff_analysis'].get('staff_by_heritage_type', {})
if heritage_types:
most_common = Counter(heritage_types).most_common(1)
if most_common:
type_code = most_common[0][0]
type_map = {
'M': 'MUSEUM',
'L': 'LIBRARY',
'A': 'ARCHIVE',
'G': 'GALLERY',
'R': 'RESEARCH_CENTER',
'E': 'EDUCATION_PROVIDER',
'S': 'COLLECTING_SOCIETY',
'D': 'DIGITAL_PLATFORM',
}
institution_type = type_map.get(type_code, 'MUSEUM')
else:
institution_type = 'MUSEUM'
# Generate placeholder GHCID
placeholder_ghcid = f"NL-XX-XXX-PENDING-{slug.upper()}"
custodian_data = {
'ghcid_current': placeholder_ghcid,
'custodian_name': custodian_name,
'institution_type': institution_type,
'custodian_name': {
'emic_name': custodian_name,
'english_name': None,
'name_verified': True,
'name_source': 'linkedin_html_h1',
},
'staff': {
'provenance': provenance,
'staff_list': staff_list_data,
},
'provenance': {
'data_source': 'LINKEDIN_HTML_PEOPLE_PAGE',
'data_tier': 'TIER_4_INFERRED',
'extraction_date': datetime.now(timezone.utc).isoformat(),
'extraction_method': 'Sequential batch processing with HTML H1 name extraction',
'confidence_score': 0.85,
'notes': f'Staff extracted from LinkedIn company People page. Location research needed for GHCID. Total staff: {len(staff_with_profiles)}',
}
}
# Create new file
with open(custodian_file, 'w', encoding='utf-8') as f:
yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
else:
# Update existing file
with open(custodian_file, 'r', encoding='utf-8') as f:
custodian_data = yaml.safe_load(f) or {}
# Update staff section
custodian_data['staff'] = {
'provenance': provenance,
'staff_list': staff_list_data,
}
# Update custodian name
custodian_data['custodian_name'] = custodian_name
# Write back
with open(custodian_file, 'w', encoding='utf-8') as f:
yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
def main():
parser = argparse.ArgumentParser(
description='Simplified LinkedIn batch processing - sequential and reliable'
)
parser.add_argument('--input-dir', type=Path, required=True,
help='Directory containing LinkedIn HTML files')
parser.add_argument('--output-dir', type=Path, required=True,
help='Output directory for staff JSON files')
parser.add_argument('--custodian-dir', type=Path, required=True,
help='Directory containing custodian YAML files')
parser.add_argument('--limit', type=int, default=0,
help='Limit processing to first N files (0 = all)')
args = parser.parse_args()
if not args.input_dir.exists():
print(f"Error: Input directory not found: {args.input_dir}", file=sys.stderr)
sys.exit(1)
# Create output directories
args.output_dir.mkdir(parents=True, exist_ok=True)
args.custodian_dir.mkdir(parents=True, exist_ok=True)
# Get all HTML files
html_files = sorted(args.input_dir.glob('*.html'))
if args.limit > 0:
html_files = html_files[:args.limit]
print(f"Processing {len(html_files)} HTML files sequentially...")
print(f"Input directory: {args.input_dir}")
print(f"Output directory: {args.output_dir}")
print(f"Custodian directory: {args.custodian_dir}")
print(f"Estimated time: ~{len(html_files)} seconds (~{len(html_files)//60} minutes)")
# Statistics
stats = {
'total': len(html_files),
'success': 0,
'errors': 0,
'with_staff': 0,
'total_staff': 0,
'custodians_created': 0,
'custodians_updated': 0,
'name_fixes': 0,
'empty_staff': 0,
}
# Process files sequentially
for i, html_path in enumerate(html_files, 1):
try:
if i % 100 == 0:
print(f"Progress: [{i}/{len(html_files)}]", end='\r')
result = process_single_file(html_path, args.output_dir, args.custodian_dir)
if result['status'] == 'error':
stats['errors'] += 1
print(f"Error: {result['filename']}: {result['error']}", file=sys.stderr)
continue
stats['success'] += 1
staff_count = result.get('staff_count', 0)
stats['total_staff'] += staff_count
if staff_count == 0:
stats['empty_staff'] += 1
else:
stats['with_staff'] += 1
# Find or create custodian YAML
custodian_name = result.get('custodian_name')
if custodian_name:
existing_file = find_existing_custodian(custodian_name, args.custodian_dir)
if existing_file:
stats['custodians_updated'] += 1
# Update existing custodian
create_custodian_yaml(custodian_name, result['result'], existing_file, is_new=False)
else:
stats['custodians_created'] += 1
# Create new custodian
custodian_file = args.custodian_dir / f"{result['slug']}.yaml"
create_custodian_yaml(custodian_name, result['result'], custodian_file, is_new=True)
except Exception as e:
stats['errors'] += 1
print(f"Error: {html_path.name}: {e}", file=sys.stderr)
print(f"\nProcessing complete!")
# Print summary
print("\n" + "=" * 60)
print("PROCESSING COMPLETE")
print("=" * 60)
print(f"\nStatistics:")
print(f" Total HTML files: {stats['total']}")
print(f" Successfully processed: {stats['success']}")
print(f" Errors: {stats['errors']}")
print(f" Institutions with staff: {stats['with_staff']}")
print(f" Institutions with empty staff: {stats['empty_staff']}")
print(f" Total staff extracted: {stats['total_staff']}")
print(f" Custodians created: {stats['custodians_created']}")
print(f" Custodians updated: {stats['custodians_updated']}")
print(f"\nOutput directories:")
print(f" Staff JSON files: {args.output_dir}")
print(f" Custodian YAML files: {args.custodian_dir}")
# Save processing report
report = {
'processing_date': datetime.now(timezone.utc).isoformat(),
'input_directory': str(args.input_dir),
'output_directory': str(args.output_dir),
'custodian_directory': str(args.custodian_dir),
'statistics': stats,
}
report_file = Path('reports/linkedin_batch_simple_report.json')
report_file.parent.mkdir(parents=True, exist_ok=True)
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\nReport saved to: {report_file}")
return 0
if __name__ == '__main__':
sys.exit(main())