#!/usr/bin/env python3
"""
Extract person entities from CH-Annotator web annotations and add them to
custodian YAML files.

This script:
1. Finds custodian files with web_enrichment.web_archives references
2. Loads corresponding annotation files (annotations_v1.7.0.yaml)
3. Extracts person entities (AGT.PER, AGT.STF) with full provenance
4. Adds web_person_claims section to custodian files

Usage:
    python scripts/extract_person_entities_from_annotations.py [--dry-run] [--limit N]
"""

import argparse
import glob
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import yaml

# ============================================================================
# FALSE POSITIVE FILTERING FOR AGT.PER ENTITIES
# ============================================================================
# The LLM annotator may incorrectly tag groups, organizations, events, and
# topic references as AGT.PER (person). This filter provides defense-in-depth
# to catch these false positives.
# ============================================================================

# Patterns that indicate a false positive (NOT a person).
# NOTE: these are compiled with re.IGNORECASE below, so the mixed-case
# alternatives (e.g. "Award|award") are redundant but harmless.
FALSE_POSITIVE_PATTERNS = [
    # Group/collective references
    r'\b(staff|members|curators|colleagues|board|team|committee|participants)\b',
    r'\b(community|network|consortium|association|society|circle|group)\b',
    r'\b(visitors|archivists|researchers|filmmakers|historians|professionals)\b',
    # Conference/event references
    r'\b(conference|Conference|festival|Festival|congress|Congress|symposium)\b',
    r'\b(Award|award|Prize|prize|Ceremony|ceremony)\b',
    r'\b(Il Cinema Ritrovato|IASA|AMIA|FIAF|Le Giornate)\b',
    r'\b(Women and Silent Screen)\b',
    # Organization/institution references (require word before or after to
    # avoid matching surnames)
    r'\b(collection|Collection|archive|Archive|fund|Fund|foundation|Foundation)\b',
    r'\b(institute|Institute|University|university|Academy|academy)\b',
    r'\b(museum|Museum|library|Library)\b',
    # "Center/Centre" only when preceded by organization indicators or
    # followed by "for/of"
    r'\b(Research|Cultural|Heritage|Community|Art|Science|Information|Documentation|Knowledge)\s+(Center|Centre)\b',
    r'\b(Center|Centre)\s+(for|of|voor|van)\b',
    r'\b(ACE member|member institutions)\b',
    r'\b(Harvard Film Archive|Toonder studio)\b',
    # Network/platform references
    r'\b(VPRO|Tegenlicht|network)\b',
    # Topic/story references (Dutch "Verhalen van X" = "Stories of X")
    r'\b(Verhalen van|verhalen van|Stories of|stories of)\b',
    # Generic plural endings that indicate groups
    r'\b\w+s\s+(members|colleagues|participants|curators|staff)\b',
    # "X of Y" patterns that typically indicate groups/organizations
    r'\b(Commission|Committee|Board|Council)\s+(of|for)\b',
    # Patterns ending with group indicators
    r'\b(board members|staff members|team members|committee members)\b',
    r'\b(technical commission|Commission members)\b',
    # Generic role descriptions (Dutch and English)
    r'^(een|de|het|a|an|the)\s+(medewerker|staff|employee|curator|director|visitor|koning|koningin|king|queen)\b',
    r'^de\s+Koning$',  # "de Koning" = "the King" in Dutch (not a name)
    r'^(echtgenote|spouse|wife|husband)$',
    r'^(schilder|painter|artist|writer)$',
    r'^(gevluchte|fled|escaped|refugee)\s+',
    # Specific non-person references
    r'WEBJONGENS',
]

# Patterns for detecting usernames (case-sensitive, applied separately)
USERNAME_PATTERNS = [
    # All lowercase with dots, underscores, numbers
    # (like "basvt", "admin", "j.s.a.m.van.koningsbrugge")
    r'^[a-z][a-z0-9._]+$',
]
USERNAME_REGEX = [re.compile(p) for p in USERNAME_PATTERNS]  # NOT case-insensitive

# Compile patterns for efficiency
FALSE_POSITIVE_REGEX = [re.compile(p, re.IGNORECASE) for p in FALSE_POSITIVE_PATTERNS]

# Minimum name length (single characters or very short strings are suspicious)
MIN_NAME_LENGTH = 3

# Maximum word count (very long "names" are likely descriptions, not names)
MAX_WORD_COUNT = 8

# Minimum word count for proper names (single first names are often not useful)
MIN_WORD_COUNT = 2  # At least first + last name


def is_likely_person(name: str) -> tuple[bool, str]:
    """
    Filter out false positive person detections.

    Args:
        name: The extracted person name to validate

    Returns:
        Tuple of (is_valid, rejection_reason)
        - is_valid: True if this appears to be a real person name
        - rejection_reason: Empty string if valid, otherwise reason for rejection
    """
    if not name or not name.strip():
        return False, "empty_name"

    name = name.strip()

    # Length check
    if len(name) < MIN_NAME_LENGTH:
        return False, f"too_short_{len(name)}_chars"

    # Word count checks
    word_count = len(name.split())
    if word_count > MAX_WORD_COUNT:
        return False, f"too_many_words_{word_count}"

    # Single word names are usually not useful (just "John" or "Maria")
    # Exception: historical figures often referenced by single name
    # (Rembrandt, Vermeer)
    # Exception: Names with particles (van Gogh) may appear as one "word"
    # after splitting
    if word_count < MIN_WORD_COUNT:
        # Allow known single-name historical figures.
        # NOTE(review): an all-lowercase spelling ("rembrandt") would still be
        # rejected by the case-sensitive username check below — presumably
        # acceptable since real references are capitalized; confirm.
        known_single_names = {
            'rembrandt', 'vermeer', 'multatuli', 'mucha',
            'rietveld', 'dudok', 'mondriaan'
        }
        if name.lower() not in known_single_names:
            return False, "single_word_name"

    # Check against false positive patterns
    for pattern in FALSE_POSITIVE_REGEX:
        if pattern.search(name):
            return False, f"pattern_match:{pattern.pattern[:30]}"

    # Check for username patterns (case-sensitive - usernames are typically
    # all lowercase)
    for pattern in USERNAME_REGEX:
        if pattern.match(name):
            return False, f"username_pattern:{pattern.pattern[:30]}"

    return True, ""


# Custom YAML dumper to preserve formatting
class CustomDumper(yaml.SafeDumper):
    pass


def str_representer(dumper, data):
    """Represent multi-line strings as YAML literal blocks (style '|')."""
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


CustomDumper.add_representer(str, str_representer)


def load_yaml(filepath: Path) -> dict:
    """Load a YAML file."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f) or {}


def save_yaml(filepath: Path, data: dict) -> None:
    """Save data to a YAML file."""
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, Dumper=CustomDumper, allow_unicode=True,
                  default_flow_style=False, sort_keys=False, width=120)


def find_annotation_file(web_archive_dir: str, base_path: Path) -> Optional[Path]:
    """
    Find the annotation file for a web archive directory.

    Args:
        web_archive_dir: Path like "web/0002/drentsarchief.nl"
        base_path: Base path (data/custodian/)

    Returns:
        Path to annotations file or None
    """
    # web_archive_dir is like "web/0002/drentsarchief.nl"
    annotation_path = base_path / web_archive_dir / "annotations_v1.7.0.yaml"
    if annotation_path.exists():
        return annotation_path
    return None


def extract_persons_from_annotations(annotation_data: dict,
                                     verbose: bool = False) -> tuple[list[dict], list[dict]]:
    """
    Extract person entities from annotation data with false positive filtering.

    Looks for entities with:
    - hypernym: AGT
    - hyponym: AGT.PER (person) or AGT.STF (staff)

    Applies filtering to remove false positives (groups, organizations,
    events, etc.)

    Args:
        annotation_data: Loaded annotation YAML data
        verbose: If True, track rejected entities

    Returns:
        Tuple of (valid_persons, rejected_entities)
        - valid_persons: List of person entity dicts with provenance
        - rejected_entities: List of rejected entities with rejection reasons
    """
    persons = []
    rejected = []

    session = annotation_data.get('session', {})
    claims = session.get('claims', {})
    entities = claims.get('entity', [])
    relationships = claims.get('relationship', [])

    # Build entity lookup for relationship resolution
    entity_lookup = {e.get('claim_id'): e for e in entities}

    # Extract person entities
    for entity in entities:
        hypernym = entity.get('hypernym', '')
        hyponym = entity.get('hyponym', '')

        # Check if this is a person entity
        if hypernym == 'AGT' and hyponym in ('AGT.PER', 'AGT.STF'):
            name = entity.get('text_content', '').strip()

            # Apply false positive filter
            is_valid, rejection_reason = is_likely_person(name)
            if not is_valid:
                if verbose:
                    rejected.append({
                        'name': name,
                        'entity_type': hyponym,
                        'rejection_reason': rejection_reason,
                    })
                continue

            person = {
                'name': name,
                'entity_type': hyponym,
                'entity_id': entity.get('claim_id'),
                'class_uri': entity.get('class_uri'),
                'recognition_confidence': entity.get('recognition_confidence', 0.0),
                'provenance': {
                    'xpath': entity.get('provenance', {}).get('path'),
                    'timestamp': entity.get('provenance', {}).get('timestamp'),
                    'agent': entity.get('provenance', {}).get('agent'),
                    'confidence': entity.get('provenance', {}).get('confidence', 0.0),
                    'context_convention': entity.get('provenance', {}).get('context_convention'),
                },
                'relationships': []
            }

            # Find relationships involving this person
            for rel in relationships:
                subject = rel.get('subject', {})
                obj = rel.get('object', {})

                # Check if person is subject or object
                if subject.get('entity_id') == entity.get('claim_id'):
                    person['relationships'].append({
                        'role': 'subject',
                        'predicate': rel.get('predicate', {}).get('uri'),
                        'predicate_label': rel.get('predicate', {}).get('label'),
                        'object': obj.get('span_text'),
                        'object_type': obj.get('entity_type'),
                        'confidence': rel.get('extraction_confidence', 0.0),
                    })
                elif obj.get('entity_id') == entity.get('claim_id'):
                    person['relationships'].append({
                        'role': 'object',
                        'predicate': rel.get('predicate', {}).get('uri'),
                        'predicate_label': rel.get('predicate', {}).get('label'),
                        'subject': subject.get('span_text'),
                        'subject_type': subject.get('entity_type'),
                        'confidence': rel.get('extraction_confidence', 0.0),
                    })

            persons.append(person)

    return persons, rejected


def process_custodian_file(
    custodian_path: Path,
    base_path: Path,
    dry_run: bool = False,
    verbose: bool = False
) -> dict:
    """
    Process a single custodian file to extract and add person entities.

    Args:
        custodian_path: Path to custodian YAML file
        base_path: Base path for web archives
        dry_run: If True, don't write changes
        verbose: If True, track rejected entities

    Returns:
        Dict with processing stats
    """
    stats = {
        'file': str(custodian_path.name),
        'web_archives_found': 0,
        'annotation_files_found': 0,
        'persons_extracted': 0,
        'persons_rejected': 0,
        'rejected_names': [],
        'status': 'skipped',
        'error': None,
    }

    try:
        custodian_data = load_yaml(custodian_path)
    except Exception as e:
        stats['status'] = 'error'
        stats['error'] = f"Failed to load YAML: {e}"
        return stats

    # Check for web_enrichment section
    web_enrichment = custodian_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])

    if not web_archives:
        stats['status'] = 'no_web_archives'
        return stats

    stats['web_archives_found'] = len(web_archives)

    all_persons = []
    all_rejected = []
    source_annotations = []

    for archive in web_archives:
        archive_dir = archive.get('directory', '')
        if not archive_dir:
            continue

        annotation_path = find_annotation_file(archive_dir, base_path)
        if not annotation_path:
            continue

        stats['annotation_files_found'] += 1

        try:
            annotation_data = load_yaml(annotation_path)
        except Exception as e:
            # Best-effort: record the error but keep processing other archives
            stats['error'] = f"Failed to load annotation: {e}"
            continue

        persons, rejected = extract_persons_from_annotations(annotation_data, verbose=verbose)
        all_rejected.extend(rejected)

        if persons:
            # Add source URL to each person
            source_url = annotation_data.get('source_url', archive.get('url', ''))
            for person in persons:
                person['provenance']['source_url'] = source_url

            all_persons.extend(persons)
            source_annotations.append(str(annotation_path.relative_to(base_path.parent)))

    stats['persons_extracted'] = len(all_persons)
    stats['persons_rejected'] = len(all_rejected)
    stats['rejected_names'] = [r['name'] for r in all_rejected]

    if not all_persons:
        stats['status'] = 'no_persons_found'
        return stats

    # Create web_person_claims section
    web_person_claims = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'extraction_method': 'ch_annotator_entity_extraction_v2',  # v2 includes filtering
        'filtering_applied': True,
        'source_annotations': source_annotations,
        'persons_count': len(all_persons),
        'persons_rejected_count': len(all_rejected),
        'persons': all_persons,
    }

    # Add to custodian data
    custodian_data['web_person_claims'] = web_person_claims

    if not dry_run:
        save_yaml(custodian_path, custodian_data)
        stats['status'] = 'updated'
    else:
        stats['status'] = 'would_update'

    return stats


def find_custodian_files_with_web_archives(custodian_dir: Path) -> list[Path]:
    """
    Find all custodian files that have web_enrichment.web_archives.

    Args:
        custodian_dir: Directory containing custodian YAML files

    Returns:
        List of paths to custodian files with web archives
    """
    pattern = str(custodian_dir / "NL-*.yaml")
    files = []

    for filepath in glob.glob(pattern):
        path = Path(filepath)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                # Quick check for web_archives: in file
                content = f.read()
                if 'web_archives:' in content:
                    files.append(path)
        except Exception:
            continue

    return sorted(files)


def main():
    parser = argparse.ArgumentParser(
        description='Extract person entities from web annotations to custodian files'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of files to process'
    )
    parser.add_argument(
        '--custodian-dir',
        type=Path,
        # NOTE(review): machine-specific default path; override via CLI flag
        default=Path('/Users/kempersc/apps/glam/data/custodian'),
        help='Directory containing custodian YAML files'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output'
    )

    args = parser.parse_args()

    custodian_dir = args.custodian_dir
    base_path = custodian_dir  # web archives are relative to custodian dir

    print(f"Scanning for custodian files with web archives in {custodian_dir}...")
    files = find_custodian_files_with_web_archives(custodian_dir)
    print(f"Found {len(files)} custodian files with web_archives")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")

    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")

    # Process statistics
    total_processed = 0
    total_updated = 0
    total_persons = 0
    total_rejected = 0
    all_rejected_names = []

    for filepath in files:
        stats = process_custodian_file(filepath, base_path,
                                       dry_run=args.dry_run,
                                       verbose=args.verbose)
        total_processed += 1

        if stats['status'] in ('updated', 'would_update'):
            total_updated += 1
            total_persons += stats['persons_extracted']
            total_rejected += stats['persons_rejected']
            all_rejected_names.extend(stats['rejected_names'])
            if args.verbose or stats['persons_extracted'] > 0:
                msg = f"✓ {stats['file']}: {stats['persons_extracted']} persons"
                if stats['persons_rejected'] > 0:
                    msg += f" ({stats['persons_rejected']} filtered out)"
                print(msg)
        elif args.verbose:
            if stats['status'] == 'error':
                print(f"✗ {stats['file']}: {stats['error']}")
            elif stats['status'] == 'no_persons_found':
                print(f"○ {stats['file']}: no persons in annotations")

    # Summary
    print("\n" + "="*60)
    print("SUMMARY")
    print("="*60)
    print(f"Files processed: {total_processed}")
    print(f"Files with persons: {total_updated}")
    print(f"Total persons found: {total_persons}")
    print(f"Total filtered out: {total_rejected}")

    if all_rejected_names and args.verbose:
        print("\nFiltered out names (false positives):")
        for name in sorted(set(all_rejected_names)):
            print(f"  - {name}")

    if args.dry_run:
        print("\n*** DRY RUN - No changes were made ***")


if __name__ == '__main__':
    main()