#!/usr/bin/env python3
"""
Extract person data from LinkedIn company People HTML files with FULL PROVENANCE.

This script follows:
- Rule 6: WebObservation Claims MUST Have XPath Provenance
- Rule 26: Person Data Provenance - Web Claims for Staff Information
- Rule 35: Provenance Statements MUST Have Dual Timestamps

For each extracted claim, we record:
- claim_type: The type of claim (name, headline, linkedin_url, etc.)
- claim_value: The extracted value
- source_url: LinkedIn company page URL (derived from filename)
- retrieved_on: Timestamp when HTML was saved (from file metadata)
- statement_created_at: When the extraction was performed
- source_archived_at: When the HTML file was created
- xpath: XPath to the element containing this value
- html_file: Path to archived HTML file
- xpath_match_score: 1.0 for exact matches
- retrieval_agent: The agent that performed extraction

Usage:
    python scripts/extract_persons_with_provenance.py [--limit N] [--dry-run]
    python scripts/extract_persons_with_provenance.py --file "path/to/file.html"

Author: OpenCode/Claude
Created: 2025-01-09
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote

# Directory paths
MANUAL_DIR = Path("/Volumes/KINGSTON/data/glam/data/custodian/person/affiliated/manual")
PERSON_ENTITY_DIR = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")
OUTPUT_SUMMARY = Path("/Users/kempersc/apps/glam/data/person/_extraction_summary.json")

# Provenance constants
RETRIEVAL_AGENT = "extract_persons_with_provenance.py"
SCHEMA_VERSION = "1.0.0"

# Heritage type detection keywords (from parse_linkedin_html.py)
HERITAGE_KEYWORDS = {
    'G': ['gallery', 'galerie', 'kunsthal', 'art dealer', 'art gallery'],
    'L': ['library', 'bibliotheek', 'bibliothek', 'librarian', 'KB ',
          'national library'],
    'A': ['archive', 'archief', 'archivist', 'beeld en geluid', 'filmmuseum',
          'eye film', 'nationaal archief', 'stadsarchief', 'NIOD', 'IISH'],
    'M': ['museum', 'musea', 'curator', 'conservator', 'collection manager',
          'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis', 'collectie'],
    'O': ['ministry', 'ministerie', 'government', 'overheid', 'gemeente', 'OCW'],
    'R': ['research', 'onderzoek', 'researcher', 'KNAW', 'humanities cluster', 'NWO'],
    'E': ['university', 'universiteit', 'professor', 'lecturer', 'hogeschool',
          'academy', 'PhD', 'student', 'education', 'UvA', 'reinwardt'],
    'D': ['digital', 'platform', 'software', 'IT ', 'developer', 'data ', 'AI '],
}


class LinkedInProfileExtractor(HTMLParser):
    """
    Extract LinkedIn profile data from HTML with XPath tracking.

    Records the XPath location of each extracted value for provenance.

    Fixes over the naive implementation:
    - HTML void elements (``<img>``, ``<br>``, ...) never produce an end tag,
      so they are popped immediately after processing; otherwise the tag/XPath
      stacks would drift out of sync after the first void element and every
      subsequent XPath would be wrong.
    - Positional indices are counted per parent (sibling position), matching
      XPath semantics, instead of being document-global per tag name.
    """

    # Elements that never have a closing tag per the HTML standard.
    VOID_ELEMENTS = frozenset({
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    })

    def __init__(self, html_file_path: str, source_archived_at: str):
        super().__init__()
        self.html_file_path = html_file_path
        self.source_archived_at = source_archived_at
        # Extracted profiles with claims
        self.profiles: List[Dict] = []
        self.current_profile: Dict = {}
        self.current_claims: List[Dict] = []
        # XPath tracking
        self.tag_stack: List[Tuple[str, Dict[str, str]]] = []
        self.current_xpath: List[str] = []
        # Stack of per-depth sibling counters; element_counts[-1] counts the
        # children seen so far at the current nesting level.
        self.element_counts: List[Dict[str, int]] = [{}]
        # State tracking
        self.in_profile_card = False
        self.in_title = False
        self.in_subtitle = False
        self.in_badge = False
        self.current_text = ""
        self.card_index = -1

    def _get_current_xpath(self) -> str:
        """Build current XPath from tag stack."""
        if not self.current_xpath:
            return "/"
        return "/" + "/".join(self.current_xpath)

    def _add_claim(self, claim_type: str, claim_value: str, xpath: str) -> None:
        """Add a web claim with full provenance (Rule 6 / Rule 35)."""
        if not claim_value or not claim_value.strip():
            return
        claim = {
            "claim_type": claim_type,
            "claim_value": claim_value.strip(),
            "source_url": self._derive_source_url(),
            "retrieved_on": self.source_archived_at,
            "statement_created_at": datetime.now(timezone.utc).isoformat(),
            "source_archived_at": self.source_archived_at,
            "xpath": xpath,
            "html_file": self.html_file_path,
            "xpath_match_score": 1.0,
            "retrieval_agent": RETRIEVAL_AGENT,
        }
        self.current_claims.append(claim)

    def _derive_source_url(self) -> str:
        """Derive a plausible LinkedIn company page URL from the filename."""
        filename = Path(self.html_file_path).name
        # Extract institution name from filename
        name = filename.replace('.html', '')
        name = re.sub(r'_?People _ LinkedIn$', '', name)
        name = re.sub(r'^\(\d+\)\s*', '', name)
        name = re.sub(r'\s+', ' ', name).strip()
        # Create a plausible LinkedIn company URL
        slug = re.sub(r'[^a-z0-9-]', '-', name.lower())
        slug = re.sub(r'-+', '-', slug).strip('-')
        return f"https://www.linkedin.com/company/{slug}/people/"

    def _pop_element(self, tag: str) -> None:
        """Unwind one element from the tag/XPath/counter stacks if it matches."""
        if self.tag_stack and self.tag_stack[-1][0] == tag:
            self.tag_stack.pop()
            if self.current_xpath:
                self.current_xpath.pop()
            # Keep the root counter frame; discard the children's frame.
            if len(self.element_counts) > 1:
                self.element_counts.pop()

    def handle_starttag(self, tag: str, attrs: list) -> None:
        attrs_dict = dict(attrs)

        # Track XPath: sibling position among same-named children of this parent.
        counts = self.element_counts[-1]
        counts[tag] = counts.get(tag, 0) + 1
        self.current_xpath.append(f"{tag}[{counts[tag]}]")
        self.tag_stack.append((tag, attrs_dict))
        # New counter frame for this element's children.
        self.element_counts.append({})

        attr_id = attrs_dict.get('id', '')
        attr_class = attrs_dict.get('class', '')

        # Detect profile card start; a new card index flushes the previous card.
        if 'org-people-profile-card__profile-image' in attr_id:
            self.in_profile_card = True
            match = re.search(r'profile-image-(\d+)', attr_id)
            if match:
                new_index = int(match.group(1))
                if new_index != self.card_index:
                    # Save previous profile
                    if self.current_profile.get('name'):
                        self.current_profile['web_claims'] = self.current_claims
                        self.profiles.append(self.current_profile)
                    self.current_profile = {}
                    self.current_claims = []
                    self.card_index = new_index

        # Extract profile URL from href
        href = attrs_dict.get('href', '')
        if href and 'linkedin.com/in/' in href:
            slug = self._extract_slug(href)
            if slug:
                self.current_profile['linkedin_slug'] = slug
                self.current_profile['linkedin_profile_url'] = f"https://www.linkedin.com/in/{slug}"
                self._add_claim('linkedin_url',
                                f"https://www.linkedin.com/in/{slug}",
                                self._get_current_xpath())

        # Extract name from img alt
        if tag == 'img' and self.in_profile_card:
            alt = attrs_dict.get('alt', '')
            if alt and alt not in ('', 'photo', 'Profile photo'):
                # Clean LinkedIn status phrases ("... is open to work" etc.)
                clean_name = self._clean_status_from_name(alt)
                if clean_name:
                    self.current_profile['name'] = clean_name
                    self._add_claim('full_name', clean_name,
                                    self._get_current_xpath() + "/@alt")

        # Title section
        if 'artdeco-entity-lockup__title' in attr_class:
            self.in_title = True
            self.current_text = ""
        # Badge section
        if 'artdeco-entity-lockup__badge' in attr_class:
            self.in_badge = True
            self.current_text = ""
        # Subtitle section (headline)
        if 'artdeco-entity-lockup__subtitle' in attr_class:
            self.in_subtitle = True
            self.current_text = ""

        # Void elements never get an end tag; unwind immediately so the
        # stacks stay balanced for the rest of the document.
        if tag in self.VOID_ELEMENTS:
            self._pop_element(tag)

    def handle_startendtag(self, tag: str, attrs: list) -> None:
        # XHTML-style self-closing tag: process attributes, then unwind.
        self.handle_starttag(tag, attrs)
        if tag not in self.VOID_ELEMENTS:  # void tags already popped above
            self._pop_element(tag)

    def handle_data(self, data: str) -> None:
        text = data.strip()
        if not text:
            return
        if self.in_title:
            self.current_text += " " + text
        elif self.in_badge:
            self.current_text += " " + text
        elif self.in_subtitle:
            self.current_text += " " + text

    def handle_endtag(self, tag: str) -> None:
        if tag == 'div':
            if self.in_title:
                text = self.current_text.strip()
                text = re.sub(r'\s+', ' ', text)
                # Only take the title as a name if the img alt didn't set one.
                if text and 'name' not in self.current_profile:
                    if len(text) > 1 and not text.startswith('View '):
                        clean_name = self._clean_status_from_name(text)
                        self.current_profile['name'] = clean_name
                        self._add_claim('full_name', clean_name,
                                        self._get_current_xpath())
                        if clean_name == 'LinkedIn Member':
                            self.current_profile['is_anonymous'] = True
                self.in_title = False
                self.current_text = ""
            if self.in_badge:
                text = self.current_text.strip()
                degree = self._parse_degree(text)
                if degree:
                    self.current_profile['degree'] = degree
                    self._add_claim('connection_degree', degree,
                                    self._get_current_xpath())
                self.in_badge = False
                self.current_text = ""
            if self.in_subtitle:
                text = self.current_text.strip()
                text = re.sub(r'\s+', ' ', text)
                if text and len(text) > 2:
                    self.current_profile['headline'] = text
                    self._add_claim('headline', text, self._get_current_xpath())
                self.in_subtitle = False
                self.current_text = ""

        # Pop XPath stack
        self._pop_element(tag)

    def _extract_slug(self, url: str) -> Optional[str]:
        """Extract profile slug from URL."""
        match = re.search(r'linkedin\.com/in/([^?/]+)', url)
        return match.group(1) if match else None

    def _parse_degree(self, text: str) -> Optional[str]:
        """Parse connection degree from text."""
        if '1st' in text:
            return '1st'
        if '2nd' in text:
            return '2nd'
        if '3rd' in text:
            return '3rd+'
        return None

    def _clean_status_from_name(self, name: str) -> str:
        """Remove LinkedIn status phrases from name."""
        status_phrases = [
            ' is open to work', ' is hiring', ' is looking for',
            ' open to work', ' - Hiring', ' - open to work'
        ]
        name_lower = name.lower()
        for phrase in status_phrases:
            if phrase.lower() in name_lower:
                idx = name_lower.find(phrase.lower())
                return name[:idx].strip()
        return name

    def finalize(self) -> List[Dict]:
        """Finalize parsing and return all profiles with claims."""
        # Save last profile
        if self.current_profile.get('name'):
            self.current_profile['web_claims'] = self.current_claims
            self.profiles.append(self.current_profile)
        return self.profiles


def detect_heritage_type(headline: str) -> Tuple[bool, Optional[str]]:
    """Detect if a headline is heritage-relevant and what type.

    Returns (is_relevant, heritage_type) where heritage_type is a single-letter
    GLAM code from HERITAGE_KEYWORDS, or None for generic heritage matches.
    """
    if not headline:
        return (False, None)
    headline_lower = headline.lower()
    for heritage_type, keywords in HERITAGE_KEYWORDS.items():
        for keyword in keywords:
            if keyword.lower() in headline_lower:
                return (True, heritage_type)
    # Generic heritage terms
    generic = ['heritage', 'erfgoed', 'culture', 'cultuur', 'cultural',
               'film', 'media', 'arts', 'kunst', 'preservation', 'collection']
    for keyword in generic:
        if keyword in headline_lower:
            return (True, None)
    return (False, None)


def create_person_entity(profile: Dict, custodian_name: str, custodian_slug: str,
                         html_file: Path, source_archived_at: str) -> Tuple[Dict, str]:
    """
    Create a person entity with full provenance following Rule 20 and Rule 26.

    Returns a tuple of (person entity dict ready to be saved as JSON,
    output filename for that entity).
    """
    name = profile.get('name', 'Unknown')
    headline = profile.get('headline', '')
    linkedin_slug = profile.get('linkedin_slug', '')

    # Determine heritage relevance
    is_heritage, heritage_type = detect_heritage_type(headline)
    if not headline and custodian_name:
        # Assume heritage-relevant if associated with a custodian
        is_heritage = True

    # Generate person ID
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    if linkedin_slug:
        person_id = linkedin_slug
        filename = f"{linkedin_slug}_{timestamp}.json"
    else:
        # Generate ID for anonymous profiles
        name_slug = re.sub(r'[^a-z0-9]+', '_', name.lower())[:30]
        person_id = f"{custodian_slug}_staff_{name_slug}"
        filename = f"{person_id}_{timestamp}.json"

    # Build web_claims with full provenance (Rule 6)
    web_claims = profile.get('web_claims', [])

    person_entity = {
        "person_id": person_id,
        "extraction_metadata": {
            "extraction_agent": RETRIEVAL_AGENT,
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_source": f"LinkedIn company page: {custodian_name}",
            "source_file": str(html_file.name),
            "source_archived_at": source_archived_at,
            "schema_version": SCHEMA_VERSION,
        },
        "profile_data": {
            "name": name,
            "linkedin_url": profile.get('linkedin_profile_url'),
            "headline": headline,
            "location": None,  # Will be extracted from profile if available
            "connections": None,
            "about": None,
            "experience": [],
            "education": [],
            "skills": [],
            "languages": [],
            "profile_image_url": None,
        },
        "heritage_relevance": {
            "is_heritage_relevant": is_heritage,
            "heritage_types": [heritage_type] if heritage_type else [],
            "rationale": f"Identified as staff at {custodian_name}" if is_heritage else None,
        },
        "affiliations": [
            {
                "custodian_name": custodian_name,
                "custodian_slug": custodian_slug,
                "role_title": headline,
                "affiliation_provenance": {
                    "source": "LinkedIn company people page",
                    "source_url": profile.get('linkedin_profile_url', ''),
                    "retrieved_on": source_archived_at,
                    "retrieval_agent": RETRIEVAL_AGENT,
                }
            }
        ],
        "web_claims": web_claims,
        "source_observations": [
            {
                "source_file": str(html_file),
                "observed_on": source_archived_at,
                "extraction_agent": RETRIEVAL_AGENT,
            }
        ],
        "linkedin_slug": linkedin_slug if linkedin_slug else None,
    }
    return person_entity, filename


def get_file_timestamp(filepath: Path) -> str:
    """Get file modification timestamp as a UTC ISO string (Rule 35 dual timestamps)."""
    mtime = filepath.stat().st_mtime
    return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()


def extract_institution_name(filename: str) -> str:
    """Extract institution name from LinkedIn People HTML filename."""
    name = Path(filename).name
    name = name.replace('.html', '')
    name = re.sub(r'_?People _ LinkedIn$', '', name)
    name = re.sub(r'^\(\d+\)\s*', '', name)
    name = re.sub(r'^,\s*', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    name = name.strip('_')
    return name


def generate_slug(name: str) -> str:
    """Generate URL-friendly slug from institution name."""
    slug = name.lower()
    slug = re.sub(r'[^a-z0-9\s-]', '', slug)
    slug = re.sub(r'[\s-]+', '-', slug)
    return slug.strip('-')


def process_html_file(html_file: Path, dry_run: bool = False) -> Dict[str, Any]:
    """
    Process a single HTML file and extract all person profiles with provenance.

    Returns a summary dict with 'status' of 'success', 'skipped', or 'error'.
    """
    institution_name = extract_institution_name(html_file.name)
    if not institution_name or len(institution_name) < 3:
        return {
            'status': 'skipped',
            'file': html_file.name,
            'reason': f'Invalid institution name: "{institution_name}"'
        }

    slug = generate_slug(institution_name)
    source_archived_at = get_file_timestamp(html_file)

    # Read and parse HTML
    try:
        with open(html_file, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()
    except Exception as e:
        return {
            'status': 'error',
            'file': html_file.name,
            'reason': f'Failed to read file: {e}'
        }

    # Extract profiles with XPath tracking
    extractor = LinkedInProfileExtractor(str(html_file), source_archived_at)
    try:
        extractor.feed(html_content)
    except Exception as e:
        return {
            'status': 'error',
            'file': html_file.name,
            'reason': f'HTML parsing error: {e}'
        }

    profiles = extractor.finalize()

    # Create person entity files
    entities_created = 0
    heritage_relevant = 0
    total_claims = 0
    for profile in profiles:
        entity, filename = create_person_entity(
            profile, institution_name, slug, html_file, source_archived_at
        )
        if entity['heritage_relevance']['is_heritage_relevant']:
            heritage_relevant += 1
        total_claims += len(entity.get('web_claims', []))
        if not dry_run:
            output_path = PERSON_ENTITY_DIR / filename
            try:
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(entity, f, indent=2, ensure_ascii=False)
                entities_created += 1
            except Exception as e:
                # Report which entity failed; keep processing the rest.
                print(f" ERROR saving {output_path.name}: {e}", file=sys.stderr)
        else:
            entities_created += 1

    return {
        'status': 'success',
        'file': html_file.name,
        'institution_name': institution_name,
        'slug': slug,
        'profiles_extracted': len(profiles),
        'entities_created': entities_created,
        'heritage_relevant': heritage_relevant,
        'total_web_claims': total_claims,
    }


def main():
    parser = argparse.ArgumentParser(
        description='Extract person data from LinkedIn HTML with full provenance'
    )
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--dry-run', action='store_true', help='Do not write files')
    parser.add_argument('--file', type=Path, help='Process single file')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    # Ensure output directory exists
    PERSON_ENTITY_DIR.mkdir(parents=True, exist_ok=True)

    if args.file:
        # Single file mode
        if not args.file.exists():
            print(f"Error: File not found: {args.file}", file=sys.stderr)
            return 1
        result = process_html_file(args.file, args.dry_run)
        print(json.dumps(result, indent=2))
        return 0 if result['status'] == 'success' else 1

    # Batch mode
    html_files = sorted(MANUAL_DIR.glob("*.html"))
    # 'is not None' so that an explicit --limit 0 means "process nothing"
    # rather than silently processing everything.
    if args.limit is not None:
        html_files = html_files[:args.limit]

    print("=" * 70)
    print("LINKEDIN PERSON EXTRACTION WITH PROVENANCE")
    print("=" * 70)
    print(f"\nInput directory: {MANUAL_DIR}")
    print(f"Output directory: {PERSON_ENTITY_DIR}")
    print(f"Total files to process: {len(html_files)}")
    print(f"Dry run: {args.dry_run}")
    print(f"\nStarting at: {datetime.now(timezone.utc).isoformat()}")
    print()

    # Statistics
    stats = {
        'total_files': len(html_files),
        'processed': 0,
        'errors': 0,
        'skipped': 0,
        'total_profiles': 0,
        'total_entities': 0,
        'heritage_relevant': 0,
        'total_web_claims': 0,
        'errors_list': [],
    }

    results = []
    for i, html_file in enumerate(html_files, 1):
        result = process_html_file(html_file, args.dry_run)
        results.append(result)

        if result['status'] == 'success':
            stats['processed'] += 1
            stats['total_profiles'] += result.get('profiles_extracted', 0)
            stats['total_entities'] += result.get('entities_created', 0)
            stats['heritage_relevant'] += result.get('heritage_relevant', 0)
            stats['total_web_claims'] += result.get('total_web_claims', 0)
            if args.verbose:
                print(f"[{i:4d}/{len(html_files)}] OK - {result['institution_name']} "
                      f"({result['profiles_extracted']} profiles, {result['total_web_claims']} claims)")
        elif result['status'] == 'error':
            stats['errors'] += 1
            stats['errors_list'].append(result)
            if args.verbose:
                print(f"[{i:4d}/{len(html_files)}] ERROR - {result['file']}: {result['reason']}")
        else:
            stats['skipped'] += 1

        # Progress report every 100 files
        if i % 100 == 0:
            pct = (i / len(html_files)) * 100
            print(f"Progress: {i}/{len(html_files)} ({pct:.1f}%) - "
                  f"{stats['total_entities']} entities, {stats['total_web_claims']} claims")

    # Final report
    print()
    print("=" * 70)
    print("EXTRACTION COMPLETE")
    print("=" * 70)
    print(f"\nTotal files: {stats['total_files']}")
    print(f"Processed: {stats['processed']}")
    print(f"Skipped: {stats['skipped']}")
    print(f"Errors: {stats['errors']}")
    print()
    print(f"Total profiles extracted: {stats['total_profiles']}")
    print(f"Person entities created: {stats['total_entities']}")
    print(f"Heritage-relevant: {stats['heritage_relevant']}")
    print(f"Total web claims (with provenance): {stats['total_web_claims']}")
    print()

    if stats['errors'] > 0:
        print("First 10 errors:")
        for err in stats['errors_list'][:10]:
            print(f" - {err['file']}: {err.get('reason', 'Unknown')}")

    # Save summary
    summary = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'script': RETRIEVAL_AGENT,
        'schema_version': SCHEMA_VERSION,
        'dry_run': args.dry_run,
        'statistics': stats,
        'compliance': {
            'rule_6': 'WebObservation Claims MUST Have XPath Provenance',
            'rule_26': 'Person Data Provenance - Web Claims for Staff Information',
            'rule_35': 'Provenance Statements MUST Have Dual Timestamps',
        },
    }
    if not args.dry_run:
        with open(OUTPUT_SUMMARY, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        print(f"\nSummary saved to: {OUTPUT_SUMMARY}")

    print("=" * 70)
    return 0


if __name__ == '__main__':
    sys.exit(main())