#!/usr/bin/env python3
"""
Generate Missing Web Annotations for data/custodian/web/ Archives.

Processes web archives that have HTML files but are missing
annotations_v1.7.0.yaml. Uses ZAI GLM-4 LLM annotator following
CH-Annotator v1.7.0 convention.

Structure expected:
    data/custodian/web/{entry_id}/{domain}/
    ├── metadata.yaml
    ├── index.html (or rendered.html)
    └── annotations_v1.7.0.yaml  (to be created)

Usage:
    python scripts/generate_missing_annotations.py --dry-run  # Preview what would be processed
    python scripts/generate_missing_annotations.py --test 5   # Test on 5 entries
    python scripts/generate_missing_annotations.py --batch 50 # Process 50 entries
    python scripts/generate_missing_annotations.py --all      # Process all missing
    python scripts/generate_missing_annotations.py --resume   # Resume from checkpoint
"""

import argparse
import asyncio
import json
import logging
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

# Load environment variables from .env file (best-effort: dotenv is optional)
try:
    from dotenv import load_dotenv
    env_path = Path(__file__).parent.parent / ".env"
    if env_path.exists():
        load_dotenv(env_path)
except ImportError:
    pass

# Add src to path for imports
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))

# Configure logging: console + persistent file log
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('missing_annotations.log'),
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class ProcessingResult:
    """Result of processing a single entry."""
    entry_id: str
    domain: str
    success: bool
    entity_count: int = 0
    claim_count: int = 0
    relationship_count: int = 0
    layout_count: int = 0
    errors: List[str] = field(default_factory=list)
    processing_time_seconds: float = 0.0
    output_file: Optional[str] = None


@dataclass
class BatchReport:
    """Summary report for batch processing."""
    started_at: str
    completed_at: Optional[str] = None
    total_entries: int = 0
    processed_entries: int = 0
    successful_entries: int = 0
    failed_entries: int = 0
    skipped_entries: int = 0
    total_entities: int = 0
    total_claims: int = 0
    total_relationships: int = 0
    total_layout_regions: int = 0
    avg_processing_time: float = 0.0
    errors: List[Dict[str, Any]] = field(default_factory=list)


def find_missing_annotations(base_path: Path) -> List[Tuple[str, Path]]:
    """
    Find web archives with HTML but missing annotations_v1.7.0.yaml.

    Structure: data/custodian/web/{entry_id}/{domain}/

    Args:
        base_path: Root directory containing numeric entry_id subdirectories.

    Returns:
        List of (entry_id/domain, entry_path) tuples
    """
    entries = []
    for entry_dir in sorted(base_path.iterdir()):
        # Only numeric directory names are valid entry ids
        if not entry_dir.is_dir() or not entry_dir.name.isdigit():
            continue
        entry_id = entry_dir.name
        for domain_dir in entry_dir.iterdir():
            if not domain_dir.is_dir():
                continue
            # Check if metadata exists (valid archive)
            metadata_file = domain_dir / "metadata.yaml"
            if not metadata_file.exists():
                continue
            # Check if already has annotations
            annotations_file = domain_dir / "annotations_v1.7.0.yaml"
            if annotations_file.exists():
                continue
            # Check if has HTML to process
            has_html = (domain_dir / "index.html").exists() or (domain_dir / "rendered.html").exists()
            if not has_html:
                continue
            entries.append((f"{entry_id}/{domain_dir.name}", domain_dir))
    return entries


def get_html_content(entry_path: Path) -> Tuple[Optional[str], Optional[str]]:
    """
    Get HTML content from the entry directory.

    Prefers rendered.html over index.html (cleaner content).

    Args:
        entry_path: Path to the {entry_id}/{domain} directory.

    Returns:
        Tuple of (html_content, html_file_path); (None, None) when no HTML
        file exists or it cannot be read.
    """
    # Prefer rendered HTML (usually cleaner)
    html_file = entry_path / "rendered.html"
    if not html_file.exists():
        html_file = entry_path / "index.html"
    if not html_file.exists():
        return None, None
    try:
        with open(html_file, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
        # Path recorded relative to the data/custodian/web root (three levels up)
        return content, str(html_file.relative_to(entry_path.parent.parent.parent))
    except Exception as e:
        logger.error(f"Failed to read {html_file}: {e}")
        return None, None


def load_metadata(entry_path: Path) -> Optional[Dict[str, Any]]:
    """Load existing metadata.yaml for an entry, or None if absent."""
    metadata_file = entry_path / "metadata.yaml"
    if metadata_file.exists():
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    return None


async def process_entry(
    entry_id: str,
    entry_path: Path,
    annotator,
    uncertainty_detector,
) -> ProcessingResult:
    """
    Process a single web archive and extract annotations.

    Args:
        entry_id: Entry identifier (e.g., "1600/luthermuseum.nl")
        entry_path: Path to entry directory
        annotator: LLM annotator instance
        uncertainty_detector: Uncertainty/hedging detector

    Returns:
        ProcessingResult with status
    """
    start_time = time.time()
    result = ProcessingResult(
        entry_id=entry_id,
        domain=entry_path.name,
        success=False,
    )
    try:
        # Load metadata for source URL
        metadata = load_metadata(entry_path)
        source_url = metadata.get('url') if metadata else None

        # Get HTML content
        html_content, html_file = get_html_content(entry_path)
        if not html_content:
            result.errors.append("No HTML content found")
            return result

        # Truncate very large HTML to avoid token limits.
        # BUGFIX: capture the original length BEFORE truncating, otherwise the
        # warning always reports "from 100000 to 100000 chars".
        max_chars = 100000  # ~25k tokens
        original_len = len(html_content)
        if original_len > max_chars:
            html_content = html_content[:max_chars]
            logger.warning(f"Truncated HTML for {entry_id} from {original_len} to {max_chars} chars")

        # Run LLM annotation
        logger.info(f"Processing {entry_id}...")
        session = await annotator.annotate(
            document=html_content,
            source_url=source_url,
        )

        # Count results
        result.entity_count = len(session.entity_claims)
        result.claim_count = len(session.aggregate_claims)
        result.relationship_count = len(session.relationship_claims)
        result.layout_count = len(session.layout_claims)

        # Add uncertainty analysis for aggregate claims
        for claim in session.aggregate_claims:
            if claim.claim_value:
                text = str(claim.claim_value)
                confidence = uncertainty_detector.get_claim_confidence(text)
                if claim.provenance:
                    claim.provenance.confidence = confidence

        # Prepare output
        output_data = {
            'extraction_version': 'ch_annotator-v1_7_0',
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_url': source_url,
            'html_file': html_file,
            'session': session.to_dict(),
            'summary': {
                'entity_count': result.entity_count,
                'claim_count': result.claim_count,
                'relationship_count': result.relationship_count,
                'layout_count': result.layout_count,
                'errors': session.errors,
            }
        }

        # Save annotations
        output_file = entry_path / "annotations_v1.7.0.yaml"
        with open(output_file, 'w', encoding='utf-8') as f:
            yaml.dump(output_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        result.output_file = str(output_file)
        result.success = True
        result.errors = session.errors
        logger.info(f" -> {result.entity_count} entities, {result.claim_count} claims")
    except Exception as e:
        result.errors.append(f"Processing failed: {str(e)}")
        result.errors.append(traceback.format_exc())
        logger.error(f"Failed to process {entry_id}: {e}")

    result.processing_time_seconds = time.time() - start_time
    return result


async def process_batch(
    entries: List[Tuple[str, Path]],
    annotator,
    uncertainty_detector,
    checkpoint_file: Optional[Path] = None,
    rate_limit_delay: float = 1.0,
) -> BatchReport:
    """
    Process a batch of entries with rate limiting.

    Args:
        entries: (entry_id, entry_path) pairs to process.
        annotator: LLM annotator instance.
        uncertainty_detector: Uncertainty/hedging detector.
        checkpoint_file: Optional JSON checkpoint path; when given, already
            processed ids are skipped and progress is saved every 10 entries.
        rate_limit_delay: Seconds to sleep before each request.

    Returns:
        BatchReport summarizing the run.
    """
    report = BatchReport(
        started_at=datetime.now(timezone.utc).isoformat(),
        total_entries=len(entries),
    )

    # Load checkpoint if exists
    processed_ids = set()
    if checkpoint_file and checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            checkpoint = json.load(f)
        processed_ids = set(checkpoint.get('processed_ids', []))
        logger.info(f"Loaded checkpoint with {len(processed_ids)} processed entries")

    # Filter already processed entries
    remaining_entries = [
        (eid, path) for eid, path in entries
        if eid not in processed_ids
    ]
    # BUGFIX: count only entries of THIS batch that were skipped, not every id
    # in the checkpoint (which may contain entries outside this batch);
    # keeps processed + skipped == total_entries.
    report.skipped_entries = len(entries) - len(remaining_entries)
    logger.info(f"Processing {len(remaining_entries)} entries ({len(processed_ids)} already done)")

    total_time = 0.0
    for i, (entry_id, entry_path) in enumerate(remaining_entries):
        # Rate limiting
        await asyncio.sleep(rate_limit_delay)

        result = await process_entry(
            entry_id=entry_id,
            entry_path=entry_path,
            annotator=annotator,
            uncertainty_detector=uncertainty_detector,
        )

        report.processed_entries += 1
        processed_ids.add(entry_id)

        if result.success:
            report.successful_entries += 1
            report.total_entities += result.entity_count
            report.total_claims += result.claim_count
            report.total_relationships += result.relationship_count
            report.total_layout_regions += result.layout_count
        else:
            report.failed_entries += 1
            report.errors.append({
                'entry_id': entry_id,
                'errors': result.errors[:2],  # Limit error details
            })

        total_time += result.processing_time_seconds

        # Progress logging every 10 entries
        if (i + 1) % 10 == 0:
            pct = ((i + 1) / len(remaining_entries)) * 100
            logger.info(
                f"Progress: {i+1}/{len(remaining_entries)} ({pct:.1f}%) - "
                f"Success: {report.successful_entries}, Failed: {report.failed_entries}"
            )

        # Checkpoint every 10 entries
        if checkpoint_file and (i + 1) % 10 == 0:
            checkpoint_data = {
                'processed_ids': list(processed_ids),
                'last_updated': datetime.now(timezone.utc).isoformat(),
                'processed_count': report.processed_entries,
                'successful_count': report.successful_entries,
            }
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint_data, f)
            logger.info(f"Saved checkpoint at {i+1} entries")

    # Final stats
    if report.processed_entries > 0:
        report.avg_processing_time = total_time / report.processed_entries
    report.completed_at = datetime.now(timezone.utc).isoformat()

    # Final checkpoint save
    if checkpoint_file:
        checkpoint_data = {
            'processed_ids': list(processed_ids),
            'last_updated': datetime.now(timezone.utc).isoformat(),
            'processed_count': report.processed_entries,
            'successful_count': report.successful_entries,
            'completed': True,
        }
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f)

    return report


def save_report(report: BatchReport, output_path: Path):
    """Save batch report to YAML file."""
    report_dict = {
        'started_at': report.started_at,
        'completed_at': report.completed_at,
        'total_entries': report.total_entries,
        'processed_entries': report.processed_entries,
        'successful_entries': report.successful_entries,
        'failed_entries': report.failed_entries,
        'skipped_entries': report.skipped_entries,
        'total_entities': report.total_entities,
        'total_claims': report.total_claims,
        'total_relationships': report.total_relationships,
        'total_layout_regions': report.total_layout_regions,
        'avg_processing_time_seconds': report.avg_processing_time,
        'errors': report.errors[:50],  # Limit errors in report
    }
    with open(output_path, 'w', encoding='utf-8') as f:
        yaml.dump(report_dict, f, default_flow_style=False, allow_unicode=True)
    logger.info(f"Saved report to {output_path}")


async def main():
    """CLI entry point: scan, select a batch, annotate, and report."""
    parser = argparse.ArgumentParser(
        description='Generate missing annotations for data/custodian/web/ archives'
    )
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview without processing')
    parser.add_argument('--test', type=int, metavar='N',
                        help='Test on N entries')
    parser.add_argument('--batch', type=int, metavar='N',
                        help='Process N entries')
    parser.add_argument('--all', action='store_true',
                        help='Process all missing entries')
    parser.add_argument('--resume', action='store_true',
                        help='Resume from checkpoint')
    parser.add_argument('--provider', default='zai',
                        choices=['zai', 'anthropic', 'openai'])
    args = parser.parse_args()

    # Paths
    project_root = Path(__file__).parent.parent
    base_path = project_root / "data" / "custodian" / "web"
    checkpoint_file = project_root / "data" / "missing_annotations_checkpoint.json"
    report_dir = project_root / "reports"
    report_dir.mkdir(exist_ok=True)

    # Find entries missing annotations
    logger.info("Scanning for web archives missing annotations...")
    entries = find_missing_annotations(base_path)
    logger.info(f"Found {len(entries)} web archives missing annotations")

    if not entries:
        logger.info("No missing annotations found. All archives are annotated!")
        return

    # Dry run - just show what would be processed
    if args.dry_run:
        logger.info("\n=== DRY RUN - Would process these entries ===")
        for i, (entry_id, path) in enumerate(entries[:50]):
            logger.info(f"  {i+1}. {entry_id}")
        if len(entries) > 50:
            logger.info(f"  ... and {len(entries) - 50} more")
        return

    # Determine batch size
    if args.test:
        entries = entries[:args.test]
        logger.info(f"Test mode: processing {len(entries)} entries")
    elif args.batch:
        entries = entries[:args.batch]
        logger.info(f"Batch mode: processing {len(entries)} entries")
    elif not args.all and not args.resume:
        logger.error("Please specify --dry-run, --test N, --batch N, --all, or --resume")
        sys.exit(1)

    # Initialize annotator
    logger.info(f"Initializing LLM annotator with provider: {args.provider}")
    try:
        from glam_extractor.annotators.llm_annotator import (
            LLMAnnotator,
            LLMAnnotatorConfig,
            LLMProvider,
            RetryConfig,
        )
        from glam_extractor.annotators.uncertainty import UncertaintyDetector

        provider_enum = LLMProvider(args.provider)
        default_models = {
            LLMProvider.ZAI: "glm-4.6",
            LLMProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
            LLMProvider.OPENAI: "gpt-4o",
        }
        config = LLMAnnotatorConfig(
            provider=provider_enum,
            model=default_models[provider_enum],
            retry=RetryConfig(max_retries=5),
            # Fall back to the other providers, in order, if the primary fails
            fallback_providers=[
                p for p in [LLMProvider.ZAI, LLMProvider.ANTHROPIC, LLMProvider.OPENAI]
                if p != provider_enum
            ],
            extract_images=False,  # Disable vision API to avoid rate limits
        )
        annotator = LLMAnnotator(config)
        uncertainty_detector = UncertaintyDetector()
        logger.info(" -> Image analysis DISABLED to avoid vision API rate limits")
    except ImportError as e:
        logger.error(f"Failed to import annotator modules: {e}")
        logger.error("Make sure glam_extractor is properly installed")
        sys.exit(1)
    except ValueError as e:
        logger.error(f"Failed to initialize annotator: {e}")
        logger.error(f"Make sure {args.provider.upper()}_API_TOKEN or similar is set in .env")
        sys.exit(1)

    # Process entries
    report = await process_batch(
        entries=entries,
        annotator=annotator,
        uncertainty_detector=uncertainty_detector,
        checkpoint_file=checkpoint_file if (args.resume or args.all) else None,
        rate_limit_delay=2.0,  # 2 seconds between requests to avoid rate limits
    )

    # Save report
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = report_dir / f"missing_annotations_report_{timestamp}.yaml"
    save_report(report, report_file)

    # Summary
    logger.info("=" * 60)
    logger.info("ANNOTATION GENERATION COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {report.total_entries}")
    logger.info(f"Processed: {report.processed_entries}")
    logger.info(f"Successful: {report.successful_entries}")
    logger.info(f"Failed: {report.failed_entries}")
    logger.info(f"Skipped (checkpoint):{report.skipped_entries}")
    logger.info("-" * 60)
    logger.info(f"Total entities: {report.total_entities}")
    logger.info(f"Total claims: {report.total_claims}")
    logger.info(f"Total relationships: {report.total_relationships}")
    logger.info(f"Total layout regions:{report.total_layout_regions}")
    logger.info(f"Avg processing time: {report.avg_processing_time:.2f}s")
    logger.info("-" * 60)
    logger.info(f"Report saved to: {report_file}")


if __name__ == '__main__':
    asyncio.run(main())