#!/usr/bin/env python3
|
|
"""
|
|
Extract person entities from CH-Annotator web annotations and add them to custodian YAML files.
|
|
|
|
This script:
|
|
1. Finds custodian files with web_enrichment.web_archives references
|
|
2. Loads corresponding annotation files (annotations_v1.7.0.yaml)
|
|
3. Extracts person entities (AGT.PER, AGT.STF) with full provenance
|
|
4. Adds web_person_claims section to custodian files
|
|
|
|
Usage:
|
|
python scripts/extract_person_entities_from_annotations.py [--dry-run] [--limit N]
|
|
"""
|
|
|
|
import argparse
|
|
import glob
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import yaml
|
|
|
|
|
|
# ============================================================================
# FALSE POSITIVE FILTERING FOR AGT.PER ENTITIES
# ============================================================================
# The LLM annotator may incorrectly tag groups, organizations, events, and
# topic references as AGT.PER (person). This filter provides defense-in-depth
# to catch these false positives.
# ============================================================================

# Patterns that indicate a false positive (NOT a person)
FALSE_POSITIVE_PATTERNS = [
    # Group/collective references
    r'\b(staff|members|curators|colleagues|board|team|committee|participants)\b',
    r'\b(community|network|consortium|association|society|circle|group)\b',
    r'\b(visitors|archivists|researchers|filmmakers|historians|professionals)\b',

    # Conference/event references
    r'\b(conference|Conference|festival|Festival|congress|Congress|symposium)\b',
    r'\b(Award|award|Prize|prize|Ceremony|ceremony)\b',
    r'\b(Il Cinema Ritrovato|IASA|AMIA|FIAF|Le Giornate)\b',
    r'\b(Women and Silent Screen)\b',

    # Organization/institution references.
    # NOTE(review): these match the keyword anywhere in the string (the scan is
    # case-insensitive), so a genuine surname containing one of these words
    # would also be rejected — acceptable for defense-in-depth filtering.
    r'\b(collection|Collection|archive|Archive|fund|Fund|foundation|Foundation)\b',
    r'\b(institute|Institute|University|university|Academy|academy)\b',
    r'\b(museum|Museum|library|Library)\b',

    # "Center/Centre" only when preceded by organization indicators or followed by "for/of"
    r'\b(Research|Cultural|Heritage|Community|Art|Science|Information|Documentation|Knowledge)\s+(Center|Centre)\b',
    r'\b(Center|Centre)\s+(for|of|voor|van)\b',
    r'\b(ACE member|member institutions)\b',
    r'\b(Harvard Film Archive|Toonder studio)\b',

    # Network/platform references
    r'\b(VPRO|Tegenlicht|network)\b',

    # Topic/story references (Dutch "Verhalen van X" = "Stories of X")
    r'\b(Verhalen van|verhalen van|Stories of|stories of)\b',

    # Generic plural endings that indicate groups
    r'\b\w+s\s+(members|colleagues|participants|curators|staff)\b',

    # "X of Y" patterns that typically indicate groups/organizations
    r'\b(Commission|Committee|Board|Council)\s+(of|for)\b',

    # Patterns ending with group indicators
    r'\b(board members|staff members|team members|committee members)\b',
    r'\b(technical commission|Commission members)\b',

    # Generic role descriptions (Dutch and English)
    r'^(een|de|het|a|an|the)\s+(medewerker|staff|employee|curator|director|visitor|koning|koningin|king|queen)\b',
    r'^de\s+Koning$',  # "de Koning" = "the King" in Dutch (not a name)
    r'^(echtgenote|spouse|wife|husband)$',
    r'^(schilder|painter|artist|writer)$',
    r'^(gevluchte|fled|escaped|refugee)\s+',

    # Specific non-person references
    r'WEBJONGENS',
]

# Patterns for detecting usernames (case-sensitive, applied separately)
USERNAME_PATTERNS = [
    r'^[a-z][a-z0-9._]+$',  # All lowercase with dots, underscores, numbers (like "basvt", "admin", "j.s.a.m.van.koningsbrugge")
]

USERNAME_REGEX = [re.compile(p) for p in USERNAME_PATTERNS]  # NOT case-insensitive

# Compile patterns for efficiency
FALSE_POSITIVE_REGEX = [re.compile(p, re.IGNORECASE) for p in FALSE_POSITIVE_PATTERNS]

# Minimum name length (single characters or very short strings are suspicious)
MIN_NAME_LENGTH = 3

# Maximum word count (very long "names" are likely descriptions, not names)
MAX_WORD_COUNT = 8

# Minimum word count for proper names (single first names are often not useful)
MIN_WORD_COUNT = 2  # At least first + last name


def is_likely_person(name: str) -> tuple[bool, str]:
    """
    Filter out false positive person detections.

    Checks run in order: emptiness, minimum length, word-count limits,
    false-positive patterns (case-insensitive), then username patterns
    (case-sensitive). The first failing check determines the rejection
    reason.

    Args:
        name: The extracted person name to validate

    Returns:
        Tuple of (is_valid, rejection_reason)
        - is_valid: True if this appears to be a real person name
        - rejection_reason: Empty string if valid, otherwise reason for rejection
    """
    if not name or not name.strip():
        return False, "empty_name"

    name = name.strip()

    # Length check
    if len(name) < MIN_NAME_LENGTH:
        return False, f"too_short_{len(name)}_chars"

    # Word count checks
    word_count = len(name.split())
    if word_count > MAX_WORD_COUNT:
        return False, f"too_many_words_{word_count}"

    # Single word names are usually not useful (just "John" or "Maria")
    # Exception: historical figures often referenced by single name (Rembrandt, Vermeer)
    # Exception: Names with particles (van Gogh) may appear as one "word" after splitting
    if word_count < MIN_WORD_COUNT:
        # Allow known single-name historical figures
        known_single_names = {
            'rembrandt', 'vermeer', 'multatuli', 'mucha', 'rietveld', 'dudok', 'mondriaan'
        }
        if name.lower() not in known_single_names:
            return False, "single_word_name"

    # Check against false positive patterns
    for pattern in FALSE_POSITIVE_REGEX:
        if pattern.search(name):
            return False, f"pattern_match:{pattern.pattern[:30]}"

    # Check for username patterns (case-sensitive - usernames are typically all lowercase)
    for pattern in USERNAME_REGEX:
        if pattern.match(name):
            return False, f"username_pattern:{pattern.pattern[:30]}"

    return True, ""
|
|
|
|
|
|
# Custom YAML dumper to preserve formatting
class CustomDumper(yaml.SafeDumper):
    """SafeDumper subclass used solely as a registration target for custom
    representers (see str_representer below), so global SafeDumper behavior
    is left untouched."""
    pass
|
|
|
|
|
|
def str_representer(dumper, data):
    """Represent a string, using YAML literal block style ('|') when it
    spans multiple lines so the dump stays human-readable."""
    block_style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=block_style)
|
|
|
|
|
|
# Register the representer so every str dumped via CustomDumper goes through
# str_representer (multi-line strings get literal block style).
CustomDumper.add_representer(str, str_representer)
|
|
|
|
|
|
def load_yaml(filepath: Path) -> dict:
    """Parse *filepath* as YAML; an empty document yields an empty dict."""
    with open(filepath, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)
    return parsed or {}
|
|
|
|
|
|
def save_yaml(filepath: Path, data: dict) -> None:
    """Write *data* to *filepath* as YAML, preserving unicode, insertion
    order, and block style via CustomDumper."""
    with open(filepath, 'w', encoding='utf-8') as handle:
        yaml.dump(
            data,
            handle,
            Dumper=CustomDumper,
            allow_unicode=True,
            default_flow_style=False,
            sort_keys=False,
            width=120,
        )
|
|
|
|
|
|
def find_annotation_file(
    web_archive_dir: str,
    base_path: Path,
    filename: str = "annotations_v1.7.0.yaml",
) -> Optional[Path]:
    """
    Find the annotation file for a web archive directory.

    Args:
        web_archive_dir: Path like "web/0002/drentsarchief.nl"
        base_path: Base path (data/custodian/)
        filename: Annotation file name to look for. Defaults to the current
            annotator version; parameterized so a schema bump does not require
            editing this function.

    Returns:
        Path to annotations file, or None if it does not exist
    """
    annotation_path = base_path / web_archive_dir / filename
    return annotation_path if annotation_path.exists() else None
|
|
|
|
|
|
def _relationships_for(claim_id: Any, relationships: list[dict]) -> list[dict]:
    """
    Collect relationship claims in which the given entity participates.

    Args:
        claim_id: The entity's claim_id (compared against the relationship's
            subject/object entity_id fields)
        relationships: All relationship claims from the annotation session

    Returns:
        Simplified relationship dicts, each tagged with the entity's role
        ('subject' or 'object') and carrying the other side's span text/type.
    """
    found = []
    for rel in relationships:
        subject = rel.get('subject', {})
        obj = rel.get('object', {})

        if subject.get('entity_id') == claim_id:
            found.append({
                'role': 'subject',
                'predicate': rel.get('predicate', {}).get('uri'),
                'predicate_label': rel.get('predicate', {}).get('label'),
                'object': obj.get('span_text'),
                'object_type': obj.get('entity_type'),
                'confidence': rel.get('extraction_confidence', 0.0),
            })
        elif obj.get('entity_id') == claim_id:
            found.append({
                'role': 'object',
                'predicate': rel.get('predicate', {}).get('uri'),
                'predicate_label': rel.get('predicate', {}).get('label'),
                'subject': subject.get('span_text'),
                'subject_type': subject.get('entity_type'),
                'confidence': rel.get('extraction_confidence', 0.0),
            })
    return found


def extract_persons_from_annotations(annotation_data: dict, verbose: bool = False) -> tuple[list[dict], list[dict]]:
    """
    Extract person entities from annotation data with false positive filtering.

    Looks for entities with:
    - hypernym: AGT
    - hyponym: AGT.PER (person) or AGT.STF (staff)

    Applies is_likely_person() to remove false positives (groups,
    organizations, events, usernames, etc.)

    Args:
        annotation_data: Loaded annotation YAML data
        verbose: If True, track rejected entities

    Returns:
        Tuple of (valid_persons, rejected_entities)
        - valid_persons: List of person entity dicts with provenance
        - rejected_entities: List of rejected entities with rejection reasons
          (only populated when verbose=True)
    """
    persons = []
    rejected = []

    claims = annotation_data.get('session', {}).get('claims', {})
    entities = claims.get('entity', [])
    relationships = claims.get('relationship', [])

    for entity in entities:
        # Only person (AGT.PER) and staff (AGT.STF) agent entities qualify.
        if entity.get('hypernym', '') != 'AGT':
            continue
        hyponym = entity.get('hyponym', '')
        if hyponym not in ('AGT.PER', 'AGT.STF'):
            continue

        name = entity.get('text_content', '').strip()

        # Apply false positive filter (defense-in-depth against mis-tagging)
        is_valid, rejection_reason = is_likely_person(name)
        if not is_valid:
            if verbose:
                rejected.append({
                    'name': name,
                    'entity_type': hyponym,
                    'rejection_reason': rejection_reason,
                })
            continue

        # Hoist the provenance sub-dict instead of re-fetching it per field.
        provenance = entity.get('provenance', {})
        persons.append({
            'name': name,
            'entity_type': hyponym,
            'entity_id': entity.get('claim_id'),
            'class_uri': entity.get('class_uri'),
            'recognition_confidence': entity.get('recognition_confidence', 0.0),
            'provenance': {
                'xpath': provenance.get('path'),
                'timestamp': provenance.get('timestamp'),
                'agent': provenance.get('agent'),
                'confidence': provenance.get('confidence', 0.0),
                'context_convention': provenance.get('context_convention'),
            },
            'relationships': _relationships_for(entity.get('claim_id'), relationships),
        })

    return persons, rejected
|
|
|
|
|
|
def process_custodian_file(
    custodian_path: Path,
    base_path: Path,
    dry_run: bool = False,
    verbose: bool = False
) -> dict:
    """
    Process a single custodian file to extract and add person entities.

    Loads the custodian YAML, follows each web_enrichment.web_archives entry
    to its annotation file, extracts filtered person entities, and (unless
    dry_run) writes a web_person_claims section back into the custodian file.

    Args:
        custodian_path: Path to custodian YAML file
        base_path: Base path for web archives
        dry_run: If True, don't write changes
        verbose: If True, track rejected entities

    Returns:
        Dict with processing stats; 'status' is one of 'error',
        'no_web_archives', 'no_persons_found', 'updated', 'would_update'.
        ('skipped' is the initial placeholder and is always overwritten
        before return.)
    """
    # Stats dict doubles as the return value; fields are filled in as the
    # function progresses and returned early on any terminal condition.
    stats = {
        'file': str(custodian_path.name),
        'web_archives_found': 0,
        'annotation_files_found': 0,
        'persons_extracted': 0,
        'persons_rejected': 0,
        'rejected_names': [],
        'status': 'skipped',
        'error': None,
    }

    try:
        custodian_data = load_yaml(custodian_path)
    except Exception as e:
        stats['status'] = 'error'
        stats['error'] = f"Failed to load YAML: {e}"
        return stats

    # Check for web_enrichment section
    web_enrichment = custodian_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])

    if not web_archives:
        stats['status'] = 'no_web_archives'
        return stats

    stats['web_archives_found'] = len(web_archives)

    all_persons = []
    all_rejected = []
    source_annotations = []

    for archive in web_archives:
        archive_dir = archive.get('directory', '')
        if not archive_dir:
            continue

        # Missing annotation files are skipped silently — not every archive
        # has been annotated yet.
        annotation_path = find_annotation_file(archive_dir, base_path)
        if not annotation_path:
            continue

        stats['annotation_files_found'] += 1

        try:
            annotation_data = load_yaml(annotation_path)
        except Exception as e:
            # Best-effort: record the error but keep processing the
            # remaining archives for this custodian.
            stats['error'] = f"Failed to load annotation: {e}"
            continue

        persons, rejected = extract_persons_from_annotations(annotation_data, verbose=verbose)
        all_rejected.extend(rejected)

        if persons:
            # Add source URL to each person (annotation-level source_url wins
            # over the archive entry's url).
            source_url = annotation_data.get('source_url', archive.get('url', ''))
            for person in persons:
                person['provenance']['source_url'] = source_url

            all_persons.extend(persons)
            source_annotations.append(str(annotation_path.relative_to(base_path.parent)))

    stats['persons_extracted'] = len(all_persons)
    stats['persons_rejected'] = len(all_rejected)
    stats['rejected_names'] = [r['name'] for r in all_rejected]

    if not all_persons:
        stats['status'] = 'no_persons_found'
        return stats

    # Create web_person_claims section
    web_person_claims = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'extraction_method': 'ch_annotator_entity_extraction_v2',  # v2 includes filtering
        'filtering_applied': True,
        'source_annotations': source_annotations,
        'persons_count': len(all_persons),
        'persons_rejected_count': len(all_rejected),
        'persons': all_persons,
    }

    # Add to custodian data (replaces any existing web_person_claims section)
    custodian_data['web_person_claims'] = web_person_claims

    if not dry_run:
        save_yaml(custodian_path, custodian_data)
        stats['status'] = 'updated'
    else:
        stats['status'] = 'would_update'

    return stats
|
|
|
|
|
|
def find_custodian_files_with_web_archives(custodian_dir: Path) -> list[Path]:
    """
    Find all custodian files that have web_enrichment.web_archives.

    Performs a cheap substring scan for 'web_archives:' rather than parsing
    every YAML file; process_custodian_file() does the real validation.

    Args:
        custodian_dir: Directory containing custodian YAML files

    Returns:
        Sorted list of paths to custodian files with web archives
    """
    files: list[Path] = []

    # Path.glob instead of glob.glob(str(...)): avoids misbehavior when the
    # directory path itself contains glob metacharacters, and yields Paths
    # directly.
    for path in custodian_dir.glob("NL-*.yaml"):
        try:
            content = path.read_text(encoding='utf-8')
        except Exception:
            # Best-effort scan: unreadable/undecodable files are skipped.
            continue
        # Quick check for web_archives: in file
        if 'web_archives:' in content:
            files.append(path)

    return sorted(files)
|
|
|
|
|
|
def main():
    """CLI entry point: scan custodian files, extract person entities from
    their web annotations, and report per-file and summary statistics."""
    parser = argparse.ArgumentParser(
        description='Extract person entities from web annotations to custodian files'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of files to process'
    )
    parser.add_argument(
        '--custodian-dir',
        type=Path,
        # NOTE(review): machine-specific absolute default path; consider a
        # repo-relative default or requiring this flag — confirm with owner.
        default=Path('/Users/kempersc/apps/glam/data/custodian'),
        help='Directory containing custodian YAML files'
    )
    parser.add_argument(
        '--verbose',
        '-v',
        action='store_true',
        help='Show detailed output'
    )

    args = parser.parse_args()

    custodian_dir = args.custodian_dir
    base_path = custodian_dir  # web archives are relative to custodian dir

    print(f"Scanning for custodian files with web archives in {custodian_dir}...")
    files = find_custodian_files_with_web_archives(custodian_dir)
    print(f"Found {len(files)} custodian files with web_archives")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")

    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")

    # Process statistics
    total_processed = 0
    total_updated = 0
    total_persons = 0
    total_rejected = 0
    all_rejected_names = []

    for filepath in files:
        stats = process_custodian_file(filepath, base_path, dry_run=args.dry_run, verbose=args.verbose)
        total_processed += 1

        # Aggregate only files that produced persons (status updated/would_update)
        if stats['status'] in ('updated', 'would_update'):
            total_updated += 1
            total_persons += stats['persons_extracted']
            total_rejected += stats['persons_rejected']
            all_rejected_names.extend(stats['rejected_names'])

            if args.verbose or stats['persons_extracted'] > 0:
                msg = f"✓ {stats['file']}: {stats['persons_extracted']} persons"
                if stats['persons_rejected'] > 0:
                    msg += f" ({stats['persons_rejected']} filtered out)"
                print(msg)

        # Non-updating statuses are only reported in verbose mode.
        elif args.verbose:
            if stats['status'] == 'error':
                print(f"✗ {stats['file']}: {stats['error']}")
            elif stats['status'] == 'no_persons_found':
                print(f"○ {stats['file']}: no persons in annotations")

    # Summary
    print("\n" + "="*60)
    print("SUMMARY")
    print("="*60)
    print(f"Files processed: {total_processed}")
    print(f"Files with persons: {total_updated}")
    print(f"Total persons found: {total_persons}")
    print(f"Total filtered out: {total_rejected}")

    if all_rejected_names and args.verbose:
        print(f"\nFiltered out names (false positives):")
        # De-duplicate and sort for a stable, readable listing.
        for name in sorted(set(all_rejected_names)):
            print(f"  - {name}")

    if args.dry_run:
        print("\n*** DRY RUN - No changes were made ***")


if __name__ == '__main__':
    main()
|