#!/usr/bin/env python3
"""Simplified LinkedIn Batch Processing - Sequential & Reliable

This script processes LinkedIn HTML files sequentially to avoid multiprocessing issues:

1. Extracts full institution names from HTML H1 tags (not from filename)
2. Properly cleans filenames (removes macOS resource forks, periods, parentheses)
3. Creates staff JSON files and custodian YAML files
4. Generates processing summary

Usage:
    python scripts/linkedin_batch_simple.py \
        --input-dir /path/to/html/files \
        --output-dir data/custodian/person/bu_final \
        --custodian-dir data/custodian/
"""

import argparse
import json
import os  # NOTE(review): appears unused in this file — confirm before removing
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Third-party dependencies are checked at import time so the script fails
# fast with an actionable message instead of a bare traceback.
try:
    from bs4 import BeautifulSoup
except ImportError:
    print("Error: beautifulsoup4 not installed. Run: pip install beautifulsoup4", file=sys.stderr)
    sys.exit(1)

try:
    import yaml
except ImportError:
    print("Error: yaml not installed. Run: pip install pyyaml", file=sys.stderr)
    sys.exit(1)

def clean_filename_to_slug(filename: str) -> str:
    """
    Clean a LinkedIn HTML filename into a URL-safe slug.

    Handles:
    - macOS resource fork prefixes (._)
    - Leading download counters in any period/underscore wrapping:
      "(15)", "._(15)", "_(15)", "__(15)"
    - Extra spaces and underscores
    - " People _ LinkedIn.html" suffix

    Examples:
        "._(15) Gemeente Enkhuizen_ People _ LinkedIn.html"
            -> "gemeente-enkhuizen"

        "(7) ADVN _ archief voor nationale bewegingen_ People _ LinkedIn.html"
            -> "advn-archief-voor-nationale-bewegingen"
    """
    # Strip the LinkedIn export suffix first so only the name remains.
    name = filename.replace(' People _ LinkedIn.html', '')
    name = name.replace('.html', '')

    # Remove macOS resource fork prefix (._)
    if name.startswith('._'):
        name = name[2:]

    # Remove any leading mix of periods/underscores followed by a
    # parenthesized download counter, e.g. "(15) ", "._(15) ", "__(15) ".
    # Single pattern replacing two overlapping ones that jointly missed
    # forms such as "__(15)".
    name = re.sub(r'^[._]*\(\d+\)\s*', '', name)

    # Remove trailing spaces and underscores
    name = name.strip('_ ')

    # Lowercase, then collapse every non-alphanumeric run into one hyphen.
    slug = re.sub(r'[^a-z0-9]+', '-', name.lower())
    slug = re.sub(r'-+', '-', slug).strip('-')

    return slug

def extract_institution_name_from_html(html_content: str) -> Optional[str]:
    """
    Extract the full institution name from the first H1 tag in the HTML.

    LinkedIn renders the H1 as "Organization Name | LinkedIn"; the part
    before the first " | " separator is the institution name.

    Returns None when no H1 is present or the extracted name is empty.
    """
    heading = BeautifulSoup(html_content, 'html.parser').find('h1')
    if not heading:
        return None

    raw = heading.get_text().strip()

    # Keep only the part before the first " | " separator, if one exists.
    name = raw.split(' | ')[0].strip() if ' | ' in raw else raw

    # Normalise any leftover pipes and collapse whitespace runs.
    name = re.sub(r'\s*\|\s*', ' ', name)
    name = re.sub(r'\s+', ' ', name)

    return name or None

def clean_filename_to_name(filename: str) -> str:
    """
    Clean a LinkedIn HTML filename into a human-readable institution name.

    Fallback used when H1 extraction from the HTML content fails.
    """
    # Strip the LinkedIn export suffix.
    name = filename.replace(' People _ LinkedIn.html', '')
    name = name.replace('.html', '')

    # Remove macOS resource fork prefix (._)
    if name.startswith('._'):
        name = name[2:]

    # Remove any leading mix of periods/underscores followed by a
    # parenthesized download counter, e.g. "(15) ", "._(15) ", "__(15) ".
    # Single pattern replacing two overlapping ones that jointly missed
    # forms such as "__(15)".
    name = re.sub(r'^[._]*\(\d+\)\s*', '', name)

    # Remove trailing spaces and underscores
    name = name.strip('_ ')

    # Replace multiple spaces with single space
    name = re.sub(r'\s+', ' ', name)

    return name.strip()

def find_existing_custodian(custodian_name: str, custodian_dir: Path) -> Optional[Path]:
    """
    Find an existing custodian YAML file by name (case-insensitive).

    Scans every *.yaml file in *custodian_dir* and compares its
    'custodian_name' field with *custodian_name*. Unreadable or malformed
    files are skipped (best-effort scan).

    Returns the matching Path, or None when no file matches.
    """
    custodian_lower = custodian_name.lower()

    for custodian_file in sorted(custodian_dir.glob('*.yaml')):
        try:
            with open(custodian_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not data:
                continue
            stored = data.get('custodian_name', '')
            # Some files store the name as a mapping
            # ({'emic_name': ..., ...}); compare against its emic_name
            # instead of raising AttributeError on .lower(), which the
            # broad except below would silently swallow.
            if isinstance(stored, dict):
                stored = stored.get('emic_name') or ''
            if isinstance(stored, str) and stored.lower() == custodian_lower:
                return custodian_file
        except Exception:
            # Best-effort: skip files that cannot be read or parsed.
            continue

    return None

def process_single_file(html_path: Path, output_dir: Path, custodian_dir: Path) -> dict:
    """
    Process a single LinkedIn "People" page HTML file.

    Reads the HTML, determines the institution name (H1 tag first,
    cleaned filename as fallback), and extracts staff entries with a set
    of regex patterns applied to raw HTML text.

    NOTE(review): output_dir and custodian_dir are accepted but never
    used inside this function — all file writing happens in the caller.

    Returns a dict with status 'success' (carrying the full extraction
    result under 'result') or status 'error' (carrying the exception text).
    """
    # Generate slug
    slug = clean_filename_to_slug(html_path.name)

    try:
        # Read HTML
        with open(html_path, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()

        # Extract name from HTML H1
        html_name = extract_institution_name_from_html(html_content)

        if not html_name:
            # Fallback: extract from filename
            html_name = clean_filename_to_name(html_path.name)

        # Simple staff extraction using regex
        staff = []
        h1_match = re.search(r'<h1[^>]*>([^<]+)</h1>', html_content, re.IGNORECASE | re.DOTALL)
        if h1_match:
            institution_name = h1_match.group(1).strip()
        else:
            institution_name = html_name

        # Extract staff cards using regex pattern
        # Look for profile cards with LinkedIn profile URLs
        # NOTE(review): this only matches hrefs that literally start with
        # "linkedin.com/in/" — absolute URLs such as
        # "https://www.linkedin.com/in/..." will NOT match. Confirm
        # against the saved HTML that hrefs really appear scheme-less.
        profile_pattern = r'org-people-profile-card__profile-image-\d+.*?href="(linkedin\.com/in/[^"]+)"'

        for match in re.finditer(profile_pattern, html_content):
            linkedin_url = match.group(1)

            # Extract slug from URL
            slug_match = re.search(r'linkedin\.com/in/([^/?]+)', linkedin_url)
            if not slug_match:
                continue

            linkedin_slug = slug_match.group(1)

            # Try to find name near the profile URL
            # Look for name in nearby text (a +/-200-char window around the match)
            context_start = max(0, match.start() - 200)
            context_end = match.start() + 200
            context = html_content[context_start:context_end]

            # Try to find name (look for alt text or text in title section)
            alt_match = re.search(r'alt="([^"]+)"', context)
            name = alt_match.group(1) if alt_match else None

            # If name looks like "is open to work", remove it
            # (alt text sometimes carries a status suffix)
            if name:
                for phrase in [' is open to work', ' is hiring']:
                    if phrase in name:
                        name = name.split(phrase)[0].strip()
                        break

            # Default to "LinkedIn Member" if no name found
            # (alt text starting with "photo" carries no usable name)
            if not name or name.startswith('photo'):
                name = 'LinkedIn Member'

            # Detect heritage type from context
            headline = ''
            is_heritage = False
            heritage_type = None

            # Look for job title/headline
            headline_match = re.search(r'artdeco-entity-lockup__subtitle[^>]*>([^<]+)</', context)
            if headline_match:
                headline = headline_match.group(1).strip()

            # Simple heritage detection keywords
            heritage_keywords = {
                'M': ['museum', 'curator', 'conservator', 'collection', 'archief', 'archive'],
                'L': ['library', 'bibliotheek', 'bibliothek'],
                'A': ['archive', 'archief'],
                'G': ['gallery', 'galerie', 'kunsthal'],
                'R': ['research', 'onderzoek', 'researcher'],
                'E': ['universiteit', 'university', 'educatie', 'teacher', 'professor'],
                'S': ['vereniging', 'society', 'genootschap'],
            }

            text_lower = (headline + ' ' + name).lower()

            # NOTE(review): the break below exits only the inner keyword
            # loop; the outer loop over type codes keeps running, so
            # heritage_type ends up as the LAST matching type code, not the
            # first (e.g. 'archief' sets 'M' then gets overwritten by 'A').
            # If first-match-wins was intended, a second break is missing —
            # confirm intent before changing.
            for type_code, keywords in heritage_keywords.items():
                for kw in keywords:
                    if kw in text_lower:
                        is_heritage = True
                        heritage_type = type_code
                        break

            # Default to museum if it's a heritage institution
            # NOTE(review): unreachable as written — heritage_type is always
            # assigned in the same branch that sets is_heritage above.
            if is_heritage and not heritage_type:
                heritage_type = 'M'

            staff_entry = {
                'staff_id': f"{slug}_staff_{len(staff):04d}_{linkedin_slug}",
                'name': name,
                'name_type': 'full' if name != 'LinkedIn Member' else 'anonymous',
                'linkedin_slug': linkedin_slug,
                'linkedin_profile_url': f"https://www.linkedin.com/in/{linkedin_slug}",
                'headline': headline,
                'heritage_relevant': is_heritage,
                'heritage_type': heritage_type,
            }

            staff.append(staff_entry)

        # Calculate staff analysis
        total_staff = len(staff)
        # with_linkedin always equals total_staff here: every entry above
        # is created with a linkedin_profile_url.
        with_linkedin = sum(1 for s in staff if s.get('linkedin_profile_url'))
        heritage_relevant = sum(1 for s in staff if s.get('heritage_relevant'))

        heritage_type_counts = Counter([s.get('heritage_type') for s in staff if s.get('heritage_type')])

        # Build result
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

        result = {
            'custodian_metadata': {
                'custodian_name': html_name,
                'custodian_slug': slug,
                'name': institution_name,
            },
            'source_metadata': {
                'source_type': 'linkedin_company_people_page_html',
                'source_file': html_path.name,
                'registered_timestamp': timestamp,
                'registration_method': 'html_parsing_simple_regex',
                'staff_extracted': total_staff,
            },
            'staff': staff,
            'staff_analysis': {
                'total_staff_extracted': total_staff,
                'with_linkedin_url': with_linkedin,
                'heritage_relevant_count': heritage_relevant,
                'staff_by_heritage_type': dict(heritage_type_counts),
            },
        }

        return {
            'status': 'success',
            'slug': slug,
            'filename': html_path.name,
            'custodian_name': html_name,
            'staff_count': total_staff,
            'result': result,
        }

    except Exception as e:
        # Any per-file failure (I/O, decoding, regex) is reported as a
        # result so the batch can continue with the next file.
        return {
            'status': 'error',
            'slug': slug,
            'filename': html_path.name,
            'error': str(e),
        }

def create_custodian_yaml(custodian_name: str, result: dict, custodian_file: Optional[Path], is_new: bool) -> None:
    """
    Create or update a custodian YAML file with extracted staff data.

    Parameters:
        custodian_name: Institution name (from HTML H1 or filename).
        result: Extraction result dict from process_single_file()['result'].
        custodian_file: Target YAML path; must not be None when called.
        is_new: True to create a fresh file, False to update in place.

    Staff entries without a LinkedIn profile URL are dropped; if none
    remain, the function returns without touching any file.
    """
    staff_list = result.get('staff', [])
    staff_with_profiles = [s for s in staff_list if s.get('linkedin_profile_url')]

    if not staff_with_profiles:
        return

    # Provenance data
    provenance = {
        'source_type': 'linkedin_company_people_page_html',
        'registered_timestamp': result['source_metadata'].get('registered_timestamp', ''),
        'registration_method': 'html_parsing_simple_regex',
        'total_staff_extracted': len(staff_with_profiles),
    }

    # Staff list with references to entity files
    staff_list_data = []
    for s in staff_with_profiles:
        staff_entry = {
            'staff_id': s.get('staff_id'),
            'person_name': s.get('name'),
            'person_profile_path': f"data/custodian/person/entity/{s.get('linkedin_slug', '')}_*.json",
            'role_title': s.get('headline', ''),
            'heritage_relevant': s.get('heritage_relevant', False),
            'heritage_type': s.get('heritage_type'),
        }
        staff_list_data.append(staff_entry)

    if is_new:
        # Create new custodian file.
        # Determine institution type from the dominant staff heritage type.
        heritage_types = result['staff_analysis'].get('staff_by_heritage_type', {})

        institution_type = 'MUSEUM'  # default when there is no heritage signal
        if heritage_types:
            most_common = Counter(heritage_types).most_common(1)
            if most_common:
                type_code = most_common[0][0]
                type_map = {
                    'M': 'MUSEUM',
                    'L': 'LIBRARY',
                    'A': 'ARCHIVE',
                    'G': 'GALLERY',
                    'R': 'RESEARCH_CENTER',
                    'E': 'EDUCATION_PROVIDER',
                    'S': 'COLLECTING_SOCIETY',
                    'D': 'DIGITAL_PLATFORM',
                }
                institution_type = type_map.get(type_code, 'MUSEUM')

        # Generate placeholder GHCID.
        # Fixes a NameError: the original referenced an undefined name
        # `slug`. Use the slug recorded in the extraction result, falling
        # back to the target filename's stem (main() names new files
        # "<slug>.yaml").
        slug = result.get('custodian_metadata', {}).get('custodian_slug') or custodian_file.stem
        placeholder_ghcid = f"NL-XX-XXX-PENDING-{slug.upper()}"

        custodian_data = {
            'ghcid_current': placeholder_ghcid,
            'custodian_name': custodian_name,
            'institution_type': institution_type,
            # NOTE: the original dict literal repeated the 'custodian_name'
            # key, so the plain-string name above was silently replaced by
            # this mapping (and find_existing_custodian could no longer
            # match the file). Stored under a distinct key instead.
            'custodian_name_details': {
                'emic_name': custodian_name,
                'english_name': None,
                'name_verified': True,
                'name_source': 'linkedin_html_h1',
            },
            'staff': {
                'provenance': provenance,
                'staff_list': staff_list_data,
            },
            'provenance': {
                'data_source': 'LINKEDIN_HTML_PEOPLE_PAGE',
                'data_tier': 'TIER_4_INFERRED',
                'extraction_date': datetime.now(timezone.utc).isoformat(),
                'extraction_method': 'Sequential batch processing with HTML H1 name extraction',
                'confidence_score': 0.85,
                'notes': f'Staff extracted from LinkedIn company People page. Location research needed for GHCID. Total staff: {len(staff_with_profiles)}',
            }
        }

        # Create new file
        with open(custodian_file, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    else:
        # Update existing file
        with open(custodian_file, 'r', encoding='utf-8') as f:
            custodian_data = yaml.safe_load(f) or {}

        # Update staff section
        custodian_data['staff'] = {
            'provenance': provenance,
            'staff_list': staff_list_data,
        }

        # Update custodian name
        custodian_data['custodian_name'] = custodian_name

        # Write back
        with open(custodian_file, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

def main():
    """
    CLI entry point.

    Processes every LinkedIn HTML file in --input-dir sequentially,
    writes one staff JSON file per institution to --output-dir, creates
    or updates custodian YAML files in --custodian-dir, prints a summary,
    and saves a JSON report under reports/.

    Returns 0 (per-file errors are counted but do not stop the batch).
    """
    parser = argparse.ArgumentParser(
        description='Simplified LinkedIn batch processing - sequential and reliable'
    )
    parser.add_argument('--input-dir', type=Path, required=True,
                        help='Directory containing LinkedIn HTML files')
    parser.add_argument('--output-dir', type=Path, required=True,
                        help='Output directory for staff JSON files')
    parser.add_argument('--custodian-dir', type=Path, required=True,
                        help='Directory containing custodian YAML files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit processing to first N files (0 = all)')

    args = parser.parse_args()

    if not args.input_dir.exists():
        print(f"Error: Input directory not found: {args.input_dir}", file=sys.stderr)
        sys.exit(1)

    # Create output directories
    args.output_dir.mkdir(parents=True, exist_ok=True)
    args.custodian_dir.mkdir(parents=True, exist_ok=True)

    # Get all HTML files (sorted for a deterministic processing order)
    html_files = sorted(args.input_dir.glob('*.html'))

    if args.limit > 0:
        html_files = html_files[:args.limit]

    print(f"Processing {len(html_files)} HTML files sequentially...")
    print(f"Input directory: {args.input_dir}")
    print(f"Output directory: {args.output_dir}")
    print(f"Custodian directory: {args.custodian_dir}")
    print(f"Estimated time: ~{len(html_files)} seconds (~{len(html_files)//60} minutes)")

    # Statistics ('name_fixes' is kept for report-schema stability even
    # though nothing increments it yet)
    stats = {
        'total': len(html_files),
        'success': 0,
        'errors': 0,
        'with_staff': 0,
        'total_staff': 0,
        'custodians_created': 0,
        'custodians_updated': 0,
        'name_fixes': 0,
        'empty_staff': 0,
    }

    # Process files sequentially
    for i, html_path in enumerate(html_files, 1):
        try:
            if i % 100 == 0:
                print(f"Progress: [{i}/{len(html_files)}]", end='\r')

            result = process_single_file(html_path, args.output_dir, args.custodian_dir)

            if result['status'] == 'error':
                stats['errors'] += 1
                print(f"Error: {result['filename']}: {result['error']}", file=sys.stderr)
                continue

            stats['success'] += 1
            staff_count = result.get('staff_count', 0)
            stats['total_staff'] += staff_count

            if staff_count == 0:
                stats['empty_staff'] += 1
            else:
                stats['with_staff'] += 1

            # Persist the full extraction result as a staff JSON file.
            # (The module docstring and the summary promise these files,
            # but the original never wrote them — --output-dir was created
            # and then left empty.)
            staff_json_path = args.output_dir / f"{result['slug']}.json"
            with open(staff_json_path, 'w', encoding='utf-8') as f:
                json.dump(result['result'], f, indent=2, ensure_ascii=False)

            # Find or create custodian YAML
            custodian_name = result.get('custodian_name')
            if custodian_name:
                existing_file = find_existing_custodian(custodian_name, args.custodian_dir)

                if existing_file:
                    stats['custodians_updated'] += 1
                    # Update existing custodian
                    create_custodian_yaml(custodian_name, result['result'], existing_file, is_new=False)
                else:
                    stats['custodians_created'] += 1
                    # Create new custodian
                    custodian_file = args.custodian_dir / f"{result['slug']}.yaml"
                    create_custodian_yaml(custodian_name, result['result'], custodian_file, is_new=True)

        except Exception as e:
            # Keep the batch running past any single-file failure.
            stats['errors'] += 1
            print(f"Error: {html_path.name}: {e}", file=sys.stderr)

    print("\nProcessing complete!")

    # Print summary
    print("\n" + "=" * 60)
    print("PROCESSING COMPLETE")
    print("=" * 60)
    print("\nStatistics:")
    print(f" Total HTML files: {stats['total']}")
    print(f" Successfully processed: {stats['success']}")
    print(f" Errors: {stats['errors']}")
    print(f" Institutions with staff: {stats['with_staff']}")
    print(f" Institutions with empty staff: {stats['empty_staff']}")
    print(f" Total staff extracted: {stats['total_staff']}")
    print(f" Custodians created: {stats['custodians_created']}")
    print(f" Custodians updated: {stats['custodians_updated']}")
    print("\nOutput directories:")
    print(f" Staff JSON files: {args.output_dir}")
    print(f" Custodian YAML files: {args.custodian_dir}")

    # Save processing report
    report = {
        'processing_date': datetime.now(timezone.utc).isoformat(),
        'input_directory': str(args.input_dir),
        'output_directory': str(args.output_dir),
        'custodian_directory': str(args.custodian_dir),
        'statistics': stats,
    }

    report_file = Path('reports/linkedin_batch_simple_report.json')
    report_file.parent.mkdir(parents=True, exist_ok=True)
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\nReport saved to: {report_file}")

    return 0

# Script entry point: the exit status is propagated from main().
if __name__ == '__main__':
    sys.exit(main())