glam/scripts/generate_missing_annotations.py
2025-12-14 17:09:55 +01:00

521 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Generate Missing Web Annotations for data/custodian/web/ Archives.
Processes web archives that have HTML files but are missing annotations_v1.7.0.yaml.
Uses ZAI GLM-4 LLM annotator following CH-Annotator v1.7.0 convention.
Structure expected:
data/custodian/web/{entry_id}/{domain}/
├── metadata.yaml
├── index.html (or rendered.html)
└── annotations_v1.7.0.yaml (to be created)
Usage:
python scripts/generate_missing_annotations.py --dry-run # Preview what would be processed
python scripts/generate_missing_annotations.py --test 5 # Test on 5 entries
python scripts/generate_missing_annotations.py --batch 50 # Process 50 entries
python scripts/generate_missing_annotations.py --all # Process all missing
python scripts/generate_missing_annotations.py --resume # Resume from checkpoint
"""
import argparse
import asyncio
import json
import logging
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

# Load environment variables from .env file
try:
    from dotenv import load_dotenv

    env_path = Path(__file__).parent.parent / ".env"
    if env_path.exists():
        load_dotenv(env_path)
except ImportError:
    # python-dotenv is optional; env vars may also be exported externally.
    pass

# Add src to path for imports
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))

# Configure logging: mirror everything to stdout and a local log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('missing_annotations.log'),
    ]
)
logger = logging.getLogger(__name__)
@dataclass
class ProcessingResult:
    """Result of processing a single entry."""

    entry_id: str  # e.g. "1600/luthermuseum.nl"
    domain: str  # domain directory name (last path component of the entry)
    success: bool  # True only when annotations were written to disk
    entity_count: int = 0
    claim_count: int = 0
    relationship_count: int = 0
    layout_count: int = 0
    errors: List[str] = field(default_factory=list)  # failure details / session errors
    processing_time_seconds: float = 0.0  # wall-clock time for this entry
    output_file: Optional[str] = None  # path of the written annotations file, if any
@dataclass
class BatchReport:
    """Summary report for batch processing."""

    started_at: str  # ISO-8601 UTC timestamp
    completed_at: Optional[str] = None  # set when the batch finishes
    total_entries: int = 0  # entries handed to the batch (before checkpoint filtering)
    processed_entries: int = 0
    successful_entries: int = 0
    failed_entries: int = 0
    skipped_entries: int = 0  # entries already done per the checkpoint file
    total_entities: int = 0
    total_claims: int = 0
    total_relationships: int = 0
    total_layout_regions: int = 0
    avg_processing_time: float = 0.0  # seconds per processed entry
    errors: List[Dict[str, Any]] = field(default_factory=list)  # per-entry failure records
def find_missing_annotations(base_path: Path) -> List[Tuple[str, Path]]:
    """
    Find web archives with HTML but missing annotations_v1.7.0.yaml.

    Structure: data/custodian/web/{entry_id}/{domain}/

    Returns:
        List of (entry_id/domain, entry_path) tuples
    """
    missing: List[Tuple[str, Path]] = []
    for entry_dir in sorted(base_path.iterdir()):
        # Only numeric directory names are valid archive entry ids.
        if not (entry_dir.is_dir() and entry_dir.name.isdigit()):
            continue
        for domain_dir in entry_dir.iterdir():
            if not domain_dir.is_dir():
                continue
            # A valid archive must carry its metadata.
            if not (domain_dir / "metadata.yaml").exists():
                continue
            # Already annotated — nothing to do.
            if (domain_dir / "annotations_v1.7.0.yaml").exists():
                continue
            # Need at least one HTML file to feed the annotator.
            if not any((domain_dir / name).exists() for name in ("index.html", "rendered.html")):
                continue
            missing.append((f"{entry_dir.name}/{domain_dir.name}", domain_dir))
    return missing
def get_html_content(entry_path: Path) -> Tuple[Optional[str], Optional[str]]:
    """
    Get HTML content from the entry directory.

    Prefers rendered.html over index.html (cleaner content).

    Returns:
        Tuple of (html_content, html_file_path)
    """
    candidate = entry_path / "rendered.html"
    if not candidate.exists():
        candidate = entry_path / "index.html"
        if not candidate.exists():
            return None, None
    try:
        content = candidate.read_text(encoding='utf-8', errors='replace')
    except Exception as e:
        logger.error(f"Failed to read {candidate}: {e}")
        return None, None
    # Report the file path relative to three levels above the entry directory
    # (i.e. relative to the archive root that contains web/{entry}/{domain}).
    return content, str(candidate.relative_to(entry_path.parent.parent.parent))
def load_metadata(entry_path: Path) -> Optional[Dict[str, Any]]:
    """Load existing metadata.yaml for an entry, or None when absent."""
    metadata_file = entry_path / "metadata.yaml"
    if not metadata_file.exists():
        return None
    with open(metadata_file, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
async def process_entry(
    entry_id: str,
    entry_path: Path,
    annotator,
    uncertainty_detector,
) -> ProcessingResult:
    """
    Process a single web archive and extract annotations.

    Args:
        entry_id: Entry identifier (e.g., "1600/luthermuseum.nl")
        entry_path: Path to entry directory
        annotator: LLM annotator instance
        uncertainty_detector: Uncertainty/hedging detector

    Returns:
        ProcessingResult with status
    """
    start_time = time.time()
    result = ProcessingResult(
        entry_id=entry_id,
        domain=entry_path.name,
        success=False,
    )
    try:
        # Load metadata for source URL
        metadata = load_metadata(entry_path)
        source_url = metadata.get('url') if metadata else None

        # Get HTML content
        html_content, html_file = get_html_content(entry_path)
        if not html_content:
            result.errors.append("No HTML content found")
            return result

        # Truncate very large HTML to avoid token limits
        max_chars = 100000  # ~25k tokens
        # BUG FIX: capture the original length BEFORE truncating; the old code
        # measured len(html_content) after the slice, so the warning always
        # reported "from {max_chars} to {max_chars}".
        original_len = len(html_content)
        if original_len > max_chars:
            html_content = html_content[:max_chars]
            logger.warning(f"Truncated HTML for {entry_id} from {original_len} to {max_chars} chars")

        # Run LLM annotation
        logger.info(f"Processing {entry_id}...")
        session = await annotator.annotate(
            document=html_content,
            source_url=source_url,
        )

        # Count results
        result.entity_count = len(session.entity_claims)
        result.claim_count = len(session.aggregate_claims)
        result.relationship_count = len(session.relationship_claims)
        result.layout_count = len(session.layout_claims)

        # Add uncertainty analysis for aggregate claims: hedging language in
        # the claim text lowers the recorded confidence on its provenance.
        for claim in session.aggregate_claims:
            if claim.claim_value:
                text = str(claim.claim_value)
                confidence = uncertainty_detector.get_claim_confidence(text)
                if claim.provenance:
                    claim.provenance.confidence = confidence

        # Prepare output
        output_data = {
            'extraction_version': 'ch_annotator-v1_7_0',
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_url': source_url,
            'html_file': html_file,
            'session': session.to_dict(),
            'summary': {
                'entity_count': result.entity_count,
                'claim_count': result.claim_count,
                'relationship_count': result.relationship_count,
                'layout_count': result.layout_count,
                'errors': session.errors,
            }
        }

        # Save annotations next to the source HTML
        output_file = entry_path / "annotations_v1.7.0.yaml"
        with open(output_file, 'w', encoding='utf-8') as f:
            yaml.dump(output_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        result.output_file = str(output_file)
        result.success = True
        # Session-level errors are surfaced even on success.
        result.errors = session.errors
        logger.info(f" -> {result.entity_count} entities, {result.claim_count} claims")
    except Exception as e:
        # Record both the short message and the full traceback for the report.
        result.errors.append(f"Processing failed: {str(e)}")
        result.errors.append(traceback.format_exc())
        logger.error(f"Failed to process {entry_id}: {e}")
    result.processing_time_seconds = time.time() - start_time
    return result
def _write_checkpoint(
    checkpoint_file: Path,
    processed_ids: set,
    report: "BatchReport",
    completed: bool = False,
) -> None:
    """Persist progress to JSON so a later run can --resume where we stopped."""
    checkpoint_data = {
        'processed_ids': list(processed_ids),
        'last_updated': datetime.now(timezone.utc).isoformat(),
        'processed_count': report.processed_entries,
        'successful_count': report.successful_entries,
    }
    if completed:
        # Only the final checkpoint carries the completion marker.
        checkpoint_data['completed'] = True
    with open(checkpoint_file, 'w') as f:
        json.dump(checkpoint_data, f)


async def process_batch(
    entries: List[Tuple[str, Path]],
    annotator,
    uncertainty_detector,
    checkpoint_file: Optional[Path] = None,
    rate_limit_delay: float = 1.0,
) -> BatchReport:
    """
    Process a batch of entries with rate limiting.

    Args:
        entries: (entry_id, entry_path) pairs to annotate.
        annotator: LLM annotator passed through to process_entry().
        uncertainty_detector: Hedging detector passed through to process_entry().
        checkpoint_file: Optional JSON checkpoint for resumable runs.
        rate_limit_delay: Seconds to sleep before each request.

    Returns:
        BatchReport summarizing the run.
    """
    report = BatchReport(
        started_at=datetime.now(timezone.utc).isoformat(),
        total_entries=len(entries),
    )

    # Load checkpoint if it exists so already-processed entries are skipped.
    processed_ids = set()
    if checkpoint_file and checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            checkpoint = json.load(f)
        processed_ids = set(checkpoint.get('processed_ids', []))
        logger.info(f"Loaded checkpoint with {len(processed_ids)} processed entries")

    # Filter already processed entries
    remaining_entries = [
        (eid, path) for eid, path in entries
        if eid not in processed_ids
    ]
    report.skipped_entries = len(processed_ids)
    logger.info(f"Processing {len(remaining_entries)} entries ({len(processed_ids)} already done)")

    total_time = 0.0
    for i, (entry_id, entry_path) in enumerate(remaining_entries):
        # Rate limiting: sleep before every request, including the first.
        await asyncio.sleep(rate_limit_delay)

        result = await process_entry(
            entry_id=entry_id,
            entry_path=entry_path,
            annotator=annotator,
            uncertainty_detector=uncertainty_detector,
        )
        report.processed_entries += 1
        processed_ids.add(entry_id)
        if result.success:
            report.successful_entries += 1
            report.total_entities += result.entity_count
            report.total_claims += result.claim_count
            report.total_relationships += result.relationship_count
            report.total_layout_regions += result.layout_count
        else:
            report.failed_entries += 1
            report.errors.append({
                'entry_id': entry_id,
                'errors': result.errors[:2],  # Limit error details
            })
        total_time += result.processing_time_seconds

        # Progress logging every 10 entries
        if (i + 1) % 10 == 0:
            pct = ((i + 1) / len(remaining_entries)) * 100
            logger.info(
                f"Progress: {i+1}/{len(remaining_entries)} ({pct:.1f}%) - "
                f"Success: {report.successful_entries}, Failed: {report.failed_entries}"
            )

        # Checkpoint every 10 entries (refactor: shared with the final save
        # via _write_checkpoint instead of two duplicated dict/dump blocks).
        if checkpoint_file and (i + 1) % 10 == 0:
            _write_checkpoint(checkpoint_file, processed_ids, report)
            logger.info(f"Saved checkpoint at {i+1} entries")

    # Final stats
    if report.processed_entries > 0:
        report.avg_processing_time = total_time / report.processed_entries
    report.completed_at = datetime.now(timezone.utc).isoformat()

    # Final checkpoint save
    if checkpoint_file:
        _write_checkpoint(checkpoint_file, processed_ids, report, completed=True)

    return report
def save_report(report: BatchReport, output_path: Path):
    """Save batch report to YAML file."""
    # Fields copied straight through under their own names.
    passthrough = (
        'started_at', 'completed_at', 'total_entries', 'processed_entries',
        'successful_entries', 'failed_entries', 'skipped_entries',
        'total_entities', 'total_claims', 'total_relationships',
        'total_layout_regions',
    )
    report_dict = {name: getattr(report, name) for name in passthrough}
    # Renamed key makes the unit explicit in the written report.
    report_dict['avg_processing_time_seconds'] = report.avg_processing_time
    report_dict['errors'] = report.errors[:50]  # Limit errors in report
    with open(output_path, 'w', encoding='utf-8') as f:
        yaml.dump(report_dict, f, default_flow_style=False, allow_unicode=True)
    logger.info(f"Saved report to {output_path}")
async def main():
    """CLI entry point: scan for unannotated archives, annotate, and report."""
    parser = argparse.ArgumentParser(
        description='Generate missing annotations for data/custodian/web/ archives'
    )
    parser.add_argument('--dry-run', action='store_true', help='Preview without processing')
    parser.add_argument('--test', type=int, metavar='N', help='Test on N entries')
    parser.add_argument('--batch', type=int, metavar='N', help='Process N entries')
    parser.add_argument('--all', action='store_true', help='Process all missing entries')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    parser.add_argument('--provider', default='zai', choices=['zai', 'anthropic', 'openai'])
    args = parser.parse_args()

    # Paths (script lives in scripts/, so project root is one level up)
    project_root = Path(__file__).parent.parent
    base_path = project_root / "data" / "custodian" / "web"
    checkpoint_file = project_root / "data" / "missing_annotations_checkpoint.json"
    report_dir = project_root / "reports"
    report_dir.mkdir(exist_ok=True)

    # Find entries missing annotations
    logger.info("Scanning for web archives missing annotations...")
    entries = find_missing_annotations(base_path)
    logger.info(f"Found {len(entries)} web archives missing annotations")
    if not entries:
        logger.info("No missing annotations found. All archives are annotated!")
        return

    # Dry run - just show what would be processed (first 50 only)
    if args.dry_run:
        logger.info("\n=== DRY RUN - Would process these entries ===")
        for i, (entry_id, path) in enumerate(entries[:50]):
            logger.info(f" {i+1}. {entry_id}")
        if len(entries) > 50:
            logger.info(f" ... and {len(entries) - 50} more")
        return

    # Determine batch size.
    # BUG FIX: compare against None instead of truthiness, so an explicit
    # "--test 0" / "--batch 0" is honored rather than falling through to the
    # usage error below.
    if args.test is not None:
        entries = entries[:args.test]
        logger.info(f"Test mode: processing {len(entries)} entries")
    elif args.batch is not None:
        entries = entries[:args.batch]
        logger.info(f"Batch mode: processing {len(entries)} entries")
    elif not args.all and not args.resume:
        logger.error("Please specify --dry-run, --test N, --batch N, --all, or --resume")
        sys.exit(1)

    # Initialize annotator (imported lazily so argument errors surface first)
    logger.info(f"Initializing LLM annotator with provider: {args.provider}")
    try:
        from glam_extractor.annotators.llm_annotator import (
            LLMAnnotator,
            LLMAnnotatorConfig,
            LLMProvider,
            RetryConfig,
        )
        from glam_extractor.annotators.uncertainty import UncertaintyDetector

        provider_enum = LLMProvider(args.provider)
        default_models = {
            LLMProvider.ZAI: "glm-4.6",
            LLMProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
            LLMProvider.OPENAI: "gpt-4o",
        }
        config = LLMAnnotatorConfig(
            provider=provider_enum,
            model=default_models[provider_enum],
            retry=RetryConfig(max_retries=5),
            # Other providers act as fallbacks when the chosen one fails.
            fallback_providers=[
                p for p in [LLMProvider.ZAI, LLMProvider.ANTHROPIC, LLMProvider.OPENAI]
                if p != provider_enum
            ],
            extract_images=False,  # Disable vision API to avoid rate limits
        )
        annotator = LLMAnnotator(config)
        uncertainty_detector = UncertaintyDetector()
        logger.info(" -> Image analysis DISABLED to avoid vision API rate limits")
    except ImportError as e:
        logger.error(f"Failed to import annotator modules: {e}")
        logger.error("Make sure glam_extractor is properly installed")
        sys.exit(1)
    except ValueError as e:
        logger.error(f"Failed to initialize annotator: {e}")
        logger.error(f"Make sure {args.provider.upper()}_API_TOKEN or similar is set in .env")
        sys.exit(1)

    # Process entries; checkpointing only for long-running modes
    report = await process_batch(
        entries=entries,
        annotator=annotator,
        uncertainty_detector=uncertainty_detector,
        checkpoint_file=checkpoint_file if (args.resume or args.all) else None,
        rate_limit_delay=2.0,  # 2 seconds between requests to avoid rate limits
    )

    # Save report
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = report_dir / f"missing_annotations_report_{timestamp}.yaml"
    save_report(report, report_file)

    # Summary
    logger.info("=" * 60)
    logger.info("ANNOTATION GENERATION COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {report.total_entries}")
    logger.info(f"Processed: {report.processed_entries}")
    logger.info(f"Successful: {report.successful_entries}")
    logger.info(f"Failed: {report.failed_entries}")
    logger.info(f"Skipped (checkpoint):{report.skipped_entries}")
    logger.info("-" * 60)
    logger.info(f"Total entities: {report.total_entities}")
    logger.info(f"Total claims: {report.total_claims}")
    logger.info(f"Total relationships: {report.total_relationships}")
    logger.info(f"Total layout regions:{report.total_layout_regions}")
    logger.info(f"Avg processing time: {report.avg_processing_time:.2f}s")
    logger.info("-" * 60)
    logger.info(f"Report saved to: {report_file}")
if __name__ == '__main__':
    # Script entry point: drive the async pipeline to completion.
    asyncio.run(main())