#!/usr/bin/env python3 """ Batch LLM Annotation Script for NDE Entries. This script processes NDE (Network Digital Heritage) entry files using the schema-driven LLM annotator to extract structured data from archived web pages. Usage: # Process all entries with web archives python scripts/batch_annotate_nde.py # Process specific entries python scripts/batch_annotate_nde.py --entries 1631 1586 0182 # Use different schema python scripts/batch_annotate_nde.py --schema heritage_custodian # Limit concurrency python scripts/batch_annotate_nde.py --concurrency 2 Features: - Schema-driven extraction using GLAMSchema - GLiNER2-style field specifications - Concurrent processing with rate limiting - YAML output with provenance tracking - Resume capability (skips already processed) """ import argparse import asyncio import logging import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import yaml # Add src directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from dotenv import load_dotenv load_dotenv() from glam_extractor.annotators.llm_annotator import ( LLMAnnotator, LLMAnnotatorConfig, LLMProvider, AnnotationSession, ) from glam_extractor.annotators.schema_builder import ( GLAMSchema, FieldSpec, heritage_custodian_schema, web_observation_schema, ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('batch_annotate.log'), ] ) logger = logging.getLogger(__name__) # ============================================================================= # CONSTANTS # ============================================================================= NDE_ENTRIES_DIR = Path("data/nde/enriched/entries") WEB_ARCHIVES_DIR = Path("data/nde/enriched/entries/web") OUTPUT_DIR = Path("data/nde/enriched/llm_annotations") # 
# =============================================================================
# CUSTOM SCHEMAS
# =============================================================================

def heritage_institution_schema() -> GLAMSchema:
    """
    Build the extraction schema for NDE heritage institution web pages.

    The schema combines entity types, an ``institution_type`` classification,
    GLiNER2-style structured field specifications, claim extraction and a
    fixed set of relation types.

    Returns:
        A built ``GLAMSchema`` named ``"nde_heritage_institution"``.
    """
    return (
        GLAMSchema("nde_heritage_institution")
        .entities("GRP", "TOP", "AGT", "TMP", "APP", "QTY")
        .classification(
            "institution_type",
            choices=[
                "MUSEUM", "ARCHIVE", "LIBRARY", "GALLERY",
                "COLLECTING_SOCIETY", "HISTORICAL_SOCIETY",
                "RESEARCH_CENTER", "HOLY_SITE", "BOTANICAL_ZOO",
                "EDUCATION_PROVIDER", "DIGITAL_PLATFORM", "UNKNOWN"
            ],
            description="Primary classification of the heritage institution"
        )
        .structure()
        # Core identification
        .field("full_name", dtype="str", required=True,
               description="Official institution name as it appears on the website")
        .field("short_name", dtype="str",
               description="Abbreviated name or acronym")
        .field("alternative_names", dtype="list",
               description="Other names, translations, historical names")
        # Contact information
        .field("email", dtype="str",
               description="Primary contact email",
               pattern=r"^[^@]+@[^@]+\.[^@]+$")
        .field("phone", dtype="str",
               description="Contact phone number")
        .field("street_address", dtype="str",
               description="Street address")
        .field("postal_code", dtype="str",
               description="Postal code")
        .field("city", dtype="str",
               description="City name")
        # Web presence
        .field("website", dtype="str",
               description="Official website URL")
        .field("social_facebook", dtype="str",
               description="Facebook page URL")
        .field("social_instagram", dtype="str",
               description="Instagram profile URL")
        .field("social_twitter", dtype="str",
               description="Twitter/X profile URL")
        .field("social_linkedin", dtype="str",
               description="LinkedIn page URL")
        .field("social_youtube", dtype="str",
               description="YouTube channel URL")
        # Operational info
        .field("opening_hours", dtype="str",
               description="Opening hours description")
        .field("admission_info", dtype="str",
               description="Admission/ticket information")
        .field("accessibility_info", dtype="str",
               description="Accessibility information")
        # Institutional info
        .field("description", dtype="str",
               description="Brief description of the institution")
        .field("founding_date", dtype="str",
               description="When the institution was founded")
        .field("collection_description", dtype="str",
               description="Description of collections/holdings")
        .field("collection_size", dtype="str",
               description="Size of collections (e.g., '1000 artworks')")
        # Identifiers
        .field("kvk_number", dtype="str",
               description="Dutch Chamber of Commerce number",
               pattern=r"^\d{8}$")
        .field("isil_code", dtype="str",
               description="ISIL identifier",
               pattern=r"^[A-Z]{2}-[A-Za-z0-9]+$")
        .field("wikidata_id", dtype="str",
               description="Wikidata Q-number",
               pattern=r"^Q\d+$")
        # Relationships
        .field("parent_organization", dtype="str",
               description="Parent or umbrella organization")
        .field("affiliated_organizations", dtype="list",
               description="Partner or affiliated institutions")
        .claims()
        .relations("located_in", "part_of", "operates", "founded_by")
        .build()
    )


# =============================================================================
# ENTRY PROCESSING
# =============================================================================

def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict.

    YAML turns an empty key such as ``web_enrichment:`` into ``None``, so
    ``entry.get(key, {})`` is not enough to make chained ``.get`` calls safe.
    """
    return value if isinstance(value, dict) else {}


def _archive_directories(entry: Dict[str, Any]) -> List[str]:
    """Return the non-empty ``directory`` values of ``web_enrichment.web_archives``, in order."""
    archives = _as_dict(entry.get('web_enrichment')).get('web_archives') or []
    if not isinstance(archives, list):
        return []
    return [
        archive['directory']
        for archive in archives
        if isinstance(archive, dict) and archive.get('directory')
    ]


def load_nde_entry(entry_path: Path) -> Optional[Dict[str, Any]]:
    """
    Load an NDE entry YAML file.

    Args:
        entry_path: Path to the entry YAML file.

    Returns:
        The parsed mapping, or ``None`` when the file is empty, cannot be
        read or parsed, or does not contain a mapping at the top level.
        Failures are logged.
    """
    try:
        with open(entry_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        logger.error(f"Failed to load {entry_path}: {e}")
        return None

    # Callers use dict methods on the result, so reject lists/scalars here.
    if data is not None and not isinstance(data, dict):
        logger.error(
            f"Failed to load {entry_path}: expected a mapping, got {type(data).__name__}"
        )
        return None
    return data


def find_web_archive(entry: Dict[str, Any], entry_index: str) -> Optional[Path]:
    """
    Find the homepage capture (``rendered.html``) for an entry.

    Used for single-page mode and to decide whether an entry has an archive.

    Args:
        entry: Parsed entry data.
        entry_index: Entry index string (e.g. ``"0027"``).

    Returns:
        The first ``rendered.html`` found in the archive directories listed
        under ``web_enrichment.web_archives``; otherwise the lexicographically
        first ``rendered.html`` below ``WEB_ARCHIVES_DIR / entry_index``;
        otherwise ``None``.
    """
    for archive_dir in _archive_directories(entry):
        html_path = Path(archive_dir) / "rendered.html"
        if html_path.exists():
            return html_path

    fallback_dir = WEB_ARCHIVES_DIR / entry_index
    if fallback_dir.exists():
        # min() keeps the choice deterministic when several captures exist.
        return min(fallback_dir.glob("**/rendered.html"), default=None)

    return None


# =============================================================================
# MULTI-PAGE KEY PAGE SELECTION
# =============================================================================

# Priority patterns for identifying key pages (ordered by importance).
# Patterns are matched against the lower-cased, '/'-separated path relative
# to the archive directory; the first matching pattern determines the score.
# NOTE(review): the bare 'over' alternative also matches e.g. 'overzicht'.
KEY_PAGE_PATTERNS = [
    # Homepage / index
    (r'/index\.html$', 10),
    (r'^index\.html$', 10),
    (r'/home\.html$', 9),
    # About pages (institution info)
    (r'about|over-ons|over|about-us|wie-zijn-wij|organisatie', 8),
    # Contact pages (addresses, emails, phones)
    (r'contact|kontakt|bereikbaarheid', 8),
    # Collection pages
    (r'collection|collectie|holdings|catalogus', 7),
    # Visit/practical pages (opening hours, accessibility)
    (r'bezoek|visit|praktisch|openingstijden|toegankelijkheid|bereikbaar', 7),
    # Mission/history
    (r'mission|missie|history|geschiedenis|visie', 6),
    # Staff pages (people)
    (r'team|staff|medewerkers|personeel|mensen', 5),
]

# Maximum number of pages to process per entry
MAX_PAGES_PER_ENTRY = 10


def find_key_pages(entry: Dict[str, Any], entry_index: str,
                   max_pages: int = MAX_PAGES_PER_ENTRY) -> List[Tuple[Path, str, int]]:
    """
    Find key pages in a web archive for multi-page annotation.

    Every ``*.html`` file below the archive directory is scored with
    ``KEY_PAGE_PATTERNS``; ``rendered.html`` is boosted to 10 and otherwise
    unmatched paths containing ``index`` get 3. Unscored pages and files
    containing ``.tmp.`` are ignored.

    Args:
        entry: Entry data dict
        entry_index: Entry index string (e.g., "0027")
        max_pages: Maximum number of pages to return (``<= 0`` returns ``[]``)

    Returns:
        List of (path, relative_path, priority_score) tuples, sorted by score
        descending, then shorter relative path first, then alphabetically.
        ``relative_path`` is lower-cased and uses '/' separators.
    """
    import re

    if max_pages <= 0:
        return []

    # Prefer the first listed archive directory that exists on disk.
    archive_dir = next(
        (Path(d) for d in _archive_directories(entry) if Path(d).exists()),
        None,
    )
    if archive_dir is None:
        fallback_dir = WEB_ARCHIVES_DIR / entry_index
        if not fallback_dir.exists():
            return []
        archive_dir = fallback_dir

    # Compile once per call rather than once per (file, pattern) pair.
    compiled_patterns = [
        (re.compile(pattern, re.IGNORECASE), priority)
        for pattern, priority in KEY_PAGE_PATTERNS
    ]

    pages: List[Tuple[Path, str, int]] = []
    seen_paths: set = set()

    for html_file in archive_dir.rglob("*.html"):
        # as_posix() so the '/'-anchored patterns also match on Windows.
        relative_path = html_file.relative_to(archive_dir).as_posix().lower()

        # Skip temporary files and case-insensitive duplicates
        if '.tmp.' in relative_path or relative_path in seen_paths:
            continue
        seen_paths.add(relative_path)

        # KEY_PAGE_PATTERNS is ordered by non-increasing priority, so the
        # first match is also the highest applicable score.
        score = next(
            (priority for regex, priority in compiled_patterns if regex.search(relative_path)),
            0,
        )

        # Boost rendered.html (main homepage capture)
        if relative_path.endswith('rendered.html'):
            score = max(score, 10)

        # Base score for otherwise-unmatched index pages
        if score == 0 and 'index' in relative_path:
            score = 3

        # Skip completely unscored pages (low value)
        if score > 0:
            pages.append((html_file, relative_path, score))

    # Path as final key makes ordering independent of filesystem walk order.
    pages.sort(key=lambda page: (-page[2], len(page[1]), page[1]))

    return pages[:max_pages]


def _enum_value(value: Any) -> Any:
    """Return ``value.value`` for enum-like objects, ``str(value)`` otherwise, ``None`` for ``None``."""
    if value is None:
        return None
    return value.value if hasattr(value, 'value') else str(value)


def _is_blank(value: Any) -> bool:
    """Return True for the values the merge treats as missing: ``None`` and ``""``."""
    return value is None or value == ""


def _entity_key(claim: Any) -> str:
    """Deduplication key for entity claims: ``"<hypernym>:<text>"``."""
    return f"{_enum_value(claim.hypernym)}:{claim.text_content}"


def _relationship_key(claim: Any) -> str:
    """Deduplication key for relationship claims: ``"<subject>:<predicate>:<object>"``.

    Missing subject/predicate/object parts are rendered as ``None`` instead
    of raising ``AttributeError``.
    """
    subject = getattr(claim.subject, 'span_text', None)
    predicate = getattr(claim.predicate, 'label', None)
    obj = getattr(claim.object, 'span_text', None)
    return f"{subject}:{predicate}:{obj}"


def _merge_structured_data(
    merged_data: Dict[str, Any],
    field_sources: Dict[str, List[str]],
    structured_data: Optional[Dict[str, Any]],
    page_path: str,
) -> None:
    """Fold one page's structured fields into *merged_data* in place, recording provenance.

    Blank fields are filled from the page. List fields are extended with new
    items, compared by ``str()``. Each contributing page is recorded at most
    once per field.
    """
    if not structured_data:
        return

    for key, value in structured_data.items():
        if _is_blank(value):
            continue

        current = merged_data.get(key)
        if _is_blank(current):
            merged_data[key] = value
            field_sources.setdefault(key, []).append(page_path)
        elif isinstance(value, list) and isinstance(current, list):
            existing = {str(item) for item in current}
            additions = []
            for item in value:
                marker = str(item)
                if marker not in existing:
                    existing.add(marker)
                    additions.append(item)
            if additions:
                # Build a new list so the first page's own data is not mutated.
                merged_data[key] = current + additions
                field_sources.setdefault(key, []).append(page_path)


def merge_annotation_sessions(
    sessions: List[Tuple[Any, Dict[str, Any], str]],
) -> Tuple[Any, Dict[str, Any], Dict[str, List[str]], Dict[str, str]]:
    """
    Merge multiple annotation sessions from different pages.

    The first session is used as the base and is extended in place.
    - Entities are deduplicated by ``hypernym:text``.
    - Relationships are deduplicated by ``subject:predicate:object``.
    - Aggregate and layout claims are all kept.
    - Errors are concatenated.
    - Structured data keeps the first non-blank value per field and merges
      list fields.

    Args:
        sessions: List of (session, structured_data, page_path) tuples

    Returns:
        Tuple of (merged_session, merged_structured_data, field_to_pages_map,
        claim_id_to_page_map). For an empty input this is ``(None, {}, {}, {})``.
    """
    if not sessions:
        return None, {}, {}, {}

    base_session, base_data, base_path = sessions[0]
    merged_data: Dict[str, Any] = dict(base_data) if base_data else {}

    # Only fields the base page actually filled are attributed to it.
    field_sources: Dict[str, List[str]] = {
        key: [base_path] for key, value in merged_data.items() if not _is_blank(value)
    }
    # Map claim_id -> source_page for ALL claims
    claim_source_pages: Dict[str, str] = {}

    seen_entities = {_entity_key(claim) for claim in base_session.entity_claims}
    seen_relationships = {_relationship_key(claim) for claim in base_session.relationship_claims}

    for claim_list in (
        base_session.entity_claims,
        base_session.aggregate_claims,
        base_session.layout_claims,
        base_session.relationship_claims,
    ):
        for claim in claim_list:
            claim_source_pages[claim.claim_id] = base_path

    for session, structured_data, page_path in sessions[1:]:
        _merge_structured_data(merged_data, field_sources, structured_data, page_path)

        # Entities: deduplicate by hypernym:text
        for claim in session.entity_claims:
            key = _entity_key(claim)
            if key not in seen_entities:
                seen_entities.add(key)
                base_session.entity_claims.append(claim)
                claim_source_pages[claim.claim_id] = page_path

        # Aggregate and layout claims: keep all
        for claim in session.aggregate_claims:
            base_session.aggregate_claims.append(claim)
            claim_source_pages[claim.claim_id] = page_path
        for claim in session.layout_claims:
            base_session.layout_claims.append(claim)
            claim_source_pages[claim.claim_id] = page_path

        # Relationships: deduplicate by subject+predicate+object
        for claim in session.relationship_claims:
            key = _relationship_key(claim)
            if key not in seen_relationships:
                seen_relationships.add(key)
                base_session.relationship_claims.append(claim)
                claim_source_pages[claim.claim_id] = page_path

        base_session.errors.extend(session.errors)

    return base_session, merged_data, field_sources, claim_source_pages


def get_source_url(entry: Dict[str, Any]) -> Optional[str]:
    """
    Extract the source URL from an entry.

    Tries, in order:
    1. ``original_entry.webadres_organisatie``;
    2. the ``url`` of the first web archive;
    3. ``google_maps_enrichment.website``.

    Returns ``None`` if none of them is set.
    """
    url = _as_dict(entry.get('original_entry')).get('webadres_organisatie')
    if url:
        return url

    web_archives = _as_dict(entry.get('web_enrichment')).get('web_archives') or []
    if isinstance(web_archives, list) and web_archives:
        archive_url = _as_dict(web_archives[0]).get('url')
        if archive_url:
            return archive_url

    website = _as_dict(entry.get('google_maps_enrichment')).get('website')
    return website or None


def _select_pages(
    entry: Dict[str, Any],
    entry_index: str,
    multi_page: bool,
    max_pages: int,
) -> List[Tuple[Path, str, int]]:
    """Choose the pages to annotate, falling back to the single homepage capture."""
    if multi_page:
        key_pages = find_key_pages(entry, entry_index, max_pages)
        if key_pages:
            return key_pages

    html_path = find_web_archive(entry, entry_index)
    if html_path is None:
        return []
    return [(html_path, str(html_path), 10)]


async def _annotate_pages(
    annotator: LLMAnnotator,
    schema: GLAMSchema,
    key_pages: List[Tuple[Path, str, int]],
    source_url: Optional[str],
) -> Tuple[List[Tuple[Any, Dict[str, Any], str]], List[Dict[str, Any]]]:
    """Annotate pages one by one; a failing page is logged, recorded with its error and skipped."""
    page_sessions: List[Tuple[Any, Dict[str, Any], str]] = []
    pages_processed: List[Dict[str, Any]] = []

    for html_path, relative_path, priority in key_pages:
        try:
            logger.debug(f" → Annotating: {relative_path} (priority {priority})")
            session, structured_data = await annotator.annotate_with_schema(
                document=html_path,
                schema=schema,
                source_url=source_url,
                validate_output=True,
            )
            page_record = {
                "path": relative_path,
                "priority": priority,
                "entities_found": len(session.entity_claims),
                "claims_found": len(session.aggregate_claims),
            }
        except Exception as e:
            # Best effort: one bad page must not sink the whole entry.
            logger.warning(f" ✗ Failed to annotate {relative_path}: {e}")
            pages_processed.append({
                "path": relative_path,
                "priority": priority,
                "error": str(e),
            })
            continue

        page_sessions.append((session, structured_data, relative_path))
        pages_processed.append(page_record)

    return page_sessions, pages_processed


def _truncate(text: Optional[str], limit: int = 200) -> Optional[str]:
    """Cut *text* to *limit* characters plus ``"..."``; falsy values pass through."""
    if text and len(text) > limit:
        return text[:limit] + "..."
    return text


def _claim_sections(
    session: Any,
    claim_source_pages: Dict[str, str],
) -> Dict[str, List[Dict[str, Any]]]:
    """Serialise a merged session's claims into the optional output sections.

    Every record carries ``source_page``. Empty claim lists produce no key.
    """
    sections: Dict[str, List[Dict[str, Any]]] = {}

    if session.entity_claims:
        sections["entities"] = [
            {
                "claim_id": claim.claim_id,
                "hypernym": _enum_value(claim.hypernym),
                "hyponym": claim.hyponym,
                "text": claim.text_content,
                "iob_label": claim.iob_label,
                "confidence": claim.recognition_confidence,
                "xpath": claim.provenance.path if claim.provenance else None,
                "source_page": claim_source_pages.get(claim.claim_id),
            }
            for claim in session.entity_claims
        ]

    if session.aggregate_claims:
        sections["claims"] = [
            {
                "claim_id": claim.claim_id,
                "type": claim.claim_type,
                "value": claim.claim_value,
                "confidence": claim.provenance.confidence if claim.provenance else None,
                "xpath": claim.provenance.path if claim.provenance else None,
                "source_page": claim_source_pages.get(claim.claim_id),
            }
            for claim in session.aggregate_claims
        ]

    if session.layout_claims:
        sections["layout_regions"] = [
            {
                "claim_id": claim.claim_id,
                "region": _enum_value(claim.region) if claim.region else None,
                "semantic_role": _enum_value(claim.semantic_role) if claim.semantic_role else None,
                "text_content": _truncate(claim.text_content, 200),
                "xpath": claim.xpath,
                "css_selector": claim.css_selector,
                "page_number": claim.page_number,
                "source_page": claim_source_pages.get(claim.claim_id),
            }
            for claim in session.layout_claims
        ]

    if session.relationship_claims:
        sections["relationships"] = [
            {
                "claim_id": claim.claim_id,
                "type": claim.relationship_hyponym or (
                    _enum_value(claim.relationship_hypernym) if claim.relationship_hypernym else None
                ),
                "subject": claim.subject.span_text if claim.subject else None,
                "predicate": claim.predicate.label if claim.predicate else None,
                "object": claim.object.span_text if claim.object else None,
                "confidence": claim.extraction_confidence,
                "source_page": claim_source_pages.get(claim.claim_id),
            }
            for claim in session.relationship_claims
        ]

    return sections


def _write_yaml_atomic(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* as YAML to *path* through a sibling ``.tmp`` file.

    The resume logic treats an existing output file as "done". A crash while
    dumping would otherwise leave a truncated file that is never regenerated.
    The final rename (``Path.replace``) is atomic on POSIX when source and
    target are on the same filesystem.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        tmp_path.replace(path)
    except BaseException:
        if tmp_path.exists():
            tmp_path.unlink()
        raise


async def process_entry(
    annotator: LLMAnnotator,
    schema: GLAMSchema,
    entry_path: Path,
    output_dir: Path,
    force: bool = False,
    multi_page: bool = True,
    max_pages: int = MAX_PAGES_PER_ENTRY,
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Process a single NDE entry with optional multi-page annotation.

    Args:
        annotator: LLM annotator instance
        schema: GLAMSchema for extraction
        entry_path: Path to entry YAML file
        output_dir: Output directory for results
        force: Force reprocessing even if output exists
        multi_page: If True, process key pages and merge results (default True)
        max_pages: Maximum pages to process per entry

    Returns:
        ``(True, None)`` when skipped because ``<entry>_llm.yaml`` already
        exists, ``(True, result)`` on success (the result is also written to
        ``output_dir``), or ``(False, {"error": message})`` on failure.
    """
    entry_name = entry_path.stem
    entry_index = entry_name.split('_')[0]
    output_path = output_dir / f"{entry_name}_llm.yaml"

    # Resume support: an existing output file marks the entry as done.
    if output_path.exists() and not force:
        logger.info(f"Skipping {entry_name} (already processed)")
        return True, None

    entry = load_nde_entry(entry_path)
    if not entry:
        return False, {"error": "Failed to load entry"}

    source_url = get_source_url(entry)

    key_pages = _select_pages(entry, entry_index, multi_page, max_pages)
    if not key_pages:
        logger.warning(f"No web archive found for {entry_name}")
        return False, {"error": "No web archive found"}

    logger.info(f"Processing {entry_name}: {len(key_pages)} page(s)")

    try:
        page_sessions, pages_processed = await _annotate_pages(
            annotator, schema, key_pages, source_url
        )
        if not page_sessions:
            return False, {"error": "No pages were successfully annotated"}

        merged_session, merged_data, field_sources, claim_source_pages = (
            merge_annotation_sessions(page_sessions)
        )
        pages_ok = sum(1 for page in pages_processed if "error" not in page)

        result: Dict[str, Any] = {
            "entry_index": entry_index,
            "entry_name": entry_name,
            "source_file": str(entry_path),
            "source_url": source_url,
            "processing_timestamp": datetime.now(timezone.utc).isoformat(),
            "schema_name": schema.name,
            "multi_page_annotation": {
                "enabled": multi_page,
                "pages_found": len(key_pages),
                "pages_processed": pages_ok,
                "pages": pages_processed,
                "field_sources": field_sources,
            },
            "structured_data": merged_data,
            "entities_count": len(merged_session.entity_claims),
            "claims_count": len(merged_session.aggregate_claims),
            "layout_regions_count": len(merged_session.layout_claims),
            "relationships_count": len(merged_session.relationship_claims),
            "errors": merged_session.errors,
            "provenance": {
                "annotator": "LLMAnnotator",
                "schema": schema.name,
                "session_id": merged_session.session_id,
                "completed_at": merged_session.completed_at,
                "multi_page": multi_page,
            },
        }
        result.update(_claim_sections(merged_session, claim_source_pages))

        _write_yaml_atomic(output_path, result)

        logger.info(
            f"✓ {entry_name}: {len(merged_session.entity_claims)} entities, "
            f"{len(merged_session.aggregate_claims)} claims, "
            f"{len(merged_data)} structured fields "
            f"(from {pages_ok} pages)"
        )
        return True, result

    except Exception as e:
        logger.exception(f"Failed to process {entry_name}: {e}")
        return False, {"error": str(e)}


async def process_batch(
    entries: List[Path],
    schema: GLAMSchema,
    concurrency: int = 3,
    force: bool = False,
    multi_page: bool = True,
    max_pages: int = MAX_PAGES_PER_ENTRY,
) -> Dict[str, Any]:
    """
    Process a batch of NDE entries.

    Args:
        entries: List of entry file paths
        schema: GLAMSchema for extraction
        concurrency: Maximum number of entries processed at once (>= 1)
        force: Force reprocessing
        multi_page: If True, process multiple pages per entry (default True)
        max_pages: Maximum pages to process per entry

    Returns:
        Summary dict with total/success/skipped/errors counts, schema name,
        multi_page flag and a UTC timestamp.

    Raises:
        ValueError: If ``concurrency`` is less than 1 (``Semaphore(0)``
            would block every task forever).
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be at least 1, got {concurrency}")

    annotator = LLMAnnotator(LLMAnnotatorConfig(
        provider=LLMProvider.ZAI,
        model="glm-4.6",
    ))
    output_dir = OUTPUT_DIR

    # Semaphore bounds how many entries are annotated concurrently.
    semaphore = asyncio.Semaphore(concurrency)

    async def process_with_limit(entry_path: Path):
        async with semaphore:
            return await process_entry(
                annotator, schema, entry_path, output_dir, force, multi_page, max_pages
            )

    results = await asyncio.gather(
        *(process_with_limit(entry) for entry in entries),
        return_exceptions=True,
    )

    success_count = 0
    error_count = 0
    skip_count = 0

    for entry_path, result in zip(entries, results):
        # gather(return_exceptions=True) may also hand back CancelledError,
        # which is a BaseException but not an Exception.
        if isinstance(result, BaseException):
            logger.error(f"Task for {entry_path.stem} failed with exception: {result}")
            error_count += 1
            continue

        ok, payload = result
        if not ok:
            error_count += 1
        elif payload is None:  # skipped (output already existed)
            skip_count += 1
        else:
            success_count += 1

    return {
        "total": len(entries),
        "success": success_count,
        "skipped": skip_count,
        "errors": error_count,
        "schema": schema.name,
        "multi_page": multi_page,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def find_entries_with_web_archives() -> List[Path]:
    """Return all entry files, sorted, for which ``find_web_archive`` finds a homepage capture."""
    entries = []
    for entry_path in sorted(NDE_ENTRIES_DIR.glob("*.yaml")):
        entry = load_nde_entry(entry_path)
        if not entry:
            continue
        entry_index = entry_path.stem.split('_')[0]
        if find_web_archive(entry, entry_index):
            entries.append(entry_path)
    return entries


# =============================================================================
# MAIN
# =============================================================================

def main():
    """Parse CLI arguments, select schema and entries, and run the batch."""
    parser = argparse.ArgumentParser(
        description="Batch LLM annotation of NDE entries"
    )
    parser.add_argument(
        "--entries",
        nargs="+",
        help="Specific entry indices to process (e.g., 1631 1586)"
    )
    parser.add_argument(
        "--schema",
        choices=["heritage_custodian", "heritage_institution", "web_observation"],
        default="heritage_institution",
        help="Schema to use for extraction"
    )
    parser.add_argument(
        "--concurrency",
        type=int,
        default=3,
        help="Maximum concurrent requests (default: 3)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force reprocessing of already processed entries"
    )
    parser.add_argument(
        "--single-page",
        action="store_true",
        help="Process only homepage (disable multi-page annotation)"
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        default=MAX_PAGES_PER_ENTRY,
        help=f"Maximum pages to process per entry (default: {MAX_PAGES_PER_ENTRY})"
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List available entries with web archives and exit"
    )

    args = parser.parse_args()

    # A zero semaphore would hang forever; a non-positive page cap is meaningless.
    if args.concurrency < 1:
        parser.error("--concurrency must be at least 1")
    if args.max_pages < 1:
        parser.error("--max-pages must be at least 1")

    schema_factories = {
        "heritage_custodian": heritage_custodian_schema,
        "heritage_institution": heritage_institution_schema,
        "web_observation": web_observation_schema,
    }
    schema = schema_factories.get(args.schema, heritage_institution_schema)()

    if args.entries:
        # Deduplicate so the same entry is never written by two tasks at once.
        entries: List[Path] = []
        seen = set()
        for entry_idx in args.entries:
            matches = sorted(NDE_ENTRIES_DIR.glob(f"{entry_idx}_*.yaml"))
            if not matches:
                logger.warning(f"No entry file found for index {entry_idx}")
            for match in matches:
                if match not in seen:
                    seen.add(match)
                    entries.append(match)
    else:
        entries = find_entries_with_web_archives()

    if args.list:
        print(f"\n📋 Found {len(entries)} entries with web archives:\n")
        for entry in entries:
            print(f" • {entry.stem}")
        return

    if not entries:
        print("No entries found to process.")
        return

    multi_page = not args.single_page
    max_pages = args.max_pages

    print(f"\n🚀 Processing {len(entries)} entries with schema: {schema.name}")
    print(f" Concurrency: {args.concurrency}")
    print(f" Multi-page: {multi_page} (max {max_pages} pages)")
    print(f" Output: {OUTPUT_DIR}\n")

    summary = asyncio.run(process_batch(
        entries=entries,
        schema=schema,
        concurrency=args.concurrency,
        force=args.force,
        multi_page=multi_page,
        max_pages=max_pages,
    ))

    print(f"\n{'='*50}")
    print("📊 BATCH PROCESSING SUMMARY")
    print(f"{'='*50}")
    print(f" Total: {summary['total']}")
    print(f" Success: {summary['success']}")
    print(f" Skipped: {summary['skipped']}")
    print(f" Errors: {summary['errors']}")
    print(f" Schema: {summary['schema']}")
    print(f" Multi-page: {summary['multi_page']}")
    print(f"{'='*50}\n")


if __name__ == "__main__":
    main()