glam/scripts/batch_parse_linkedin_html.py
2025-12-14 17:09:55 +01:00

145 lines
4.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Batch parse all LinkedIn HTML files in the manual directory.
Extracts custodian name from filename, generates slug, and parses to JSON.
"""
import json
import re
import sys
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
# Add scripts directory to path for import
sys.path.insert(0, str(Path(__file__).parent))
from parse_linkedin_html import parse_html_file
def extract_custodian_name(filename: str) -> str:
    """Derive the custodian's display name from a saved LinkedIn page filename.

    Expected filename shape: "(N) Custodian Name_ People _ LinkedIn.html".
    The "(N) " counter prefix and the "_ People _ LinkedIn.html" suffix are
    dropped, and the underscore sequences that stand in for colons in the
    saved filename are turned back into colons.
    """
    prefix_pattern = r'^\(\d+\)\s*'
    suffix_pattern = r'_\s*People\s*_\s*LinkedIn\.html$'

    # Strip the leading counter first, then the fixed page-title tail.
    without_prefix = re.sub(prefix_pattern, '', filename)
    core = re.sub(suffix_pattern, '', without_prefix)

    # Order matters: "_ " is rewritten before " _" so that a lone
    # underscore followed by a space becomes ": " rather than ":".
    for marker, replacement in (('_ ', ': '), (' _', ':')):
        core = core.replace(marker, replacement)

    return core.strip()
def generate_slug(name: str) -> str:
    """Turn a custodian name into a lowercase, hyphen-separated ASCII slug.

    Accented letters lose their combining marks (after NFD decomposition);
    every run of characters outside ``a-z0-9`` becomes a single hyphen, and
    hyphens at either end are trimmed.
    """
    # Decompose so that accents become separate combining marks (category Mn).
    decomposed = unicodedata.normalize('NFD', name.lower())
    base_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']

    # Anything that is not a lowercase ASCII letter or digit becomes a hyphen.
    hyphenated = re.sub(r'[^a-z0-9]+', '-', ''.join(base_chars))

    # Collapse any repeated hyphens, then trim the edges.
    collapsed = re.sub(r'-+', '-', hyphenated)
    return collapsed.strip('-')
def main(manual_dir: "Path | str | None" = None,
         output_dir: "Path | str | None" = None) -> int:
    """Batch-parse LinkedIn "People" HTML exports into per-custodian JSON files.

    Every ``*.html`` file in ``manual_dir`` whose name contains ``People`` is
    parsed with ``parse_html_file``; the result is written to
    ``<slug>_staff_<timestamp>.json`` in ``output_dir``. After all files have
    been handled, a ``batch_results_<timestamp>.json`` summary is written to
    the same directory and a report is printed.

    Args:
        manual_dir: Directory holding the saved HTML pages. Defaults to the
            location this script has always used, so ``main()`` with no
            arguments behaves as before.
        output_dir: Directory for the JSON output; created if missing.
            Defaults to the location this script has always used.

    Returns:
        0 when no file raised an error, 1 otherwise (used as the exit status).
    """
    if manual_dir is None:
        manual_dir = '/Users/kempersc/apps/glam/data/custodian/person/affiliated/manual'
    if output_dir is None:
        output_dir = '/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed'
    manual_dir = Path(manual_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # One timestamp per batch: every output file of this run shares it.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')

    # Sorted so the processing order is stable from run to run.
    html_files = sorted(manual_dir.glob('*.html'))
    print(f"Found {len(html_files)} HTML files to process")
    print(f"Output directory: {output_dir}")
    print(f"Timestamp: {timestamp}")
    print("-" * 60)

    results = []
    errors = []

    for html_file in html_files:
        filename = html_file.name

        # Only "People" pages follow the filename format the helpers expect.
        if 'People' not in filename:
            print(f"SKIP: {filename} (not a People page)")
            continue

        custodian_name = extract_custodian_name(filename)
        custodian_slug = generate_slug(custodian_name)
        output_file = output_dir / f"{custodian_slug}_staff_{timestamp}.json"

        # The timestamp is fixed for the run, so this mainly triggers when
        # two input files map to the same slug within this batch.
        if output_file.exists():
            print(f"SKIP: {custodian_name} (already exists)")
            continue

        # Batch boundary: one bad file must not abort the remaining ones,
        # so any failure is recorded and reported in the summary instead.
        try:
            print(f"Parsing: {custodian_name}")
            result = parse_html_file(html_file, custodian_name, custodian_slug)

            # Serialize fully before touching the disk so a serialization
            # failure cannot leave a truncated JSON file behind.
            payload = json.dumps(result, indent=2, ensure_ascii=False)
            output_file.write_text(payload, encoding='utf-8')

            staff_count = result['staff_analysis']['total_staff_extracted']
            heritage_count = result['staff_analysis']['heritage_relevant_count']
            print(f"  -> {staff_count} staff ({heritage_count} heritage-relevant)")
            results.append({
                'custodian_name': custodian_name,
                'custodian_slug': custodian_slug,
                'staff_count': staff_count,
                'heritage_relevant': heritage_count,
                'output_file': str(output_file.name),
            })
        except Exception as e:
            print(f"  ERROR: {e}")
            errors.append({
                'custodian_name': custodian_name,
                'error': str(e),
            })

    # Totals are needed for both the console report and the summary file.
    total_staff = sum(r['staff_count'] for r in results)
    total_heritage = sum(r['heritage_relevant'] for r in results)

    print("\n" + "=" * 60)
    print("BATCH COMPLETE")
    print(f"  Processed: {len(results)}")
    print(f"  Errors: {len(errors)}")
    print(f"  Total staff: {total_staff}")
    print(f"  Total heritage-relevant: {total_heritage}")

    # Persist the same figures, plus per-file details, next to the outputs.
    summary_file = output_dir / f"batch_results_{timestamp}.json"
    summary = {
        'timestamp': timestamp,
        'processed': len(results),
        'errors': len(errors),
        'total_staff': total_staff,
        'total_heritage_relevant': total_heritage,
        'results': results,
        'errors_list': errors,
    }
    summary_file.write_text(
        json.dumps(summary, indent=2, ensure_ascii=False),
        encoding='utf-8',
    )
    print(f"\nBatch summary saved to: {summary_file}")

    if errors:
        print("\nErrors:")
        for err in errors:
            print(f"  - {err['custodian_name']}: {err['error']}")

    return 1 if errors else 0
if __name__ == '__main__':
    # Hand main()'s return value to the interpreter as the process exit status.
    raise SystemExit(main())