glam/scripts/extract_hybrid.py
2025-12-14 17:09:55 +01:00

770 lines
25 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Hybrid Entity Extraction: LLM Annotations + Pattern Validation + Layout Scoring
This script combines three sources of entity knowledge:
1. LLM-extracted annotations (annotations_v1.7.0.yaml) - entities with XPath provenance
2. Pattern-based validation (dutch_web_patterns.yaml) - regex patterns for entity types
3. Layout hints (from dutch_web_patterns.yaml metadata) - XPath -> entity type correlations
The pipeline:
1. Load LLM annotations for a custodian's web archives
2. Load layout hints for XPath -> entity type correlations
3. For each LLM-extracted entity:
a. Apply layout scoring (boost if XPath matches expected location for entity type)
b. Apply pattern validation (boost if text matches pattern for claimed type)
c. Calculate final confidence score
4. Merge validated entities into custodian file
Usage:
python scripts/extract_hybrid.py --dry-run --limit 5 --verbose
python scripts/extract_hybrid.py --custodian NL-DR-ASS-A-DA
"""
import argparse
import glob
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import yaml
# ============================================================================
# DATA CLASSES
# ============================================================================
@dataclass
class LayoutHint:
    """Configuration for XPath -> entity type correlation.

    Built by load_layout_hints() from the
    metadata.layout_hints.high_confidence_locations section of the
    pattern file.
    """
    entity_type: str  # type this hint applies to, e.g. 'GRP.HER'
    description: str  # human-readable description from the pattern file
    primary_xpaths: list[str]  # XPath patterns where this type is expected
    confidence_boost: float  # score added when an entity's XPath matches (default 0.1)
@dataclass
class EntityClaim:
    """An extracted entity claim from LLM annotation."""
    claim_id: str  # claim identifier from the annotation file
    text_content: str  # surface text of the extracted entity
    hypernym: str  # coarse type, e.g., GRP, TOP, AGT
    hyponym: str  # fine-grained type, e.g., GRP.HER, TOP.ADR
    xpath: str  # XPath provenance of where the text was found
    recognition_confidence: float  # LLM base confidence (0.5 when absent)
    # Computed scores — filled in by process_entity()
    layout_score: float = 0.0  # adjustment from XPath location
    pattern_score: float = 0.0  # adjustment from regex validation
    final_confidence: float = 0.0  # clamped sum of base + adjustments
    pattern_match: Optional[str] = None  # regex that matched, if any
    layout_match: Optional[str] = None  # layout pattern that matched, if any
@dataclass
class ProcessingStats:
    """Aggregate statistics for a whole processing run (accumulated in main)."""
    files_processed: int = 0  # custodian files attempted
    files_with_annotations: int = 0  # files with at least one annotation file
    files_updated: int = 0  # files updated (or that would be, in dry-run)
    total_entities: int = 0  # entities at/above the confidence threshold
    entities_boosted_by_layout: int = 0  # entities with a positive layout score
    entities_boosted_by_pattern: int = 0  # entities with a positive pattern score
    entities_above_threshold: int = 0  # NOTE(review): never incremented; main uses total_entities instead
# ============================================================================
# YAML HANDLING
# ============================================================================
class CustomDumper(yaml.SafeDumper):
    """Custom YAML dumper to preserve formatting.

    A str representer is registered on this class at module load time so
    multiline strings are emitted in literal block ('|') style.
    """
    pass
def str_representer(dumper, data):
    """Represent strings with proper multiline handling.

    Multiline strings are emitted as YAML literal blocks ('|'); all
    other strings use the default plain style.
    """
    block_style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=block_style)
CustomDumper.add_representer(str, str_representer)
def load_yaml(filepath: Path) -> dict:
    """Load a YAML file, mapping empty/None documents to {}."""
    with open(filepath, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)
    return parsed if parsed else {}
def save_yaml(filepath: Path, data: dict) -> None:
    """Write *data* as YAML via CustomDumper (unicode, block style, insertion key order)."""
    with open(filepath, 'w', encoding='utf-8') as handle:
        yaml.dump(
            data,
            handle,
            Dumper=CustomDumper,
            allow_unicode=True,
            default_flow_style=False,
            sort_keys=False,
            width=120,
        )
# ============================================================================
# LAYOUT HINTS LOADING
# ============================================================================
def load_layout_hints(pattern_file: Path) -> dict[str, LayoutHint]:
    """
    Load layout hints from dutch_web_patterns.yaml metadata.

    Reads metadata.layout_hints.high_confidence_locations; entries that
    are not dicts are skipped.

    Returns:
        Dict mapping entity type (e.g., 'GRP.HER') to LayoutHint config
    """
    raw = load_yaml(pattern_file)
    locations = (
        raw.get('metadata', {})
           .get('layout_hints', {})
           .get('high_confidence_locations', {})
    )
    return {
        etype: LayoutHint(
            entity_type=etype,
            description=cfg.get('description', ''),
            primary_xpaths=cfg.get('primary_xpaths', []),
            confidence_boost=cfg.get('confidence_boost', 0.1),
        )
        for etype, cfg in locations.items()
        if isinstance(cfg, dict)
    }
def load_low_confidence_locations(pattern_file: Path) -> list[str]:
    """Load XPath patterns that should be deprioritized."""
    hints_cfg = (
        load_yaml(pattern_file)
        .get('metadata', {})
        .get('layout_hints', {})
    )
    return hints_cfg.get('low_confidence_locations', [])
def load_discard_locations(pattern_file: Path) -> list[str]:
    """Load XPath patterns that should be discarded entirely."""
    hints_cfg = (
        load_yaml(pattern_file)
        .get('metadata', {})
        .get('layout_hints', {})
    )
    return hints_cfg.get('discard_locations', [])
# ============================================================================
# XPATH MATCHING
# ============================================================================
def normalize_xpath(xpath: str) -> str:
    """
    Normalize XPath for matching against layout hints.

    Strips a leading '/html' (with optional trailing slash), numeric
    indices such as '[4]', and attribute selectors such as
    "[@class='x']", so e.g.
    /html/body/div[4]/section/div/div/div[1]/div/h1
    becomes body/div/section/div/div/div/div/h1.
    """
    if not xpath:
        return ''
    simplified = xpath
    for needle in (
        r'^/html/?',                        # drop the /html root prefix
        r'\[\d+\]',                         # drop positional indices
        r'\[@\w+=[\'"][^\'"]+[\'"]\]',      # drop attribute selectors
    ):
        simplified = re.sub(needle, '', simplified)
    return simplified
def xpath_matches_pattern(xpath: str, pattern: str) -> bool:
    """
    Check if an XPath matches a layout hint pattern.

    The XPath is normalized first. A match occurs when any of these holds:
      - the normalized XPath equals the pattern exactly;
      - the pattern contains '*' wildcards and, with each '*' read as
        one-or-more characters, matches the whole normalized XPath
        (e.g. body/*/h1 matches body/div/section/h1);
      - the pattern is a path suffix of the normalized XPath
        (e.g. head/title matches /html/head/title);
      - the pattern's first and last segments line up with the XPath's
        first and last segments ('*' matches anything) and the pattern
        has no more segments than the XPath.
    """
    normalized = normalize_xpath(xpath)
    if normalized == pattern:
        return True
    if '*' in pattern:
        # body/*/h1 -> ^body/.+/h1$ ; body/footer/* -> ^body/footer/.+$
        wildcard_regex = '^' + pattern.replace('*', '.+') + '$'
        try:
            if re.match(wildcard_regex, normalized):
                return True
        except re.error:
            # Malformed wildcard pattern: fall through to weaker checks.
            pass
    # Suffix match, e.g. "head/title" against ".../head/title"
    if normalized.endswith('/' + pattern):
        return True
    # Loose structural match: first and last segments must agree.
    pattern_segments = pattern.split('/')
    xpath_segments = normalized.split('/')
    if len(pattern_segments) > len(xpath_segments):
        return False
    first_ok = pattern_segments[0] in (xpath_segments[0], '*')
    last_ok = pattern_segments[-1] in (xpath_segments[-1], '*')
    return first_ok and last_ok
def calculate_layout_score(
    xpath: str,
    entity_type: str,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str]
) -> tuple[float, Optional[str]]:
    """
    Calculate layout-based confidence adjustment for an entity.

    Args:
        xpath: Raw XPath provenance of the entity (may be empty).
        entity_type: Hyponym ('GRP.HER') or hypernym ('GRP') to look up.
        layout_hints: Entity type -> LayoutHint with expected XPaths.
        low_conf_locations: XPath patterns that incur a mild penalty.
        discard_locations: XPath patterns that incur a severe penalty.

    Returns:
        Tuple of (score_adjustment, matched_pattern_or_None)
        - Positive score = boost (entity at expected location)
        - Negative score = penalty (entity at low-confidence location)
        - Zero = neutral
    """
    if not xpath:
        return 0.0, None
    # FIX: removed a dead local (`normalized = normalize_xpath(xpath)`)
    # that was computed here but never used; xpath_matches_pattern
    # normalizes internally.
    # Check discard locations first (severe penalty)
    for discard_pattern in discard_locations:
        if xpath_matches_pattern(xpath, discard_pattern):
            return -0.5, f"discard:{discard_pattern}"
    # Check low confidence locations (mild penalty)
    for low_conf_pattern in low_conf_locations:
        if xpath_matches_pattern(xpath, low_conf_pattern):
            return -0.1, f"low_conf:{low_conf_pattern}"
    # Check high confidence locations for this entity type.
    # Try the full hyponym first, then progressively more general
    # parents: GRP.HER.MUS -> GRP.HER -> GRP.
    entity_types_to_check = [entity_type]
    if '.' in entity_type:
        parts = entity_type.split('.')
        for i in range(len(parts) - 1, 0, -1):
            entity_types_to_check.append('.'.join(parts[:i]))
    for check_type in entity_types_to_check:
        if check_type in layout_hints:
            hint = layout_hints[check_type]
            for pattern in hint.primary_xpaths:
                if xpath_matches_pattern(xpath, pattern):
                    return hint.confidence_boost, f"high_conf:{pattern}"
    return 0.0, None
# ============================================================================
# PATTERN VALIDATION
# ============================================================================
# Entity type patterns for quick validation
# Entity type patterns for quick validation.
# Lowercase-only patterns are keyword patterns (matched case-insensitively);
# patterns containing uppercase character classes (postal codes, proper
# nouns, person names) are matched case-sensitively — see
# calculate_pattern_score.
ENTITY_TYPE_PATTERNS = {
    'GRP.HER': [
        r'\b(museum|archief|bibliotheek|collectie|erfgoed)\b',
        r'\bherinneringscentrum\b',
        r'\bdocumentatiecentrum\b',
    ],
    'GRP.ASS': [
        r'\b(vereniging|stichting|genootschap|kring)\b',
        r'\bheemkunde',
        r'\bhistorisch',
    ],
    'GRP.GOV': [
        r'\bgemeente\s+\w+',
        r'\bprovincie\s+\w+',
        r'\brijks',
        r'\bnationaal',
    ],
    'GRP.EDU': [
        r'\b(universiteit|hogeschool|academie|school)\b',
        r'\bonderwijs',
    ],
    'TOP.ADR': [
        r'\d{4}\s*[A-Z]{2}',  # Dutch postal code
        r'\b(straat|weg|laan|plein|gracht|singel|kade)\b',
    ],
    'TOP.SET': [
        r'^[A-Z][a-z]+$',  # Proper noun (settlement name)
    ],
    'AGT.PER': [
        r'^[A-Z][a-z]+\s+[A-Z][a-z]+',  # First Last name pattern
        r'\b(voorzitter|secretaris|penningmeester)\b',
    ],
    'TMP.OPH': [
        r'\d{1,2}:\d{2}\s*[-]\s*\d{1,2}:\d{2}',  # Time range
        r'\b(maandag|dinsdag|woensdag|donderdag|vrijdag|zaterdag|zondag)\b',
    ],
}
def calculate_pattern_score(text: str, entity_type: str) -> tuple[float, Optional[str]]:
    """
    Calculate pattern-based confidence adjustment.

    Returns a fixed 0.15 boost for the first pattern (of the entity type
    or one of its parent types) that matches the text.

    BUG FIX: the text was previously lowercased AND matched with
    re.IGNORECASE, which made case-sensitive patterns such as
    '^[A-Z][a-z]+$' (TOP.SET) match every lowercase word. Patterns that
    contain uppercase are now matched case-sensitively against the
    original text; lowercase-only keyword patterns keep their
    case-insensitive behavior.

    Returns:
        Tuple of (score_adjustment, matched_pattern_or_None)
    """
    if not text or not entity_type:
        return 0.0, None
    # Try both the full hyponym and parent types:
    # GRP.HER.MUS -> [GRP.HER.MUS, GRP.HER, GRP]
    entity_types_to_check = [entity_type]
    if '.' in entity_type:
        parts = entity_type.split('.')
        for i in range(len(parts) - 1, 0, -1):
            entity_types_to_check.append('.'.join(parts[:i]))
    for check_type in entity_types_to_check:
        if check_type in ENTITY_TYPE_PATTERNS:
            for pattern in ENTITY_TYPE_PATTERNS[check_type]:
                # Honor explicit case classes; keyword patterns stay
                # case-insensitive.
                flags = 0 if pattern != pattern.lower() else re.IGNORECASE
                try:
                    if re.search(pattern, text, flags):
                        return 0.15, pattern
                except re.error:
                    # Skip malformed patterns rather than abort scoring.
                    pass
    return 0.0, None
# ============================================================================
# ANNOTATION PROCESSING
# ============================================================================
def load_annotations(annotation_file: Path) -> list[EntityClaim]:
    """Parse entity claims out of an annotation YAML file.

    Reads session.claims.entity entries; items that are not dicts or
    have no text_content are skipped. XPath provenance comes from the
    claim's provenance.path field.
    """
    raw = load_yaml(annotation_file)
    claim_dicts = raw.get('session', {}).get('claims', {}).get('entity', [])
    results: list[EntityClaim] = []
    for raw_claim in claim_dicts:
        if not isinstance(raw_claim, dict):
            continue
        text = raw_claim.get('text_content', '')
        if not text:
            continue
        results.append(EntityClaim(
            claim_id=raw_claim.get('claim_id', ''),
            text_content=text,
            hypernym=raw_claim.get('hypernym', ''),
            hyponym=raw_claim.get('hyponym', ''),
            xpath=raw_claim.get('provenance', {}).get('path', ''),
            recognition_confidence=raw_claim.get('recognition_confidence', 0.5),
        ))
    return results
def process_entity(
    entity: EntityClaim,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str]
) -> EntityClaim:
    """
    Score an entity claim in place and compute its final confidence.

    The final confidence is the base LLM recognition confidence plus the
    layout adjustment (XPath-based) plus the pattern adjustment
    (text-based), clamped to the [0.0, 1.0] range.
    """
    # Prefer the more specific hyponym; fall back to the hypernym.
    entity_type = entity.hyponym or entity.hypernym
    entity.layout_score, entity.layout_match = calculate_layout_score(
        entity.xpath,
        entity_type,
        layout_hints,
        low_conf_locations,
        discard_locations,
    )
    entity.pattern_score, entity.pattern_match = calculate_pattern_score(
        entity.text_content,
        entity_type,
    )
    combined = (
        entity.recognition_confidence
        + entity.layout_score
        + entity.pattern_score
    )
    entity.final_confidence = min(1.0, max(0.0, combined))
    return entity
# ============================================================================
# CUSTODIAN FILE PROCESSING
# ============================================================================
def find_annotation_files(base_path: Path, archive_info: dict) -> list[Path]:
    """Find annotation files for a web archive.

    Args:
        base_path: Base path for custodian data (e.g., data/custodian)
        archive_info: Web archive info dict with 'directory' key

    Returns:
        List of annotation file paths found (at most one with the
        current single-candidate layout)
    """
    archive_dir = archive_info.get('directory', '')
    if not archive_dir:
        return []
    # Expected layout:
    # <base_path>/web/NNNN/domain.com/annotations_v1.7.0.yaml
    candidate = base_path / archive_dir / 'annotations_v1.7.0.yaml'
    return [candidate] if candidate.exists() else []
def process_custodian(
    custodian_path: Path,
    base_path: Path,
    layout_hints: dict[str, LayoutHint],
    low_conf_locations: list[str],
    discard_locations: list[str],
    confidence_threshold: float = 0.6,
    dry_run: bool = False,
    verbose: bool = False
) -> dict:
    """
    Process a single custodian file with hybrid extraction.

    Loads every annotation file referenced by the custodian's
    web_enrichment.web_archives entries, scores each entity claim with
    layout and pattern heuristics, keeps only claims whose final
    confidence reaches ``confidence_threshold``, deduplicates them, and
    writes a ``validated_entity_claims`` section back into the custodian
    YAML (skipped when ``dry_run`` is set).

    Args:
        custodian_path: Custodian YAML file to read and (maybe) update.
        base_path: Directory that archive 'directory' values are
            relative to (e.g. data/custodian).
        layout_hints: Entity type -> LayoutHint mapping.
        low_conf_locations: XPath patterns incurring a mild penalty.
        discard_locations: XPath patterns incurring a severe penalty.
        confidence_threshold: Minimum final confidence for a claim to be
            included in the output.
        dry_run: If True, compute everything but write nothing.
        verbose: If True, print a per-file breakdown.

    Returns:
        Dict with processing statistics
    """
    stats = {
        'file': custodian_path.name,
        'status': 'skipped',
        'annotations_found': 0,
        'entities_processed': 0,
        'entities_boosted_layout': 0,
        'entities_boosted_pattern': 0,
        'entities_above_threshold': 0,
        'error': None,
    }
    try:
        custodian_data = load_yaml(custodian_path)
    except Exception as e:
        stats['status'] = 'error'
        stats['error'] = str(e)
        return stats
    # Get web archives
    web_enrichment = custodian_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])
    if not web_archives:
        stats['status'] = 'no_web_archives'
        return stats
    all_entities = []
    for archive in web_archives:
        # Find annotation files
        annotation_files = find_annotation_files(base_path, archive)
        for ann_file in annotation_files:
            stats['annotations_found'] += 1
            # Load entities from annotation
            entities = load_annotations(ann_file)
            for entity in entities:
                # Process with layout and pattern scoring
                processed = process_entity(
                    entity,
                    layout_hints,
                    low_conf_locations,
                    discard_locations
                )
                stats['entities_processed'] += 1
                if processed.layout_score > 0:
                    stats['entities_boosted_layout'] += 1
                if processed.pattern_score > 0:
                    stats['entities_boosted_pattern'] += 1
                # BUG FIX: previously every entity was kept regardless of
                # its score even though --confidence-threshold is documented
                # as the minimum to include an entity (and the
                # 'no_entities_above_threshold' status implies filtering).
                # Only claims at or above the threshold are written out now.
                if processed.final_confidence >= confidence_threshold:
                    stats['entities_above_threshold'] += 1
                    all_entities.append(processed)
    if not all_entities:
        stats['status'] = 'no_entities_above_threshold'
        return stats
    # Deduplicate by (lowercased text, type); first occurrence wins.
    seen = set()
    unique_entities = []
    for entity in all_entities:
        key = (entity.text_content.lower(), entity.hyponym or entity.hypernym)
        if key not in seen:
            seen.add(key)
            unique_entities.append(entity)
    # Build validated_entity_claims section
    validated_claims = {
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        'extraction_method': 'hybrid_llm_pattern_layout_v1',
        'confidence_threshold': confidence_threshold,
        'entities_count': len(unique_entities),
        'claims': []
    }
    for entity in unique_entities:
        claim = {
            'entity': entity.text_content,
            'entity_type': entity.hyponym or entity.hypernym,
            'xpath': entity.xpath,
            'base_confidence': round(entity.recognition_confidence, 3),
            'layout_score': round(entity.layout_score, 3),
            'pattern_score': round(entity.pattern_score, 3),
            'final_confidence': round(entity.final_confidence, 3),
        }
        # Record match provenance only when something actually fired.
        if entity.layout_match:
            claim['layout_match'] = entity.layout_match
        if entity.pattern_match:
            claim['pattern_match'] = entity.pattern_match
        validated_claims['claims'].append(claim)
    # Sort by confidence, highest first.
    validated_claims['claims'].sort(key=lambda x: x['final_confidence'], reverse=True)
    # Update custodian data
    custodian_data['validated_entity_claims'] = validated_claims
    if verbose:
        print(f"\n {custodian_path.name}:")
        print(f" Annotations: {stats['annotations_found']}")
        print(f" Entities processed: {stats['entities_processed']}")
        print(f" Layout boosted: {stats['entities_boosted_layout']}")
        print(f" Pattern boosted: {stats['entities_boosted_pattern']}")
        print(f" Above threshold: {stats['entities_above_threshold']}")
        print(f" Unique entities: {len(unique_entities)}")
        # Show top entities
        for claim in validated_claims['claims'][:5]:
            boost_info = []
            if claim.get('layout_match'):
                boost_info.append(f"L:{claim['layout_score']:+.2f}")
            if claim.get('pattern_match'):
                boost_info.append(f"P:{claim['pattern_score']:+.2f}")
            boost_str = ' '.join(boost_info) if boost_info else ''
            print(f" [{claim['final_confidence']:.2f}] {claim['entity_type']}: {claim['entity'][:50]} {boost_str}")
    if not dry_run:
        save_yaml(custodian_path, custodian_data)
        stats['status'] = 'updated'
    else:
        stats['status'] = 'would_update'
    return stats
def find_custodian_files_with_web_archives(custodian_dir: Path) -> list[Path]:
    """Find all custodian files that have web_enrichment.web_archives.

    Cheap textual pre-filter: a file qualifies when its raw text
    contains the literal 'web_archives:' key. Unreadable files are
    skipped silently. Results are sorted.
    """
    matches = []
    for filepath in glob.glob(str(custodian_dir / "NL-*.yaml")):
        candidate = Path(filepath)
        try:
            text = candidate.read_text(encoding='utf-8')
        except Exception:
            continue
        if 'web_archives:' in text:
            matches.append(candidate)
    return sorted(matches)
# ============================================================================
# MAIN
# ============================================================================
def main():
    """CLI entry point.

    Parses arguments, loads layout/pattern configuration, runs the
    hybrid extraction over the selected custodian files, and prints a
    summary. Returns a process exit code (0 on success, 1 on setup
    errors).
    """
    parser = argparse.ArgumentParser(
        description='Hybrid entity extraction: LLM annotations + pattern validation + layout scoring'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of files to process'
    )
    parser.add_argument(
        '--custodian',
        type=str,
        default=None,
        help='Process only a specific custodian GHCID'
    )
    # NOTE(review): the two defaults below are machine-specific absolute
    # paths (/Users/kempersc/...); consider repo-relative defaults.
    parser.add_argument(
        '--custodian-dir',
        type=Path,
        default=Path('/Users/kempersc/apps/glam/data/custodian'),
        help='Directory containing custodian YAML files'
    )
    parser.add_argument(
        '--pattern-file',
        type=Path,
        default=Path('/Users/kempersc/apps/glam/data/entity_annotation/modules/processing/dutch_web_patterns.yaml'),
        help='Path to pattern definition file with layout hints'
    )
    parser.add_argument(
        '--confidence-threshold',
        type=float,
        default=0.6,
        help='Minimum final confidence to include entity (default: 0.6)'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output'
    )
    args = parser.parse_args()
    custodian_dir = args.custodian_dir
    # Archive 'directory' values are resolved relative to the custodian dir.
    base_path = custodian_dir
    # Load layout hints
    print(f"Loading layout hints from {args.pattern_file}...")
    try:
        layout_hints = load_layout_hints(args.pattern_file)
        low_conf_locations = load_low_confidence_locations(args.pattern_file)
        discard_locations = load_discard_locations(args.pattern_file)
        print(f" Loaded {len(layout_hints)} entity type layout hints")
        print(f" Loaded {len(low_conf_locations)} low-confidence locations")
        print(f" Loaded {len(discard_locations)} discard locations")
    except Exception as e:
        print(f"Error loading layout hints: {e}")
        return 1
    # Find custodian files: either a single named custodian, or every
    # file whose text mentions web_archives.
    if args.custodian:
        specific_file = custodian_dir / f"{args.custodian}.yaml"
        if not specific_file.exists():
            print(f"Error: Custodian file not found: {specific_file}")
            return 1
        files = [specific_file]
        print(f"Processing specific custodian: {args.custodian}")
    else:
        print(f"Scanning for custodian files with web archives...")
        files = find_custodian_files_with_web_archives(custodian_dir)
        print(f"Found {len(files)} custodian files with web_archives")
    # NOTE: a --limit of 0 is falsy and therefore means "no limit" here.
    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")
    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")
    # Process statistics
    total_stats = ProcessingStats()
    for filepath in files:
        stats = process_custodian(
            filepath,
            base_path,
            layout_hints,
            low_conf_locations,
            discard_locations,
            confidence_threshold=args.confidence_threshold,
            dry_run=args.dry_run,
            verbose=args.verbose
        )
        total_stats.files_processed += 1
        if stats['annotations_found'] > 0:
            total_stats.files_with_annotations += 1
        if stats['status'] in ('updated', 'would_update'):
            total_stats.files_updated += 1
            total_stats.total_entities += stats['entities_above_threshold']
            total_stats.entities_boosted_by_layout += stats['entities_boosted_layout']
            total_stats.entities_boosted_by_pattern += stats['entities_boosted_pattern']
        # One-line progress output when not verbose (verbose mode prints
        # its own breakdown inside process_custodian).
        if not args.verbose and stats['status'] in ('updated', 'would_update'):
            print(f" {stats['file']}: {stats['entities_above_threshold']} entities")
    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Files processed: {total_stats.files_processed}")
    print(f"Files with annotations: {total_stats.files_with_annotations}")
    print(f"Files updated: {total_stats.files_updated}")
    print(f"Total entities extracted: {total_stats.total_entities}")
    print(f"Entities boosted (layout): {total_stats.entities_boosted_by_layout}")
    print(f"Entities boosted (pattern):{total_stats.entities_boosted_by_pattern}")
    print(f"Confidence threshold: {args.confidence_threshold}")
    if args.dry_run:
        print("\n*** DRY RUN - No changes were made ***")
    return 0
if __name__ == '__main__':
    # FIX: use SystemExit instead of the built-in exit(), which is a
    # site-module convenience intended for interactive sessions and is
    # not guaranteed to exist when Python runs with -S.
    raise SystemExit(main())