glam/scripts/batch_annotate_nde.py
2025-12-05 15:30:23 +01:00

852 lines
30 KiB
Python

#!/usr/bin/env python3
"""
Batch LLM Annotation Script for NDE Entries.
This script processes NDE (Netwerk Digitaal Erfgoed, the Dutch Digital Heritage Network) entry files
using the schema-driven LLM annotator to extract structured data
from archived web pages.
Usage:
# Process all entries with web archives
python scripts/batch_annotate_nde.py
# Process specific entries
python scripts/batch_annotate_nde.py --entries 1631 1586 0182
# Use different schema
python scripts/batch_annotate_nde.py --schema heritage_custodian
# Limit concurrency
python scripts/batch_annotate_nde.py --concurrency 2
Features:
- Schema-driven extraction using GLAMSchema
- GLiNER2-style field specifications
- Concurrent processing with rate limiting
- YAML output with provenance tracking
- Resume capability (skips already processed)
"""
import argparse
import asyncio
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
# Add src directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from dotenv import load_dotenv
load_dotenv()
from glam_extractor.annotators.llm_annotator import (
LLMAnnotator,
LLMAnnotatorConfig,
LLMProvider,
AnnotationSession,
)
from glam_extractor.annotators.schema_builder import (
GLAMSchema,
FieldSpec,
heritage_custodian_schema,
web_observation_schema,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('batch_annotate.log'),
]
)
logger = logging.getLogger(__name__)
# =============================================================================
# CONSTANTS
# =============================================================================
NDE_ENTRIES_DIR = Path("data/nde/enriched/entries")
WEB_ARCHIVES_DIR = Path("data/nde/enriched/entries/web")
OUTPUT_DIR = Path("data/nde/enriched/llm_annotations")
# =============================================================================
# CUSTOM SCHEMAS
# =============================================================================
def heritage_institution_schema() -> GLAMSchema:
    """
    Schema optimized for NDE heritage institution web pages.

    Uses GLiNER2-style field specifications via the fluent GLAMSchema
    builder: entity hypernym codes, one single-label classification,
    a flat structure of typed fields (several regex-validated), and
    claim plus relation extraction.

    Returns:
        GLAMSchema: the built "nde_heritage_institution" schema.
    """
    return (
        GLAMSchema("nde_heritage_institution")
        # Entity hypernym codes to extract (semantics defined by the schema builder)
        .entities("GRP", "TOP", "AGT", "TMP", "APP", "QTY")
        # Single-label classification of the institution as a whole
        .classification(
            "institution_type",
            choices=[
                "MUSEUM", "ARCHIVE", "LIBRARY", "GALLERY",
                "COLLECTING_SOCIETY", "HISTORICAL_SOCIETY",
                "RESEARCH_CENTER", "HOLY_SITE", "BOTANICAL_ZOO",
                "EDUCATION_PROVIDER", "DIGITAL_PLATFORM", "UNKNOWN"
            ],
            description="Primary classification of the heritage institution"
        )
        .structure()
        # Core identification -- full_name is the only required field
        .field("full_name", dtype="str", required=True,
               description="Official institution name as it appears on the website")
        .field("short_name", dtype="str",
               description="Abbreviated name or acronym")
        .field("alternative_names", dtype="list",
               description="Other names, translations, historical names")
        # Contact information (email is regex-validated)
        .field("email", dtype="str",
               description="Primary contact email",
               pattern=r"^[^@]+@[^@]+\.[^@]+$")
        .field("phone", dtype="str",
               description="Contact phone number")
        .field("street_address", dtype="str",
               description="Street address")
        .field("postal_code", dtype="str",
               description="Postal code")
        .field("city", dtype="str",
               description="City name")
        # Web presence
        .field("website", dtype="str",
               description="Official website URL")
        .field("social_facebook", dtype="str",
               description="Facebook page URL")
        .field("social_instagram", dtype="str",
               description="Instagram profile URL")
        .field("social_twitter", dtype="str",
               description="Twitter/X profile URL")
        .field("social_linkedin", dtype="str",
               description="LinkedIn page URL")
        .field("social_youtube", dtype="str",
               description="YouTube channel URL")
        # Operational info
        .field("opening_hours", dtype="str",
               description="Opening hours description")
        .field("admission_info", dtype="str",
               description="Admission/ticket information")
        .field("accessibility_info", dtype="str",
               description="Accessibility information")
        # Institutional info
        .field("description", dtype="str",
               description="Brief description of the institution")
        .field("founding_date", dtype="str",
               description="When the institution was founded")
        .field("collection_description", dtype="str",
               description="Description of collections/holdings")
        .field("collection_size", dtype="str",
               description="Size of collections (e.g., '1000 artworks')")
        # Identifiers (all pattern-validated)
        .field("kvk_number", dtype="str",
               description="Dutch Chamber of Commerce number",
               pattern=r"^\d{8}$")
        .field("isil_code", dtype="str",
               description="ISIL identifier",
               pattern=r"^[A-Z]{2}-[A-Za-z0-9]+$")
        .field("wikidata_id", dtype="str",
               description="Wikidata Q-number",
               pattern=r"^Q\d+$")
        # Relationships
        .field("parent_organization", dtype="str",
               description="Parent or umbrella organization")
        .field("affiliated_organizations", dtype="list",
               description="Partner or affiliated institutions")
        .claims()
        .relations("located_in", "part_of", "operates", "founded_by")
        .build()
    )
# =============================================================================
# ENTRY PROCESSING
# =============================================================================
def load_nde_entry(entry_path: Path) -> Optional[Dict[str, Any]]:
    """Load an NDE entry YAML file.

    Args:
        entry_path: Path to the entry ``.yaml`` file.

    Returns:
        The parsed YAML content, or None when the file cannot be read,
        decoded, or parsed (the error is logged rather than raised, so
        batch scanning can continue past a bad file).
    """
    try:
        with open(entry_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    # Narrowed from bare ``except Exception``: file-system errors, bad
    # encodings, and malformed YAML are the expected failure modes here;
    # anything else indicates a real bug and should propagate.
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        logger.error(f"Failed to load {entry_path}: {e}")
        return None
def find_web_archive(entry: Dict[str, Any], entry_index: str) -> Optional[Path]:
    """Locate the archived ``rendered.html`` for an entry (single-page mode).

    Tries each directory recorded under ``web_enrichment.web_archives``
    first; when none yields an existing file, scans the conventional
    ``web/{entry_index}/`` directory recursively and returns the first hit.

    Returns:
        Path to the first ``rendered.html`` found, or None.
    """
    enrichment = entry.get('web_enrichment', {})
    for record in enrichment.get('web_archives', []):
        directory = record.get('directory')
        if not directory:
            continue
        candidate = Path(directory) / "rendered.html"
        if candidate.exists():
            return candidate
    # Fallback: conventional per-entry directory under WEB_ARCHIVES_DIR.
    fallback = WEB_ARCHIVES_DIR / entry_index
    if fallback.exists():
        for match in fallback.glob("**/rendered.html"):
            return match
    return None
# =============================================================================
# MULTI-PAGE KEY PAGE SELECTION
# =============================================================================
# Priority patterns for identifying key pages (ordered by importance)
KEY_PAGE_PATTERNS = [
# Homepage / index
(r'/index\.html$', 10),
(r'^index\.html$', 10),
(r'/home\.html$', 9),
# About pages (institution info)
(r'about|over-ons|over|about-us|wie-zijn-wij|organisatie', 8),
# Contact pages (addresses, emails, phones)
(r'contact|kontakt|bereikbaarheid', 8),
# Collection pages
(r'collection|collectie|holdings|catalogus', 7),
# Visit/practical pages (opening hours, accessibility)
(r'bezoek|visit|praktisch|openingstijden|toegankelijkheid|bereikbaar', 7),
# Mission/history
(r'mission|missie|history|geschiedenis|visie', 6),
# Staff pages (people)
(r'team|staff|medewerkers|personeel|mensen', 5),
]
# Maximum number of pages to process per entry
MAX_PAGES_PER_ENTRY = 10
def find_key_pages(entry: Dict[str, Any], entry_index: str, max_pages: int = MAX_PAGES_PER_ENTRY) -> List[Tuple[Path, str, int]]:
    """
    Find key pages in a web archive for multi-page annotation.

    Pages are scored against KEY_PAGE_PATTERNS (first matching pattern
    wins), ``rendered.html`` captures are boosted to top priority, and
    unmatched index pages get a small base score.  Unscored pages are
    dropped as low-value.

    Args:
        entry: Entry data dict
        entry_index: Entry index string (e.g., "0027")
        max_pages: Maximum number of pages to return

    Returns:
        List of (path, relative_path, priority_score) tuples, sorted by
        score descending, then shorter relative path first.
    """
    import re

    # Resolve the archive root: recorded archives first, then the
    # conventional web/{entry_index}/ fallback.
    root: Optional[Path] = None
    for record in entry.get('web_enrichment', {}).get('web_archives', []):
        directory = record.get('directory')
        if directory and Path(directory).exists():
            root = Path(directory)
            break
    if root is None:
        fallback = WEB_ARCHIVES_DIR / entry_index
        if fallback.exists():
            root = fallback
    if root is None:
        return []

    scored: List[Tuple[Path, str, int]] = []
    visited: set = set()
    for html_file in root.rglob("*.html"):
        rel = str(html_file.relative_to(root)).lower()
        # Skip temporary files and duplicate relative paths.
        if '.tmp.' in rel or rel in visited:
            continue
        visited.add(rel)
        # First matching pattern determines the base score.
        score = next(
            (priority for pattern, priority in KEY_PAGE_PATTERNS
             if re.search(pattern, rel, re.IGNORECASE)),
            0,
        )
        # rendered.html is the main homepage capture -- always top priority.
        if rel.endswith('rendered.html'):
            score = max(score, 10)
        # Otherwise-unmatched index pages still carry some value.
        if score == 0 and 'index' in rel:
            score = 3
        if score:
            scored.append((html_file, rel, score))

    # Highest score first; ties broken by shorter (more central) path.
    scored.sort(key=lambda item: (-item[2], len(item[1])))
    return scored[:max_pages]
def merge_annotation_sessions(
    sessions: List[Tuple[Any, Dict[str, Any], str]],
) -> Tuple[Any, Dict[str, Any], Dict[str, List[str]], Dict[str, str]]:
    """
    Merge multiple annotation sessions from different pages.

    The first session acts as the accumulator: claims from later sessions
    are appended to it IN PLACE (entities deduplicated by hypernym:text,
    relationships by subject:predicate:object), and structured data is
    combined on a first-non-empty-wins basis, with list fields unioned.
    Page-level provenance is tracked per field and per claim.

    Args:
        sessions: List of (session, structured_data, page_path) tuples

    Returns:
        Tuple of (merged_session, merged_structured_data,
        field_to_pages_map, claim_id_to_page_map); ``(None, {}, {}, {})``
        for an empty input.
    """
    if not sessions:
        return None, {}, {}, {}

    # Use first session as base accumulator.
    base_session, base_data, base_path = sessions[0]
    merged_data: Dict[str, Any] = dict(base_data) if base_data else {}
    # Every base field originates from the base page.
    field_sources: Dict[str, List[str]] = {key: [base_path] for key in merged_data}
    # Map claim_id -> source_page for ALL claim kinds.
    claim_source_pages: Dict[str, str] = {}

    # Entity dedup key: hypernym + surface text.
    seen_entities: Dict[str, Any] = {}
    for claim in base_session.entity_claims:
        seen_entities[f"{claim.hypernym.value}:{claim.text_content}"] = claim
        claim_source_pages[claim.claim_id] = base_path
    for claim in base_session.aggregate_claims:
        claim_source_pages[claim.claim_id] = base_path
    for claim in base_session.layout_claims:
        claim_source_pages[claim.claim_id] = base_path
    for claim in base_session.relationship_claims:
        claim_source_pages[claim.claim_id] = base_path

    # Relationship dedup key, built ONCE and kept in sync below
    # (previously rebuilt for every merged session).
    seen_rels = {
        f"{r.subject.span_text}:{r.predicate.label}:{r.object.span_text}"
        for r in base_session.relationship_claims
    }

    for session, structured_data, page_path in sessions[1:]:
        # Merge structured data: first non-empty value wins; lists union.
        for key, value in (structured_data or {}).items():
            if value is None or value == "":
                continue
            if key not in merged_data or merged_data[key] is None or merged_data[key] == "":
                merged_data[key] = value
                field_sources.setdefault(key, []).append(page_path)
            elif isinstance(value, list) and isinstance(merged_data.get(key), list):
                # Copy before extending so the base page's original list
                # (shared through the shallow dict copy above) is never
                # mutated in place.
                combined = list(merged_data[key])
                existing = {str(v) for v in combined}
                added = False
                for item in value:
                    if str(item) not in existing:
                        combined.append(item)
                        existing.add(str(item))
                        added = True
                if added:
                    merged_data[key] = combined
                    field_sources.setdefault(key, []).append(page_path)

        # Entities: append only unseen hypernym:text combinations.
        for claim in session.entity_claims:
            key = f"{claim.hypernym.value}:{claim.text_content}"
            if key not in seen_entities:
                seen_entities[key] = claim
                base_session.entity_claims.append(claim)
                claim_source_pages[claim.claim_id] = page_path

        # Aggregate and layout claims: keep all.
        for claim in session.aggregate_claims:
            base_session.aggregate_claims.append(claim)
            claim_source_pages[claim.claim_id] = page_path
        for claim in session.layout_claims:
            base_session.layout_claims.append(claim)
            claim_source_pages[claim.claim_id] = page_path

        # Relationships: deduplicate by subject+predicate+object.
        for claim in session.relationship_claims:
            key = f"{claim.subject.span_text}:{claim.predicate.label}:{claim.object.span_text}"
            if key not in seen_rels:
                seen_rels.add(key)
                base_session.relationship_claims.append(claim)
                claim_source_pages[claim.claim_id] = page_path

        # Merge errors.
        base_session.errors.extend(session.errors)

    return base_session, merged_data, field_sources, claim_source_pages
def get_source_url(entry: Dict[str, Any]) -> Optional[str]:
    """Return the best-known source URL for an entry.

    Lookup order: the original NDE record's organisation web address,
    then the first recorded web archive's URL, then the Google Maps
    enrichment website.  Returns None when nothing is found.
    """
    if 'original_entry' in entry:
        candidate = entry['original_entry'].get('webadres_organisatie')
        if candidate:
            return candidate
    archives = entry.get('web_enrichment', {}).get('web_archives', [])
    if archives:
        # Note: stops here even if the first archive has no URL.
        return archives[0].get('url')
    maps_info = entry.get('google_maps_enrichment', {})
    return maps_info.get('website') or None
async def process_entry(
    annotator: LLMAnnotator,
    schema: GLAMSchema,
    entry_path: Path,
    output_dir: Path,
    force: bool = False,
    multi_page: bool = True,
    max_pages: int = MAX_PAGES_PER_ENTRY,
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Process a single NDE entry with optional multi-page annotation.

    Loads the entry YAML, selects archived HTML page(s), annotates each
    page with the LLM, merges the per-page sessions, and writes a YAML
    report (with per-field and per-claim page provenance) to output_dir.

    Args:
        annotator: LLM annotator instance
        schema: GLAMSchema for extraction
        entry_path: Path to entry YAML file
        output_dir: Output directory for results
        force: Force reprocessing even if output exists
        multi_page: If True, process key pages and merge results (default True)
        max_pages: Maximum pages to process per entry

    Returns:
        Tuple of (success, result_dict).  A skipped entry returns
        (True, None); a failure returns (False, {"error": ...}).
    """
    entry_name = entry_path.stem
    # Entry files are named "{index}_{slug}.yaml"; the index keys the archive dir.
    entry_index = entry_name.split('_')[0]
    output_path = output_dir / f"{entry_name}_llm.yaml"
    # Resume capability: skip if already processed (unless forced).
    if output_path.exists() and not force:
        logger.info(f"Skipping {entry_name} (already processed)")
        return True, None
    # Load entry
    entry = load_nde_entry(entry_path)
    if not entry:
        return False, {"error": "Failed to load entry"}
    source_url = get_source_url(entry)
    # Find pages to process
    if multi_page:
        key_pages = find_key_pages(entry, entry_index, max_pages)
        if not key_pages:
            # Fallback to single page when no key pages were scored.
            html_path = find_web_archive(entry, entry_index)
            if not html_path:
                logger.warning(f"No web archive found for {entry_name}")
                return False, {"error": "No web archive found"}
            key_pages = [(html_path, str(html_path), 10)]
    else:
        # Single page mode (legacy)
        html_path = find_web_archive(entry, entry_index)
        if not html_path:
            logger.warning(f"No web archive found for {entry_name}")
            return False, {"error": "No web archive found"}
        key_pages = [(html_path, str(html_path), 10)]
    logger.info(f"Processing {entry_name}: {len(key_pages)} page(s)")
    try:
        # Annotate each page independently; a single page failure is
        # recorded in pages_processed but does not abort the entry.
        page_sessions: List[Tuple[Any, Dict[str, Any], str]] = []
        pages_processed: List[Dict[str, Any]] = []
        for html_path, relative_path, priority in key_pages:
            try:
                logger.debug(f" → Annotating: {relative_path} (priority {priority})")
                session, structured_data = await annotator.annotate_with_schema(
                    document=html_path,
                    schema=schema,
                    source_url=source_url,
                    validate_output=True,
                )
                page_sessions.append((session, structured_data, relative_path))
                pages_processed.append({
                    "path": relative_path,
                    "priority": priority,
                    "entities_found": len(session.entity_claims),
                    "claims_found": len(session.aggregate_claims),
                })
            except Exception as e:
                # Best-effort per page: log, record the error, move on.
                logger.warning(f" ✗ Failed to annotate {relative_path}: {e}")
                pages_processed.append({
                    "path": relative_path,
                    "priority": priority,
                    "error": str(e),
                })
                continue
        if not page_sessions:
            return False, {"error": "No pages were successfully annotated"}
        # Merge results from all pages (first session is the accumulator).
        merged_session, merged_data, field_sources, claim_source_pages = merge_annotation_sessions(page_sessions)
        # Build result: summary metadata plus merged structured data.
        result = {
            "entry_index": entry_index,
            "entry_name": entry_name,
            "source_file": str(entry_path),
            "source_url": source_url,
            "processing_timestamp": datetime.now(timezone.utc).isoformat(),
            "schema_name": schema.name,
            "multi_page_annotation": {
                "enabled": multi_page,
                "pages_found": len(key_pages),
                "pages_processed": len([p for p in pages_processed if "error" not in p]),
                "pages": pages_processed,
                "field_sources": field_sources,
            },
            "structured_data": merged_data,
            "entities_count": len(merged_session.entity_claims),
            "claims_count": len(merged_session.aggregate_claims),
            "layout_regions_count": len(merged_session.layout_claims),
            "relationships_count": len(merged_session.relationship_claims),
            "errors": merged_session.errors,
            "provenance": {
                "annotator": "LLMAnnotator",
                "schema": schema.name,
                "session_id": merged_session.session_id,
                "completed_at": merged_session.completed_at,
                "multi_page": multi_page,
            }
        }
        # Add entity details with source page provenance.
        if merged_session.entity_claims:
            result["entities"] = [
                {
                    "claim_id": claim.claim_id,
                    # hypernym may be an enum (use .value) or a plain string.
                    "hypernym": claim.hypernym.value if hasattr(claim.hypernym, 'value') else str(claim.hypernym),
                    "hyponym": claim.hyponym,
                    "text": claim.text_content,
                    "iob_label": claim.iob_label,
                    "confidence": claim.recognition_confidence,
                    "xpath": claim.provenance.path if claim.provenance else None,
                    "source_page": claim_source_pages.get(claim.claim_id),
                }
                for claim in merged_session.entity_claims
            ]
        # Add aggregate claims with source page provenance.
        if merged_session.aggregate_claims:
            result["claims"] = [
                {
                    "claim_id": claim.claim_id,
                    "type": claim.claim_type,
                    "value": claim.claim_value,
                    "confidence": claim.provenance.confidence if claim.provenance else None,
                    "xpath": claim.provenance.path if claim.provenance else None,
                    "source_page": claim_source_pages.get(claim.claim_id),
                }
                for claim in merged_session.aggregate_claims
            ]
        # Add layout regions with source page provenance.
        if merged_session.layout_claims:
            result["layout_regions"] = [
                {
                    "claim_id": claim.claim_id,
                    "region": claim.region.value if hasattr(claim.region, 'value') else str(claim.region) if claim.region else None,
                    "semantic_role": claim.semantic_role.value if hasattr(claim.semantic_role, 'value') else str(claim.semantic_role) if claim.semantic_role else None,
                    # Truncate long region text to keep the YAML readable.
                    "text_content": claim.text_content[:200] + "..." if claim.text_content and len(claim.text_content) > 200 else claim.text_content,
                    "xpath": claim.xpath,
                    "css_selector": claim.css_selector,
                    "page_number": claim.page_number,
                    "source_page": claim_source_pages.get(claim.claim_id),
                }
                for claim in merged_session.layout_claims
            ]
        # Add relationship claims with source page provenance.
        if merged_session.relationship_claims:
            result["relationships"] = [
                {
                    "claim_id": claim.claim_id,
                    # Prefer the specific hyponym; fall back to the hypernym enum value.
                    "type": claim.relationship_hyponym or (claim.relationship_hypernym.value if claim.relationship_hypernym else None),
                    "subject": claim.subject.span_text if claim.subject else None,
                    "predicate": claim.predicate.label if claim.predicate else None,
                    "object": claim.object.span_text if claim.object else None,
                    "confidence": claim.extraction_confidence,
                    "source_page": claim_source_pages.get(claim.claim_id),
                }
                for claim in merged_session.relationship_claims
            ]
        # Save result as YAML, preserving insertion order.
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            yaml.dump(result, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        logger.info(
            f"{entry_name}: {len(merged_session.entity_claims)} entities, "
            f"{len(merged_session.aggregate_claims)} claims, "
            f"{len(merged_data)} structured fields "
            f"(from {len([p for p in pages_processed if 'error' not in p])} pages)"
        )
        return True, result
    except Exception as e:
        # Catch-all boundary for the whole entry: log and report failure.
        logger.error(f"Failed to process {entry_name}: {e}")
        return False, {"error": str(e)}
async def process_batch(
    entries: List[Path],
    schema: GLAMSchema,
    concurrency: int = 3,
    force: bool = False,
    multi_page: bool = True,
    max_pages: int = MAX_PAGES_PER_ENTRY,
) -> Dict[str, Any]:
    """
    Process a batch of NDE entries.

    Creates a single annotator, fans the entries out concurrently under a
    semaphore, and tallies successes, skips, and errors.

    Args:
        entries: List of entry file paths
        schema: GLAMSchema for extraction
        concurrency: Maximum concurrent requests
        force: Force reprocessing
        multi_page: If True, process multiple pages per entry (default True)
        max_pages: Maximum pages to process per entry

    Returns:
        Summary dict with statistics
    """
    annotator = LLMAnnotator(LLMAnnotatorConfig(
        provider=LLMProvider.ZAI,
        model="glm-4.6",
    ))

    # Semaphore caps in-flight requests at the requested concurrency.
    gate = asyncio.Semaphore(concurrency)

    async def bounded(path: Path):
        async with gate:
            return await process_entry(annotator, schema, path, OUTPUT_DIR, force, multi_page, max_pages)

    outcomes = await asyncio.gather(*(bounded(p) for p in entries), return_exceptions=True)

    # Tally: (True, None) means skipped, (True, dict) success, else error.
    succeeded = skipped = failed = 0
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            logger.error(f"Task {idx} failed with exception: {outcome}")
            failed += 1
            continue
        ok, payload = outcome
        if not ok:
            failed += 1
        elif payload is None:
            skipped += 1
        else:
            succeeded += 1

    return {
        "total": len(entries),
        "success": succeeded,
        "skipped": skipped,
        "errors": failed,
        "schema": schema.name,
        "multi_page": multi_page,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def find_entries_with_web_archives() -> List[Path]:
    """Return all NDE entry files that have a web archive, sorted by path."""
    matching: List[Path] = []
    for path in NDE_ENTRIES_DIR.glob("*.yaml"):
        data = load_nde_entry(path)
        if not data:
            # Unreadable entries are logged by the loader and skipped here.
            continue
        index = path.stem.split('_')[0]
        if find_web_archive(data, index):
            matching.append(path)
    return sorted(matching)
# =============================================================================
# MAIN
# =============================================================================
def main():
    """CLI entry point: parse arguments, pick schema and entries, run the batch."""
    parser = argparse.ArgumentParser(
        description="Batch LLM annotation of NDE entries"
    )
    parser.add_argument(
        "--entries",
        nargs="+",
        help="Specific entry indices to process (e.g., 1631 1586)"
    )
    parser.add_argument(
        "--schema",
        choices=["heritage_custodian", "heritage_institution", "web_observation"],
        default="heritage_institution",
        help="Schema to use for extraction"
    )
    parser.add_argument(
        "--concurrency",
        type=int,
        default=3,
        help="Maximum concurrent requests (default: 3)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force reprocessing of already processed entries"
    )
    parser.add_argument(
        "--single-page",
        action="store_true",
        help="Process only homepage (disable multi-page annotation)"
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        default=MAX_PAGES_PER_ENTRY,
        help=f"Maximum pages to process per entry (default: {MAX_PAGES_PER_ENTRY})"
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List available entries with web archives and exit"
    )
    args = parser.parse_args()

    # Select schema via dispatch table (argparse restricts the choices;
    # the default factory mirrors the argparse default).
    schema_builders = {
        "heritage_custodian": heritage_custodian_schema,
        "heritage_institution": heritage_institution_schema,
        "web_observation": web_observation_schema,
    }
    schema = schema_builders.get(args.schema, heritage_institution_schema)()

    # Collect entries: explicit indices, or every entry with an archive.
    if args.entries:
        entries = [
            match
            for idx in args.entries
            for match in NDE_ENTRIES_DIR.glob(f"{idx}_*.yaml")
        ]
    else:
        entries = find_entries_with_web_archives()

    if args.list:
        print(f"\n📋 Found {len(entries)} entries with web archives:\n")
        for entry in entries:
            print(f"{entry.stem}")
        return
    if not entries:
        print("No entries found to process.")
        return

    multi_page = not args.single_page
    max_pages = args.max_pages
    print(f"\n🚀 Processing {len(entries)} entries with schema: {schema.name}")
    print(f" Concurrency: {args.concurrency}")
    print(f" Multi-page: {multi_page} (max {max_pages} pages)")
    print(f" Output: {OUTPUT_DIR}\n")

    # Run batch processing
    summary = asyncio.run(process_batch(
        entries=entries,
        schema=schema,
        concurrency=args.concurrency,
        force=args.force,
        multi_page=multi_page,
        max_pages=max_pages,
    ))

    # Print summary
    print(f"\n{'='*50}")
    print(f"📊 BATCH PROCESSING SUMMARY")
    print(f"{'='*50}")
    print(f" Total: {summary['total']}")
    print(f" Success: {summary['success']}")
    print(f" Skipped: {summary['skipped']}")
    print(f" Errors: {summary['errors']}")
    print(f" Schema: {summary['schema']}")
    print(f" Multi-page: {summary['multi_page']}")
    print(f"{'='*50}\n")
if __name__ == "__main__":
main()