glam/scripts/extract_persons_with_provenance.py
kempersc 0845d9f30e feat(scripts): add person enrichment and slot mapping utilities
Person Enrichment Scripts:
- enrich_person_comprehensive.py: Full-featured web search enrichment via Linkup
  with Rule 6/21/26/34/35 compliance (dual timestamps, no fabrication)
- enrich_ppids_linkup.py: Batch PPID enrichment pipeline
- extract_persons_with_provenance.py: Extract person data from LinkedIn HTML
  with XPath provenance tracking

LinkML Slot Management:
- update_slot_mappings.py: Update slots for RiC-O naming (Rule 39) and
  semantic URI requirements (Rule 38)
- update_class_slot_references.py: Update class files referencing renamed slots
- validate_slot_mappings.py: Validate slot definitions against ontology rules

All scripts follow established project conventions for provenance and
ontology alignment.
2026-01-10 13:32:32 +01:00

630 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Extract person data from LinkedIn company People HTML files with FULL PROVENANCE.
This script follows:
- Rule 6: WebObservation Claims MUST Have XPath Provenance
- Rule 26: Person Data Provenance - Web Claims for Staff Information
- Rule 35: Provenance Statements MUST Have Dual Timestamps
For each extracted claim, we record:
- claim_type: The type of claim (name, headline, linkedin_url, etc.)
- claim_value: The extracted value
- source_url: LinkedIn company page URL (derived from filename)
- retrieved_on: Timestamp when HTML was saved (from file metadata)
- statement_created_at: When the extraction was performed
- source_archived_at: When the HTML file was created
- xpath: XPath to the element containing this value
- html_file: Path to archived HTML file
- xpath_match_score: 1.0 for exact matches
- retrieval_agent: The agent that performed extraction
Usage:
python scripts/extract_persons_with_provenance.py [--limit N] [--dry-run]
python scripts/extract_persons_with_provenance.py --file "path/to/file.html"
Author: OpenCode/Claude
Created: 2025-01-09
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote
# Directory paths
# NOTE(review): absolute, machine-specific paths (external volume + home dir);
# consider making these configurable before running on another machine.
MANUAL_DIR = Path("/Volumes/KINGSTON/data/glam/data/custodian/person/affiliated/manual")
PERSON_ENTITY_DIR = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")
OUTPUT_SUMMARY = Path("/Users/kempersc/apps/glam/data/person/_extraction_summary.json")
# Provenance constants
RETRIEVAL_AGENT = "extract_persons_with_provenance.py"  # recorded on every claim
SCHEMA_VERSION = "1.0.0"  # written into extraction_metadata of each entity
# Heritage type detection keywords (from parse_linkedin_html.py)
# Keys are single-letter category codes; a headline containing any of the
# listed keywords (matched case-insensitively) is tagged with that code.
# Judging by the keyword lists: G=gallery, L=library, A=archive, M=museum,
# O=government, R=research, E=education, D=digital. Keywords mix English and
# Dutch terms plus named institutions (e.g. Rijksmuseum, KNAW, NIOD).
HERITAGE_KEYWORDS = {
    'G': ['gallery', 'galerie', 'kunsthal', 'art dealer', 'art gallery'],
    'L': ['library', 'bibliotheek', 'bibliothek', 'librarian', 'KB ', 'national library'],
    'A': ['archive', 'archief', 'archivist', 'beeld en geluid', 'filmmuseum', 'eye film',
          'nationaal archief', 'stadsarchief', 'NIOD', 'IISH'],
    'M': ['museum', 'musea', 'curator', 'conservator', 'collection manager', 'rijksmuseum',
          'van gogh', 'stedelijk', 'mauritshuis', 'collectie'],
    'O': ['ministry', 'ministerie', 'government', 'overheid', 'gemeente', 'OCW'],
    'R': ['research', 'onderzoek', 'researcher', 'KNAW', 'humanities cluster', 'NWO'],
    'E': ['university', 'universiteit', 'professor', 'lecturer', 'hogeschool', 'academy',
          'PhD', 'student', 'education', 'UvA', 'reinwardt'],
    'D': ['digital', 'platform', 'software', 'IT ', 'developer', 'data ', 'AI '],
}
class LinkedInProfileExtractor(HTMLParser):
    """
    Extract LinkedIn profile data from HTML with XPath tracking.

    A streaming HTMLParser that walks an archived LinkedIn company "People"
    page, builds one dict per profile card, and records the XPath location of
    each extracted value so every claim carries provenance (Rule 6).

    Call feed() with the HTML, then finalize() to obtain the profile list.
    """

    def __init__(self, html_file_path: str, source_archived_at: str):
        """
        Args:
            html_file_path: Path of the archived HTML file (stored on claims).
            source_archived_at: ISO timestamp of when the HTML was archived;
                reused as both 'retrieved_on' and 'source_archived_at' on each
                claim (dual-timestamp requirement, Rule 35).
        """
        super().__init__()
        self.html_file_path = html_file_path
        self.source_archived_at = source_archived_at
        # Extracted profiles with claims
        self.profiles: List[Dict] = []
        self.current_profile: Dict = {}
        self.current_claims: List[Dict] = []
        # XPath tracking
        self.tag_stack: List[Tuple[str, Dict[str, str]]] = []
        self.current_xpath: List[str] = []
        # NOTE(review): counters are kept per tag name across the WHOLE
        # document, so the bracketed indices in generated XPaths are
        # document-global ordinals, not per-parent sibling positions as in
        # standard XPath — confirm downstream consumers expect this.
        self.element_counts: Dict[str, int] = {}
        # State tracking: which lockup sub-element the parser is inside.
        self.in_profile_card = False
        self.in_title = False
        self.in_subtitle = False
        self.in_badge = False
        self.current_text = ""   # text accumulated for the current section
        self.card_index = -1     # index parsed from the profile-image id

    def _get_current_xpath(self) -> str:
        """Build the current XPath string from the open-element stack."""
        if not self.current_xpath:
            return "/"
        return "/" + "/".join(self.current_xpath)

    def _add_claim(self, claim_type: str, claim_value: str, xpath: str) -> None:
        """Append a web claim with full provenance to the current profile.

        Empty or whitespace-only values are silently ignored.
        """
        if not claim_value or not claim_value.strip():
            return
        claim = {
            "claim_type": claim_type,
            "claim_value": claim_value.strip(),
            "source_url": self._derive_source_url(),
            # Dual timestamps (Rule 35): when the source was captured vs.
            # when this extraction statement was created.
            "retrieved_on": self.source_archived_at,
            "statement_created_at": datetime.now(timezone.utc).isoformat(),
            "source_archived_at": self.source_archived_at,
            "xpath": xpath,
            "html_file": self.html_file_path,
            "xpath_match_score": 1.0,  # direct extraction -> exact match
            "retrieval_agent": RETRIEVAL_AGENT,
        }
        self.current_claims.append(claim)

    def _derive_source_url(self) -> str:
        """Derive a LinkedIn company page URL from the archived filename.

        NOTE(review): the URL is reconstructed (slugified institution name),
        not observed in the HTML itself — it is explicitly "plausible", not
        guaranteed to be the real company URL.
        """
        filename = Path(self.html_file_path).name
        # Extract institution name from filename
        name = filename.replace('.html', '')
        name = re.sub(r'_?People _ LinkedIn$', '', name)
        name = re.sub(r'^\(\d+\)\s*', '', name)
        name = re.sub(r'\s+', ' ', name).strip()
        # Create a plausible LinkedIn company URL
        slug = re.sub(r'[^a-z0-9-]', '-', name.lower())
        slug = re.sub(r'-+', '-', slug).strip('-')
        return f"https://www.linkedin.com/company/{slug}/people/"

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        """Track XPath position and detect profile-card landmarks."""
        attrs_dict = dict(attrs)
        # Track XPath: bump the document-global counter for this tag and
        # push "tag[n]" onto the path.
        key = f"{tag}"
        if key not in self.element_counts:
            self.element_counts[key] = 0
        self.element_counts[key] += 1
        self.current_xpath.append(f"{tag}[{self.element_counts[key]}]")
        self.tag_stack.append((tag, attrs_dict))
        attr_id = attrs_dict.get('id', '')
        attr_class = attrs_dict.get('class', '')
        # Detect profile card start: the profile-image anchor id carries a
        # numeric card index; a new index means a new person card.
        if 'org-people-profile-card__profile-image' in attr_id:
            self.in_profile_card = True
            match = re.search(r'profile-image-(\d+)', attr_id)
            if match:
                new_index = int(match.group(1))
                if new_index != self.card_index:
                    # Save previous profile (only if it got a name) before
                    # starting to accumulate the next one.
                    if self.current_profile.get('name'):
                        self.current_profile['web_claims'] = self.current_claims
                        self.profiles.append(self.current_profile)
                    self.current_profile = {}
                    self.current_claims = []
                    self.card_index = new_index
            # Extract the personal profile URL from the card anchor's href.
            href = attrs_dict.get('href', '')
            if href and 'linkedin.com/in/' in href:
                slug = self._extract_slug(href)
                if slug:
                    self.current_profile['linkedin_slug'] = slug
                    self.current_profile['linkedin_profile_url'] = f"https://www.linkedin.com/in/{slug}"
                    self._add_claim('linkedin_url', f"https://www.linkedin.com/in/{slug}",
                                    self._get_current_xpath())
        # Extract name from img alt (avatar alt text usually holds the name).
        if tag == 'img' and self.in_profile_card:
            alt = attrs_dict.get('alt', '')
            if alt and alt not in ('', 'photo', 'Profile photo'):
                # Clean LinkedIn status phrases ("... is open to work" etc.)
                clean_name = self._clean_status_from_name(alt)
                if clean_name:
                    self.current_profile['name'] = clean_name
                    self._add_claim('full_name', clean_name, self._get_current_xpath() + "/@alt")
        # Title section (person name)
        if 'artdeco-entity-lockup__title' in attr_class:
            self.in_title = True
            self.current_text = ""
        # Badge section (connection degree, e.g. "1st")
        if 'artdeco-entity-lockup__badge' in attr_class:
            self.in_badge = True
            self.current_text = ""
        # Subtitle section (headline / role)
        if 'artdeco-entity-lockup__subtitle' in attr_class:
            self.in_subtitle = True
            self.current_text = ""

    def handle_data(self, data: str) -> None:
        """Accumulate text for whichever lockup section is currently open."""
        text = data.strip()
        if not text:
            return
        if self.in_title:
            self.current_text += " " + text
        elif self.in_badge:
            self.current_text += " " + text
        elif self.in_subtitle:
            self.current_text += " " + text

    def handle_endtag(self, tag: str) -> None:
        """Flush accumulated section text into claims and unwind the XPath.

        NOTE(review): section text is only flushed when a 'div' closes —
        sections wrapped in other elements would never be emitted; confirm
        against the actual LinkedIn markup.
        """
        if tag == 'div':
            if self.in_title:
                text = self.current_text.strip()
                text = re.sub(r'\s+', ' ', text)
                # Prefer the name already taken from the img alt, if any.
                if text and 'name' not in self.current_profile:
                    if len(text) > 1 and not text.startswith('View '):
                        clean_name = self._clean_status_from_name(text)
                        self.current_profile['name'] = clean_name
                        self._add_claim('full_name', clean_name, self._get_current_xpath())
                        # "LinkedIn Member" is the placeholder shown for
                        # profiles outside the viewer's network.
                        if clean_name == 'LinkedIn Member':
                            self.current_profile['is_anonymous'] = True
                self.in_title = False
                self.current_text = ""
            if self.in_badge:
                text = self.current_text.strip()
                degree = self._parse_degree(text)
                if degree:
                    self.current_profile['degree'] = degree
                    self._add_claim('connection_degree', degree, self._get_current_xpath())
                self.in_badge = False
                self.current_text = ""
            if self.in_subtitle:
                text = self.current_text.strip()
                text = re.sub(r'\s+', ' ', text)
                if text and len(text) > 2:
                    self.current_profile['headline'] = text
                    self._add_claim('headline', text, self._get_current_xpath())
                self.in_subtitle = False
                self.current_text = ""
        # Pop XPath stack.
        # NOTE(review): only pops when the closing tag matches the stack top;
        # void elements (img, br) pushed by handle_starttag never receive an
        # endtag, so the stack/XPath can drift out of sync — verify.
        if self.tag_stack and self.tag_stack[-1][0] == tag:
            self.tag_stack.pop()
            if self.current_xpath:
                self.current_xpath.pop()

    def _extract_slug(self, url: str) -> Optional[str]:
        """Extract the profile slug from a linkedin.com/in/... URL."""
        match = re.search(r'linkedin\.com/in/([^?/]+)', url)
        return match.group(1) if match else None

    def _parse_degree(self, text: str) -> Optional[str]:
        """Parse the connection degree ('1st'/'2nd'/'3rd+') from badge text."""
        if '1st' in text:
            return '1st'
        if '2nd' in text:
            return '2nd'
        if '3rd' in text:
            return '3rd+'
        return None

    def _clean_status_from_name(self, name: str) -> str:
        """Remove LinkedIn status phrases (e.g. 'is open to work') from a name.

        Truncates the name at the first status phrase found (case-insensitive).
        """
        status_phrases = [
            ' is open to work', ' is hiring', ' is looking for',
            ' open to work', ' - Hiring', ' - open to work'
        ]
        name_lower = name.lower()
        for phrase in status_phrases:
            if phrase.lower() in name_lower:
                idx = name_lower.find(phrase.lower())
                return name[:idx].strip()
        return name

    def finalize(self) -> List[Dict]:
        """Finalize parsing and return all profiles with their claims.

        Must be called after feed(); flushes the last in-progress profile.
        """
        # Save last profile
        if self.current_profile.get('name'):
            self.current_profile['web_claims'] = self.current_claims
            self.profiles.append(self.current_profile)
        return self.profiles
def detect_heritage_type(headline: str) -> Tuple[bool, Optional[str]]:
    """Classify a LinkedIn headline as heritage-relevant or not.

    Returns (True, code) when a typed HERITAGE_KEYWORDS category matches,
    (True, None) for a generic heritage term, and (False, None) otherwise.
    Matching is case-insensitive; an empty headline is never relevant.
    """
    if not headline:
        return (False, None)
    text = headline.lower()
    # Typed categories first: the first category with a matching keyword wins.
    for code, terms in HERITAGE_KEYWORDS.items():
        if any(term.lower() in text for term in terms):
            return (True, code)
    # Generic heritage terms: relevant, but with no specific type code.
    generic_terms = ('heritage', 'erfgoed', 'culture', 'cultuur', 'cultural', 'film',
                     'media', 'arts', 'kunst', 'preservation', 'collection')
    if any(term in text for term in generic_terms):
        return (True, None)
    return (False, None)
def create_person_entity(profile: Dict, custodian_name: str, custodian_slug: str,
                         html_file: Path, source_archived_at: str) -> Tuple[Dict, str]:
    """
    Create a person entity with full provenance following Rule 20 and Rule 26.

    Args:
        profile: Profile dict produced by LinkedInProfileExtractor
            (keys such as 'name', 'headline', 'linkedin_slug', 'web_claims').
        custodian_name: Human-readable institution name.
        custodian_slug: URL-friendly slug for the institution.
        html_file: Path to the archived source HTML file.
        source_archived_at: ISO timestamp of when the HTML was archived.

    Returns:
        Tuple of (person entity dict ready to be saved as JSON, output
        filename). Note: fixed return annotation — the original declared
        `-> Dict` but has always returned this two-tuple.
    """
    name = profile.get('name', 'Unknown')
    headline = profile.get('headline', '')
    linkedin_slug = profile.get('linkedin_slug', '')
    # Determine heritage relevance from the headline keywords.
    is_heritage, heritage_type = detect_heritage_type(headline)
    if not headline and custodian_name:
        # Assume heritage-relevant if associated with a custodian
        is_heritage = True
    # Generate person ID: prefer the LinkedIn slug; otherwise synthesize a
    # stable-ish id from the custodian slug and the (possibly anonymous) name.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    if linkedin_slug:
        person_id = linkedin_slug
        filename = f"{linkedin_slug}_{timestamp}.json"
    else:
        # Generate ID for anonymous profiles
        name_slug = re.sub(r'[^a-z0-9]+', '_', name.lower())[:30]
        person_id = f"{custodian_slug}_staff_{name_slug}"
        filename = f"{person_id}_{timestamp}.json"
    # Build web_claims with full provenance (Rule 6)
    web_claims = profile.get('web_claims', [])
    person_entity = {
        "person_id": person_id,
        "extraction_metadata": {
            "extraction_agent": RETRIEVAL_AGENT,
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_source": f"LinkedIn company page: {custodian_name}",
            "source_file": str(html_file.name),
            "source_archived_at": source_archived_at,
            "schema_version": SCHEMA_VERSION,
        },
        "profile_data": {
            "name": name,
            "linkedin_url": profile.get('linkedin_profile_url'),
            "headline": headline,
            "location": None,  # Will be extracted from profile if available
            "connections": None,
            "about": None,
            "experience": [],
            "education": [],
            "skills": [],
            "languages": [],
            "profile_image_url": None,
        },
        "heritage_relevance": {
            "is_heritage_relevant": is_heritage,
            "heritage_types": [heritage_type] if heritage_type else [],
            "rationale": f"Identified as staff at {custodian_name}" if is_heritage else None,
        },
        "affiliations": [
            {
                "custodian_name": custodian_name,
                "custodian_slug": custodian_slug,
                "role_title": headline,
                "affiliation_provenance": {
                    "source": "LinkedIn company people page",
                    # NOTE(review): this records the PERSON's profile URL as
                    # the affiliation source, not the company People page URL
                    # named in "source" — confirm intended.
                    "source_url": profile.get('linkedin_profile_url', ''),
                    "retrieved_on": source_archived_at,
                    "retrieval_agent": RETRIEVAL_AGENT,
                }
            }
        ],
        "web_claims": web_claims,
        "source_observations": [
            {
                "source_file": str(html_file),
                "observed_on": source_archived_at,
                "extraction_agent": RETRIEVAL_AGENT,
            }
        ],
        "linkedin_slug": linkedin_slug if linkedin_slug else None,
    }
    return person_entity, filename
def get_file_timestamp(filepath: Path) -> str:
    """Return the file's modification time as a UTC ISO-8601 string."""
    modified = datetime.fromtimestamp(filepath.stat().st_mtime, tz=timezone.utc)
    return modified.isoformat()
def extract_institution_name(filename: str) -> str:
    """Derive the institution name embedded in a saved LinkedIn People
    page filename (e.g. "(2) Rijksmuseum_People _ LinkedIn.html").
    """
    cleaned = Path(filename).name.replace('.html', '')
    # Strip, in order: the LinkedIn page-title suffix, browser download
    # counters like "(2) ", and a stray leading comma.
    for pattern in (r'_?People _ LinkedIn$', r'^\(\d+\)\s*', r'^,\s*'):
        cleaned = re.sub(pattern, '', cleaned)
    # Normalise internal whitespace, then drop edge underscores.
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned.strip('_')
def generate_slug(name: str) -> str:
    """Turn an institution name into a lowercase, hyphen-separated slug.

    Non-alphanumeric characters (other than spaces/hyphens) are dropped;
    runs of spaces/hyphens collapse to a single hyphen.
    """
    lowered = re.sub(r'[^a-z0-9\s-]', '', name.lower())
    return re.sub(r'[\s-]+', '-', lowered).strip('-')
def process_html_file(html_file: Path, dry_run: bool = False) -> Dict[str, Any]:
    """
    Process a single HTML file and extract all person profiles with provenance.

    Args:
        html_file: Path to an archived LinkedIn company People page.
        dry_run: When True, count entities but write no JSON files.

    Returns:
        Summary dict with 'status' of 'success', 'skipped' or 'error';
        successful results carry extraction counters.
    """
    institution_name = extract_institution_name(html_file.name)
    # Guard against filenames that yield no usable institution name.
    if not institution_name or len(institution_name) < 3:
        return {
            'status': 'skipped',
            'file': html_file.name,
            'reason': f'Invalid institution name: "{institution_name}"'
        }
    slug = generate_slug(institution_name)
    source_archived_at = get_file_timestamp(html_file)
    # Read and parse HTML (errors='replace' tolerates mixed/broken encodings).
    try:
        with open(html_file, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()
    except Exception as e:
        return {
            'status': 'error',
            'file': html_file.name,
            'reason': f'Failed to read file: {e}'
        }
    # Extract profiles with XPath tracking
    extractor = LinkedInProfileExtractor(str(html_file), source_archived_at)
    try:
        extractor.feed(html_content)
    except Exception as e:
        return {
            'status': 'error',
            'file': html_file.name,
            'reason': f'HTML parsing error: {e}'
        }
    profiles = extractor.finalize()
    # Create person entity files
    entities_created = 0
    heritage_relevant = 0
    total_claims = 0
    for profile in profiles:
        entity, filename = create_person_entity(
            profile, institution_name, slug, html_file, source_archived_at
        )
        if entity['heritage_relevance']['is_heritage_relevant']:
            heritage_relevant += 1
        total_claims += len(entity.get('web_claims', []))
        if not dry_run:
            output_path = PERSON_ENTITY_DIR / filename
            try:
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(entity, f, indent=2, ensure_ascii=False)
                entities_created += 1
            except Exception as e:
                # FIX: original message printed the literal "(unknown)"
                # instead of identifying which entity failed to save.
                print(f" ERROR saving {output_path.name}: {e}", file=sys.stderr)
        else:
            # Dry run still counts what would have been written.
            entities_created += 1
    return {
        'status': 'success',
        'file': html_file.name,
        'institution_name': institution_name,
        'slug': slug,
        'profiles_extracted': len(profiles),
        'entities_created': entities_created,
        'heritage_relevant': heritage_relevant,
        'total_web_claims': total_claims,
    }
def main():
    """CLI entry point: extract persons from one file or the whole batch.

    Single-file mode (--file) prints the per-file result as JSON; batch mode
    processes every *.html under MANUAL_DIR and writes a run summary.

    Returns:
        Process exit code: 0 on success, 1 on single-file failure.
    """
    parser = argparse.ArgumentParser(
        description='Extract person data from LinkedIn HTML with full provenance'
    )
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--dry-run', action='store_true', help='Do not write files')
    parser.add_argument('--file', type=Path, help='Process single file')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()
    # Ensure output directory exists
    PERSON_ENTITY_DIR.mkdir(parents=True, exist_ok=True)
    if args.file:
        # Single file mode
        if not args.file.exists():
            print(f"Error: File not found: {args.file}", file=sys.stderr)
            return 1
        result = process_html_file(args.file, args.dry_run)
        print(json.dumps(result, indent=2))
        return 0 if result['status'] == 'success' else 1
    # Batch mode
    html_files = sorted(MANUAL_DIR.glob("*.html"))
    if args.limit:
        # NOTE(review): '--limit 0' is falsy and therefore means "no limit".
        html_files = html_files[:args.limit]
    print("=" * 70)
    print("LINKEDIN PERSON EXTRACTION WITH PROVENANCE")
    print("=" * 70)
    print(f"\nInput directory: {MANUAL_DIR}")
    print(f"Output directory: {PERSON_ENTITY_DIR}")
    print(f"Total files to process: {len(html_files)}")
    print(f"Dry run: {args.dry_run}")
    print(f"\nStarting at: {datetime.now(timezone.utc).isoformat()}")
    print()
    # Statistics aggregated across all processed files.
    stats = {
        'total_files': len(html_files),
        'processed': 0,
        'errors': 0,
        'skipped': 0,
        'total_profiles': 0,
        'total_entities': 0,
        'heritage_relevant': 0,
        'total_web_claims': 0,
        'errors_list': [],
    }
    # NOTE(review): 'results' is accumulated but never used after the loop.
    results = []
    for i, html_file in enumerate(html_files, 1):
        result = process_html_file(html_file, args.dry_run)
        results.append(result)
        if result['status'] == 'success':
            stats['processed'] += 1
            stats['total_profiles'] += result.get('profiles_extracted', 0)
            stats['total_entities'] += result.get('entities_created', 0)
            stats['heritage_relevant'] += result.get('heritage_relevant', 0)
            stats['total_web_claims'] += result.get('total_web_claims', 0)
            if args.verbose:
                print(f"[{i:4d}/{len(html_files)}] OK - {result['institution_name']} "
                      f"({result['profiles_extracted']} profiles, {result['total_web_claims']} claims)")
        elif result['status'] == 'error':
            stats['errors'] += 1
            stats['errors_list'].append(result)
            if args.verbose:
                print(f"[{i:4d}/{len(html_files)}] ERROR - {result['file']}: {result['reason']}")
        else:
            # Any non-success, non-error status counts as skipped.
            stats['skipped'] += 1
        # Progress report every 100 files
        if i % 100 == 0:
            pct = (i / len(html_files)) * 100
            print(f"Progress: {i}/{len(html_files)} ({pct:.1f}%) - "
                  f"{stats['total_entities']} entities, {stats['total_web_claims']} claims")
    # Final report
    print()
    print("=" * 70)
    print("EXTRACTION COMPLETE")
    print("=" * 70)
    print(f"\nTotal files: {stats['total_files']}")
    print(f"Processed: {stats['processed']}")
    print(f"Skipped: {stats['skipped']}")
    print(f"Errors: {stats['errors']}")
    print()
    print(f"Total profiles extracted: {stats['total_profiles']}")
    print(f"Person entities created: {stats['total_entities']}")
    print(f"Heritage-relevant: {stats['heritage_relevant']}")
    print(f"Total web claims (with provenance): {stats['total_web_claims']}")
    print()
    if stats['errors'] > 0:
        print("First 10 errors:")
        for err in stats['errors_list'][:10]:
            print(f" - {err['file']}: {err.get('reason', 'Unknown')}")
    # Save machine-readable summary of the whole run.
    summary = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'script': RETRIEVAL_AGENT,
        'schema_version': SCHEMA_VERSION,
        'dry_run': args.dry_run,
        'statistics': stats,
        'compliance': {
            'rule_6': 'WebObservation Claims MUST Have XPath Provenance',
            'rule_26': 'Person Data Provenance - Web Claims for Staff Information',
            'rule_35': 'Provenance Statements MUST Have Dual Timestamps',
        },
    }
    if not args.dry_run:
        with open(OUTPUT_SUMMARY, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        print(f"\nSummary saved to: {OUTPUT_SUMMARY}")
    print("=" * 70)
    return 0
if __name__ == '__main__':
    # Propagate main()'s status code so shell callers can detect failures.
    sys.exit(main())