#!/usr/bin/env python3
"""
Hybrid Entity Extraction: LLM Annotations + Pattern Validation + Layout Scoring

This script combines three sources of entity knowledge:
1. LLM-extracted annotations (annotations_v1.7.0.yaml) - entities with XPath provenance
2. Pattern-based validation (ENTITY_TYPE_PATTERNS below) - regex patterns for entity types
3. Layout hints (from dutch_web_patterns.yaml metadata) - XPath -> entity type correlations

The pipeline:
1. Load LLM annotations for a custodian's web archives
2. Load layout hints for XPath -> entity type correlations
3. For each LLM-extracted entity:
   a. Apply layout scoring (boost if XPath matches expected location for entity type)
   b. Apply pattern validation (boost if text matches pattern for claimed type)
   c. Calculate final confidence score
4. Merge validated entities into custodian file

Usage:
    python scripts/extract_hybrid.py --dry-run --limit 5 --verbose
    python scripts/extract_hybrid.py --custodian NL-DR-ASS-A-DA
"""

import argparse
import glob
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import yaml


# ============================================================================
# CONSTANTS
# ============================================================================

# File name of the LLM annotation file inside each web archive directory.
ANNOTATION_FILENAME = 'annotations_v1.7.0.yaml'

# Confidence used when a claim carries no usable recognition_confidence.
DEFAULT_RECOGNITION_CONFIDENCE = 0.5

# Boost used when a layout hint does not specify its own confidence_boost.
DEFAULT_LAYOUT_BOOST = 0.1

# Score adjustments applied by layout and pattern scoring.
DISCARD_PENALTY = -0.5
LOW_CONFIDENCE_PENALTY = -0.1
PATTERN_MATCH_BOOST = 0.15


# ============================================================================
# DATA CLASSES
# ============================================================================

@dataclass
class LayoutHint:
    """Configuration for XPath -> entity type correlation."""
    entity_type: str
    description: str
    primary_xpaths: list[str]
    confidence_boost: float


@dataclass
class EntityClaim:
    """An extracted entity claim from LLM annotation."""
    claim_id: str
    text_content: str
    hypernym: str  # e.g., GRP, TOP, AGT
    hyponym: str   # e.g., GRP.HER, TOP.ADR
    xpath: str
    recognition_confidence: float

    # Computed scores (filled in by process_entity)
    layout_score: float = 0.0
    pattern_score: float = 0.0
    final_confidence: float = 0.0
    pattern_match: Optional[str] = None
    layout_match: Optional[str] = None


@dataclass
class ProcessingStats:
    """Statistics for processing run."""
    files_processed: int = 0
    files_with_annotations: int = 0
    files_updated: int = 0
    total_entities: int = 0
    entities_boosted_by_layout: int = 0
    entities_boosted_by_pattern: int = 0
    entities_above_threshold: int = 0


# ============================================================================
# YAML HANDLING
# ============================================================================

class CustomDumper(yaml.SafeDumper):
    """Custom YAML dumper to preserve formatting."""
    pass


def str_representer(dumper, data):
    """Represent strings, using literal block style ('|') for multiline text."""
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


CustomDumper.add_representer(str, str_representer)


def load_yaml(filepath: Path) -> dict:
    """Load a YAML file; an empty document yields an empty dict."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f) or {}


def save_yaml(filepath: Path, data: dict) -> None:
    """Save data to a YAML file, preserving key order and unicode."""
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, Dumper=CustomDumper, allow_unicode=True,
                  default_flow_style=False, sort_keys=False, width=120)


def _as_dict(value: Any) -> dict:
    """Return value if it is a dict, otherwise an empty dict (null-safe YAML traversal)."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list:
    """Return value if it is a list, otherwise an empty list (null-safe YAML traversal)."""
    return value if isinstance(value, list) else []


# ============================================================================
# LAYOUT HINTS LOADING
# ============================================================================

def _load_layout_hints_section(pattern_file: Path) -> dict:
    """Return the metadata.layout_hints mapping of the pattern file ({} if absent)."""
    data = _as_dict(load_yaml(pattern_file))
    metadata = _as_dict(data.get('metadata'))
    return _as_dict(metadata.get('layout_hints'))


def load_layout_hints(pattern_file: Path) -> dict[str, LayoutHint]:
    """
    Load layout hints from dutch_web_patterns.yaml metadata.

    Only mapping-valued entries under
    metadata.layout_hints.high_confidence_locations are used.

    Returns:
        Dict mapping entity type (e.g., 'GRP.HER') to LayoutHint config
    """
    section = _load_layout_hints_section(pattern_file)
    high_conf = _as_dict(section.get('high_confidence_locations'))

    hints = {}
    for entity_type, config in high_conf.items():
        if not isinstance(config, dict):
            continue
        hints[entity_type] = LayoutHint(
            entity_type=entity_type,
            description=config.get('description', ''),
            primary_xpaths=_as_list(config.get('primary_xpaths')),
            confidence_boost=config.get('confidence_boost', DEFAULT_LAYOUT_BOOST),
        )
    return hints


def load_low_confidence_locations(pattern_file: Path) -> list[str]:
    """Load XPath patterns that should be deprioritized."""
    section = _load_layout_hints_section(pattern_file)
    return _as_list(section.get('low_confidence_locations'))


def load_discard_locations(pattern_file: Path) -> list[str]:
    """Load XPath patterns that should be discarded entirely."""
    section = _load_layout_hints_section(pattern_file)
    return _as_list(section.get('discard_locations'))


# ============================================================================
# XPATH MATCHING
# ============================================================================

def normalize_xpath(xpath: str) -> str:
    """
    Normalize XPath for matching against layout hints.

    Drops positional and attribute predicates and the leading /html step, so
        /html/body/div[4]/section/div/div/div[1]/div/h1
    becomes
        body/div/section/div/div/div/div/h1

    Returns an empty string for an empty/None xpath.
    """
    if not xpath:
        return ''

    # Strip predicates BEFORE the /html prefix: otherwise "/html[1]/body" would
    # leave "[1]/body" -> "/body", which can never match a "body/..." pattern.
    # Remove numeric indices from elements
    xpath = re.sub(r'\[\d+\]', '', xpath)

    # Remove attribute selectors such as [@class="x"] entirely
    xpath = re.sub(r'\[@\w+=[\'"][^\'"]+[\'"]\]', '', xpath)

    # Remove leading /html step if present (but not e.g. "/htmlfoo")
    xpath = re.sub(r'^/html(?:/|$)', '', xpath)

    return xpath


def _segments_in_order(pattern_parts: list[str], xpath_parts: list[str]) -> bool:
    """Return True if every non-wildcard pattern segment occurs in xpath_parts, in order."""
    remaining = iter(xpath_parts)
    # "segment in remaining" consumes the iterator, which enforces ordering.
    return all(segment in remaining for segment in pattern_parts if segment != '*')


def xpath_matches_pattern(xpath: str, pattern: str) -> bool:
    """
    Check if an XPath matches a layout hint pattern.

    The xpath is normalized first (see normalize_xpath). Matching succeeds on:
    - exact equality with the pattern
    - wildcard match, where '*' stands for one or more characters:
        body/*/h1 matches body/div/section/h1
        body/footer/* matches body/footer/div/p
    - the pattern being a path suffix ("head/title" matches "/html/head/title")
    - a loose structural match: first and last segments agree (or are '*') and
      all literal pattern segments occur in the xpath in the same order.
      This lets "body/h1" match "body/div/h1" while "body/footer/*" still
      requires a "footer" step.
    """
    normalized = normalize_xpath(xpath)

    # Direct match
    if normalized == pattern:
        return True

    # Wildcard patterns: escape the literal pieces so regex metacharacters in
    # the pattern cannot change its meaning, then join them with ".+".
    if '*' in pattern:
        wildcard_regex = '.+'.join(re.escape(piece) for piece in pattern.split('*'))
        if re.fullmatch(wildcard_regex, normalized):
            return True

    # Pattern is a suffix of the xpath
    if normalized.endswith('/' + pattern):
        return True

    # Loose structural match on key elements
    pattern_parts = pattern.split('/')
    xpath_parts = normalized.split('/')
    if len(pattern_parts) > len(xpath_parts):
        return False

    first_ok = pattern_parts[0] in ('*', xpath_parts[0])
    last_ok = pattern_parts[-1] in ('*', xpath_parts[-1])
    return first_ok and last_ok and _segments_in_order(pattern_parts, xpath_parts)


def _type_with_ancestors(entity_type: str) -> list[str]:
    """Expand a dotted type to itself plus its parents: GRP.HER.MUS -> [GRP.HER.MUS, GRP.HER, GRP]."""
    parts = entity_type.split('.')
    return ['.'.join(parts[:depth]) for depth in range(len(parts), 0, -1)]


def calculate_layout_score(
    xpath: str,
    entity_type: str,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str]
) -> tuple[float, Optional[str]]:
    """
    Calculate layout-based confidence adjustment for an entity.

    Discard locations are checked first, then low-confidence locations, then
    the high-confidence hints for the entity type and each of its parent types.

    Returns:
        Tuple of (score_adjustment, matched_pattern_or_None)
        - Positive score = boost (entity at expected location)
        - Negative score = penalty (entity at low-confidence location)
        - Zero = neutral
    """
    if not xpath:
        return 0.0, None

    # Check discard locations first (severe penalty)
    for discard_pattern in discard_locations:
        if xpath_matches_pattern(xpath, discard_pattern):
            return DISCARD_PENALTY, f"discard:{discard_pattern}"

    # Check low confidence locations (mild penalty)
    for low_conf_pattern in low_conf_locations:
        if xpath_matches_pattern(xpath, low_conf_pattern):
            return LOW_CONFIDENCE_PENALTY, f"low_conf:{low_conf_pattern}"

    # Check high confidence locations, most specific type first
    for check_type in _type_with_ancestors(entity_type or ''):
        hint = layout_hints.get(check_type)
        if hint is None:
            continue
        for pattern in hint.primary_xpaths:
            if xpath_matches_pattern(xpath, pattern):
                return hint.confidence_boost, f"high_conf:{pattern}"

    return 0.0, None


# ============================================================================
# PATTERN VALIDATION
# ============================================================================

# Entity type patterns for quick validation.
# Patterns that spell out upper-case letters (e.g. [A-Z]) are matched
# case-sensitively; all others are matched case-insensitively.
ENTITY_TYPE_PATTERNS = {
    'GRP.HER': [
        r'\b(museum|archief|bibliotheek|collectie|erfgoed)\b',
        r'\bherinneringscentrum\b',
        r'\bdocumentatiecentrum\b',
    ],
    'GRP.ASS': [
        r'\b(vereniging|stichting|genootschap|kring)\b',
        r'\bheemkunde',
        r'\bhistorisch',
    ],
    'GRP.GOV': [
        r'\bgemeente\s+\w+',
        r'\bprovincie\s+\w+',
        r'\brijks',
        r'\bnationaal',
    ],
    'GRP.EDU': [
        r'\b(universiteit|hogeschool|academie|school)\b',
        r'\bonderwijs',
    ],
    'TOP.ADR': [
        r'\d{4}\s*[A-Z]{2}',  # Dutch postal code
        r'\b(straat|weg|laan|plein|gracht|singel|kade)\b',
    ],
    'TOP.SET': [
        r'^[A-Z][a-z]+$',  # Proper noun (settlement name)
    ],
    'AGT.PER': [
        r'^[A-Z][a-z]+\s+[A-Z][a-z]+',  # First Last name pattern
        r'\b(voorzitter|secretaris|penningmeester)\b',
    ],
    'TMP.OPH': [
        r'\d{1,2}:\d{2}\s*[-–]\s*\d{1,2}:\d{2}',  # Time range
        r'\b(maandag|dinsdag|woensdag|donderdag|vrijdag|zaterdag|zondag)\b',
    ],
}


def _pattern_is_case_sensitive(pattern: str) -> bool:
    """Return True if the regex contains upper-case literals outside escape sequences."""
    without_escapes = re.sub(r'\\.', '', pattern, flags=re.DOTALL)
    return any(char.isupper() for char in without_escapes)


def calculate_pattern_score(text: str, entity_type: str) -> tuple[float, Optional[str]]:
    """
    Calculate pattern-based confidence adjustment.

    The text is searched with the patterns registered in ENTITY_TYPE_PATTERNS
    for the entity type and each of its parent types. Patterns with explicit
    upper-case literals (proper nouns, postal codes) are matched against the
    text as written; matching them case-insensitively would let any lowercase
    word pass as a "proper noun". Malformed patterns are skipped.

    Returns:
        Tuple of (score_adjustment, matched_pattern_or_None)
    """
    if not text or not entity_type:
        return 0.0, None

    for check_type in _type_with_ancestors(entity_type):
        for pattern in ENTITY_TYPE_PATTERNS.get(check_type, ()):
            flags = 0 if _pattern_is_case_sensitive(pattern) else re.IGNORECASE
            try:
                if re.search(pattern, text, flags):
                    return PATTERN_MATCH_BOOST, pattern
            except re.error:
                # Best-effort validation: a broken pattern must not abort scoring.
                continue

    return 0.0, None


# ============================================================================
# ANNOTATION PROCESSING
# ============================================================================

def _coerce_confidence(value: Any) -> float:
    """Convert a YAML confidence value to float, falling back to the default."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return DEFAULT_RECOGNITION_CONFIDENCE


def load_annotations(annotation_file: Path) -> list[EntityClaim]:
    """
    Load entity claims from an annotation file.

    Reads session.claims.entity; entries that are not mappings or have no
    text_content are skipped. Missing or null optional fields fall back to
    empty strings (ids, types, xpath) or DEFAULT_RECOGNITION_CONFIDENCE.
    """
    data = _as_dict(load_yaml(annotation_file))
    session = _as_dict(data.get('session'))
    claims = _as_dict(session.get('claims'))
    entity_claims = _as_list(claims.get('entity'))

    entities = []
    for claim in entity_claims:
        if not isinstance(claim, dict):
            continue

        text = claim.get('text_content', '')
        if not text:
            continue

        # Get XPath from provenance
        provenance = _as_dict(claim.get('provenance'))

        entities.append(EntityClaim(
            claim_id=claim.get('claim_id') or '',
            text_content=str(text),  # YAML may parse bare numbers/dates as non-strings
            hypernym=claim.get('hypernym') or '',
            hyponym=claim.get('hyponym') or '',
            xpath=provenance.get('path') or '',
            recognition_confidence=_coerce_confidence(
                claim.get('recognition_confidence', DEFAULT_RECOGNITION_CONFIDENCE)
            ),
        ))

    return entities


def process_entity(
    entity: EntityClaim,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str]
) -> EntityClaim:
    """
    Process an entity claim and calculate final confidence.

    Applies:
    1. Layout scoring based on XPath
    2. Pattern validation based on text content
    3. Combines scores with base recognition confidence

    The entity is updated in place and also returned.
    """
    # Use hyponym if available, otherwise hypernym
    entity_type = entity.hyponym or entity.hypernym

    # Calculate layout score
    entity.layout_score, entity.layout_match = calculate_layout_score(
        entity.xpath, entity_type, layout_hints, low_conf_locations, discard_locations
    )

    # Calculate pattern score
    entity.pattern_score, entity.pattern_match = calculate_pattern_score(
        entity.text_content, entity_type
    )

    # Base + layout adjustment + pattern adjustment, clamped to [0.0, 1.0]
    combined = entity.recognition_confidence + entity.layout_score + entity.pattern_score
    entity.final_confidence = min(1.0, max(0.0, combined))

    return entity


# ============================================================================
# CUSTODIAN FILE PROCESSING
# ============================================================================

def find_annotation_files(base_path: Path, archive_info: dict) -> list[Path]:
    """Find annotation files for a web archive.

    Args:
        base_path: Base path for custodian data (e.g., data/custodian)
        archive_info: Web archive info dict with 'directory' key

    Returns:
        List of annotation file paths found
    """
    if not isinstance(archive_info, dict):
        return []

    directory = archive_info.get('directory', '')
    if not directory:
        return []

    # Directory format: web/NNNN/domain.com, relative to base_path, so the
    # full path is e.g. data/custodian/web/NNNN/domain.com/annotations_v1.7.0.yaml
    annotation_path = base_path / directory / ANNOTATION_FILENAME
    return [annotation_path] if annotation_path.exists() else []


def _dedupe_entities(entities: list[EntityClaim]) -> list[EntityClaim]:
    """Deduplicate by (lower-cased text, type), keeping the highest-confidence claim."""
    best: dict[tuple[str, str], EntityClaim] = {}
    for entity in entities:
        key = (entity.text_content.lower(), entity.hyponym or entity.hypernym)
        current = best.get(key)
        if current is None or entity.final_confidence > current.final_confidence:
            best[key] = entity
    return list(best.values())


def _entity_to_claim(entity: EntityClaim) -> dict:
    """Serialize a scored entity into the dict stored in the custodian file."""
    claim = {
        'entity': entity.text_content,
        'entity_type': entity.hyponym or entity.hypernym,
        'xpath': entity.xpath,
        'base_confidence': round(entity.recognition_confidence, 3),
        'layout_score': round(entity.layout_score, 3),
        'pattern_score': round(entity.pattern_score, 3),
        'final_confidence': round(entity.final_confidence, 3),
    }
    if entity.layout_match:
        claim['layout_match'] = entity.layout_match
    if entity.pattern_match:
        claim['pattern_match'] = entity.pattern_match
    return claim


def _print_custodian_report(custodian_path: Path, stats: dict, claims: list[dict]) -> None:
    """Print the verbose per-custodian report, including the five best claims."""
    print(f"\n {custodian_path.name}:")
    print(f" Annotations: {stats['annotations_found']}")
    print(f" Entities processed: {stats['entities_processed']}")
    print(f" Layout boosted: {stats['entities_boosted_layout']}")
    print(f" Pattern boosted: {stats['entities_boosted_pattern']}")
    print(f" Above threshold: {stats['entities_above_threshold']}")
    print(f" Unique entities: {len(claims)}")

    # Show top entities
    for claim in claims[:5]:
        boost_info = []
        if claim.get('layout_match'):
            boost_info.append(f"L:{claim['layout_score']:+.2f}")
        if claim.get('pattern_match'):
            boost_info.append(f"P:{claim['pattern_score']:+.2f}")
        boost_str = ' '.join(boost_info) if boost_info else ''
        print(f" [{claim['final_confidence']:.2f}] {claim['entity_type']}: {claim['entity'][:50]} {boost_str}")


def process_custodian(
    custodian_path: Path,
    base_path: Path,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str],
    confidence_threshold: float = 0.6,
    dry_run: bool = False,
    verbose: bool = False
) -> dict:
    """
    Process a single custodian file with hybrid extraction.

    Scores every annotated entity of the custodian's web archives, keeps those
    at or above confidence_threshold, deduplicates them and stores the result
    under the 'validated_entity_claims' key of the custodian file (unless
    dry_run is set). Annotation files that cannot be read are skipped with a
    warning on stderr.

    Returns:
        Dict with processing statistics; 'status' is one of 'error',
        'no_web_archives', 'no_entities_above_threshold', 'updated' or
        'would_update'.
    """
    stats = {
        'file': custodian_path.name,
        'status': 'skipped',
        'annotations_found': 0,
        'entities_processed': 0,
        'entities_boosted_layout': 0,
        'entities_boosted_pattern': 0,
        'entities_above_threshold': 0,
        'error': None,
    }

    try:
        custodian_data = load_yaml(custodian_path)
    except Exception as e:
        stats['status'] = 'error'
        stats['error'] = str(e)
        return stats

    if not isinstance(custodian_data, dict):
        stats['status'] = 'error'
        stats['error'] = 'custodian file does not contain a YAML mapping'
        return stats

    # Get web archives
    web_enrichment = _as_dict(custodian_data.get('web_enrichment'))
    web_archives = _as_list(web_enrichment.get('web_archives'))

    if not web_archives:
        stats['status'] = 'no_web_archives'
        return stats

    accepted_entities = []

    for archive in web_archives:
        for ann_file in find_annotation_files(base_path, archive):
            stats['annotations_found'] += 1

            # One unreadable annotation file must not abort the whole batch.
            try:
                entities = load_annotations(ann_file)
            except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
                print(f"Warning: could not load annotations {ann_file}: {e}", file=sys.stderr)
                continue

            for entity in entities:
                # Process with layout and pattern scoring
                processed = process_entity(
                    entity, layout_hints, low_conf_locations, discard_locations
                )

                stats['entities_processed'] += 1
                if processed.layout_score > 0:
                    stats['entities_boosted_layout'] += 1
                if processed.pattern_score > 0:
                    stats['entities_boosted_pattern'] += 1
                if processed.final_confidence >= confidence_threshold:
                    stats['entities_above_threshold'] += 1
                    accepted_entities.append(processed)

    if not accepted_entities:
        stats['status'] = 'no_entities_above_threshold'
        return stats

    # Deduplicate by text + type
    unique_entities = _dedupe_entities(accepted_entities)

    # Build validated_entity_claims section, best claims first
    claims = [_entity_to_claim(entity) for entity in unique_entities]
    claims.sort(key=lambda claim: claim['final_confidence'], reverse=True)

    custodian_data['validated_entity_claims'] = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'extraction_method': 'hybrid_llm_pattern_layout_v1',
        'confidence_threshold': confidence_threshold,
        'entities_count': len(unique_entities),
        'claims': claims,
    }

    if verbose:
        _print_custodian_report(custodian_path, stats, claims)

    if dry_run:
        stats['status'] = 'would_update'
    else:
        save_yaml(custodian_path, custodian_data)
        stats['status'] = 'updated'

    return stats


def find_custodian_files_with_web_archives(custodian_dir: Path) -> list[Path]:
    """Find all NL-*.yaml custodian files whose text contains a 'web_archives:' key.

    Files that cannot be read or decoded as UTF-8 are skipped.
    """
    pattern = str(custodian_dir / "NL-*.yaml")

    files = []
    for filepath in glob.glob(pattern):
        path = Path(filepath)
        try:
            content = path.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError):
            continue
        # Cheap textual pre-filter; avoids parsing every custodian file as YAML.
        if 'web_archives:' in content:
            files.append(path)

    return sorted(files)


# ============================================================================
# MAIN
# ============================================================================

def main():
    """Command-line entry point; returns the process exit code (0 ok, 1 error)."""
    parser = argparse.ArgumentParser(
        description='Hybrid entity extraction: LLM annotations + pattern validation + layout scoring'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of files to process'
    )
    parser.add_argument(
        '--custodian',
        type=str,
        default=None,
        help='Process only a specific custodian GHCID'
    )
    parser.add_argument(
        '--custodian-dir',
        type=Path,
        default=Path('/Users/kempersc/apps/glam/data/custodian'),
        help='Directory containing custodian YAML files'
    )
    parser.add_argument(
        '--pattern-file',
        type=Path,
        default=Path('/Users/kempersc/apps/glam/data/entity_annotation/modules/processing/dutch_web_patterns.yaml'),
        help='Path to pattern definition file with layout hints'
    )
    parser.add_argument(
        '--confidence-threshold',
        type=float,
        default=0.6,
        help='Minimum final confidence to include entity (default: 0.6)'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output'
    )

    args = parser.parse_args()

    custodian_dir = args.custodian_dir
    # Archive directories in custodian files are relative to the custodian dir.
    base_path = custodian_dir

    # Load layout hints
    print(f"Loading layout hints from {args.pattern_file}...")
    try:
        layout_hints = load_layout_hints(args.pattern_file)
        low_conf_locations = load_low_confidence_locations(args.pattern_file)
        discard_locations = load_discard_locations(args.pattern_file)
        print(f" Loaded {len(layout_hints)} entity type layout hints")
        print(f" Loaded {len(low_conf_locations)} low-confidence locations")
        print(f" Loaded {len(discard_locations)} discard locations")
    except Exception as e:
        print(f"Error loading layout hints: {e}")
        return 1

    # Find custodian files
    if args.custodian:
        specific_file = custodian_dir / f"{args.custodian}.yaml"
        if not specific_file.exists():
            print(f"Error: Custodian file not found: {specific_file}")
            return 1
        files = [specific_file]
        print(f"Processing specific custodian: {args.custodian}")
    else:
        print("Scanning for custodian files with web archives...")
        files = find_custodian_files_with_web_archives(custodian_dir)
        print(f"Found {len(files)} custodian files with web_archives")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")

    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")

    # Process statistics
    total_stats = ProcessingStats()

    for filepath in files:
        stats = process_custodian(
            filepath,
            base_path,
            layout_hints,
            low_conf_locations,
            discard_locations,
            confidence_threshold=args.confidence_threshold,
            dry_run=args.dry_run,
            verbose=args.verbose
        )

        wrote_claims = stats['status'] in ('updated', 'would_update')

        total_stats.files_processed += 1
        if stats['annotations_found'] > 0:
            total_stats.files_with_annotations += 1
        if wrote_claims:
            total_stats.files_updated += 1
        total_stats.total_entities += stats['entities_above_threshold']
        total_stats.entities_above_threshold += stats['entities_above_threshold']
        total_stats.entities_boosted_by_layout += stats['entities_boosted_layout']
        total_stats.entities_boosted_by_pattern += stats['entities_boosted_pattern']

        if not args.verbose and wrote_claims:
            print(f" {stats['file']}: {stats['entities_above_threshold']} entities")

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Files processed: {total_stats.files_processed}")
    print(f"Files with annotations: {total_stats.files_with_annotations}")
    print(f"Files updated: {total_stats.files_updated}")
    print(f"Total entities extracted: {total_stats.total_entities}")
    print(f"Entities boosted (layout): {total_stats.entities_boosted_by_layout}")
    print(f"Entities boosted (pattern): {total_stats.entities_boosted_by_pattern}")
    print(f"Confidence threshold: {args.confidence_threshold}")

    if args.dry_run:
        print("\n*** DRY RUN - No changes were made ***")

    return 0


if __name__ == '__main__':
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive shell and is absent when Python runs with -S.
    sys.exit(main())