#!/usr/bin/env python3
"""
Enrich NDE Dutch Heritage Organizations with Wikidata Q-numbers - FULL DATASET

This script processes all 1,351 organizations, using the Wikidata MCP service
to find matching Wikidata entities. It includes:
- Batch processing with progress tracking
- Rate limiting for API compliance
- Comprehensive logging
- Automatic backup before modifications
- Recovery from interruptions

Usage:
    python3 scripts/enrich_nde_full_dataset.py [--start-index INDEX] [--batch-size SIZE]
"""

import yaml
import json
import sys
import argparse
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple
import time

# Statuses that mean "a previous run already decided there is no match";
# records carrying either one are skipped instead of re-queried.
_NO_MATCH_STATUSES = ('no_match_found', 'pending_review')


class FullDatasetEnricher:
    """Enriches all NDE organizations with Wikidata Q-numbers.

    Drives the whole run: loads the YAML dataset, walks it in batches,
    calls the (placeholder) Wikidata lookups, checkpoints progress to
    disk so an interrupted run can resume, and writes logs/statistics.
    """

    def __init__(self, data_path: Path, sparql_log_dir: Path,
                 start_index: int = 0, batch_size: int = 50):
        """
        Args:
            data_path: YAML file containing the organization records.
            sparql_log_dir: Directory for query logs and the checkpoint file
                (created if missing).
            start_index: Record index to start processing from.
            batch_size: Number of records per batch (progress is persisted
                after every batch).
        """
        self.data_path = data_path
        self.sparql_log_dir = sparql_log_dir
        self.sparql_log_dir.mkdir(parents=True, exist_ok=True)
        self.start_index = start_index
        self.batch_size = batch_size

        # Run statistics, printed at the end and stored in checkpoints/logs.
        self.stats = {
            'total_records': 0,
            'already_enriched': 0,
            'newly_enriched': 0,
            'no_match_found': 0,
            'multiple_matches': 0,
            'queries_executed': 0,
            'errors': 0,
            'skipped': 0
        }

        # Per-query audit trail (appended to by the search methods).
        self.query_log = []

        # Checkpoint file used to resume after an interruption.
        self.checkpoint_file = self.sparql_log_dir / "enrichment_checkpoint.json"

    def load_checkpoint(self) -> Optional[int]:
        """Load last checkpoint to resume from interruption.

        Returns:
            Index of the last processed record (-1 if the checkpoint file
            exists but lacks the key), or None if no checkpoint exists.
        """
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
                return checkpoint.get('last_processed_index', -1)
        return None

    def save_checkpoint(self, index: int, stats: Dict):
        """Persist the last processed index plus current stats for recovery.

        Args:
            index: Index of the last record that was fully processed.
            stats: Current statistics snapshot to store alongside it.
        """
        checkpoint = {
            'last_processed_index': index,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'statistics': stats
        }
        with open(self.checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def search_wikidata(self, org_name: str, org_type: Optional[str] = None,
                        city: Optional[str] = None) -> Optional[str]:
        """
        Search Wikidata for organization using MCP service.

        NOTE: This function uses PLACEHOLDER logic. In production, you would call:
        - wikidata_authenticated_search_entity(query)
        - wikidata_authenticated_execute_sparql(query)
        - wikidata_authenticated_get_metadata(entity_id)

        Args:
            org_name: Organization name
            org_type: Type of organization
            city: City name

        Returns:
            Q-number if found, None otherwise
        """
        # Build search query: name, then city, then type (the generic Dutch
        # "historische vereniging" adds no discriminating power, so skip it).
        search_query = org_name
        if city:
            search_query += f" {city}"
        if org_type and org_type != 'historische vereniging':
            search_query += f" {org_type}"

        # Log the search for the audit trail.
        timestamp = datetime.now(timezone.utc).isoformat()
        self.query_log.append({
            'timestamp': timestamp,
            'organization': org_name,
            'search_query': search_query,
            'method': 'search_entity'
        })

        print(f"      Search: {search_query[:80]}")

        # PLACEHOLDER: In production, call Wikidata MCP service here
        # q_number = wikidata_authenticated_search_entity(search_query)

        # For now, return None to indicate "would search here"
        return None

    def search_by_municipality(self, org_name: str, city: str) -> Optional[str]:
        """
        Search for municipality archive by city name.
        Uses SPARQL query for precise matching.

        Only applies when the organization name mentions 'gemeente'
        (municipality) and a city is available; otherwise returns None.
        """
        if not city or 'gemeente' not in org_name.lower():
            return None

        # Extract municipality name from org_name or use city
        municipality_name = city

        print(f"      SPARQL: Searching municipality '{municipality_name}'")

        # PLACEHOLDER: In production, execute SPARQL query
        # sparql_query = f"""
        # SELECT ?item ?itemLabel WHERE {{
        #   ?item wdt:P31 wd:Q2039348 .
        #   ?item rdfs:label "{municipality_name}"@nl .
        #   SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,en". }}
        # }}
        # """
        # results = wikidata_authenticated_execute_sparql(sparql_query)

        return None

    def enrich_record(self, record: Dict, record_idx: int) -> Tuple[Dict, str]:
        """
        Enrich a single record with Wikidata Q-number.

        Args:
            record: Organization record
            record_idx: Record index

        Returns:
            Tuple of (updated_record, status_message)
        """
        org_name = record.get('organisatie', 'Unknown')
        org_type = record.get('type_organisatie', '')
        city = record.get('plaatsnaam_bezoekadres', '')

        # Skip if already enriched
        if record.get('wikidata_id'):
            self.stats['already_enriched'] += 1
            return record, f"✓ Already enriched: {record['wikidata_id']}"

        # Skip if a previous run already concluded there is no match.
        # FIX: unmatched records are marked 'pending_review' below, so the
        # skip test must accept that status too — previously only
        # 'no_match_found' (which was never written) was checked, causing
        # every rerun to re-query all unmatched records.
        if record.get('wikidata_enrichment_status') in _NO_MATCH_STATUSES:
            self.stats['skipped'] += 1
            return record, "○ Skipped (previously marked no match)"

        print(f"\n  [{record_idx + 1}/{self.stats['total_records']}] {org_name[:60]}")
        print(f"    Type: {org_type} | City: {city}")

        # Try different search strategies
        q_number = None

        # Strategy 1: Direct search
        q_number = self.search_wikidata(org_name, org_type, city)

        # Strategy 2: Municipality search (for archives)
        if not q_number and org_type == 'archief' and 'gemeente' in org_name.lower():
            q_number = self.search_by_municipality(org_name, city)

        # Update record based on result
        if q_number:
            record['wikidata_id'] = q_number
            self.stats['newly_enriched'] += 1
            self.stats['queries_executed'] += 1
            return record, f"✓ Matched: {q_number}"
        else:
            # Mark as no match for future runs (skipped on rerun, see above)
            record['wikidata_enrichment_status'] = 'pending_review'
            self.stats['no_match_found'] += 1
            self.stats['queries_executed'] += 1
            return record, "✗ No match found"

    def run_enrichment(self):
        """Run the full dataset enrichment with batch processing.

        Workflow: optionally resume from a checkpoint, load the YAML data,
        back it up (on a fresh run only), process records batch by batch with
        rate limiting, persist progress after each batch, then write final
        results, logs, and statistics.

        Returns:
            The (possibly modified) list of records.
        """
        print("=" * 80)
        print("NDE WIKIDATA ENRICHMENT - FULL DATASET")
        print("=" * 80)
        print()

        # Check for checkpoint
        checkpoint_idx = self.load_checkpoint()
        if checkpoint_idx is not None and checkpoint_idx >= self.start_index:
            print(f"⚠️  Found checkpoint at index {checkpoint_idx}")
            response = input(f"   Resume from index {checkpoint_idx + 1}? (y/n): ")
            if response.lower() == 'y':
                self.start_index = checkpoint_idx + 1
                print(f"   Resuming from index {self.start_index}")

        # Load data
        print(f"\n📂 Loading data from {self.data_path.name}...")
        with open(self.data_path, 'r', encoding='utf-8') as f:
            records = yaml.safe_load(f)

        self.stats['total_records'] = len(records)
        remaining = self.stats['total_records'] - self.start_index

        print(f"   Total records: {self.stats['total_records']}")
        print(f"   Starting at index: {self.start_index}")
        print(f"   Remaining to process: {remaining}")
        print(f"   Batch size: {self.batch_size}")
        print()

        # Create backup (only on a fresh run; resumed runs already have one)
        if self.start_index == 0:
            backup_path = self.data_path.parent / f"{self.data_path.stem}.backup.{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.yaml"
            print(f"💾 Creating backup: {backup_path.name}")
            with open(backup_path, 'w', encoding='utf-8') as f:
                yaml.dump(records, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
            print()

        # Process records in batches
        start_time = time.time()

        for batch_start in range(self.start_index, len(records), self.batch_size):
            batch_end = min(batch_start + self.batch_size, len(records))
            batch_num = (batch_start // self.batch_size) + 1
            total_batches = (len(records) + self.batch_size - 1) // self.batch_size

            print(f"\n{'='*80}")
            print(f"BATCH {batch_num}/{total_batches}: Records {batch_start + 1} - {batch_end}")
            print(f"{'='*80}")

            batch_start_time = time.time()

            # Process batch
            for idx in range(batch_start, batch_end):
                try:
                    records[idx], status = self.enrich_record(records[idx], idx)
                    print(f"    {status}")

                    # Save checkpoint every 10 records
                    if (idx + 1) % 10 == 0:
                        self.save_checkpoint(idx, self.stats)

                    # Rate limiting: pause briefly between requests
                    time.sleep(0.5)  # 2 requests per second max

                except Exception as e:
                    # Log and keep going — a single bad record must not abort
                    # a multi-hour run.
                    print(f"    ✗ Error: {e}")
                    self.stats['errors'] += 1
                    continue

            # Batch complete
            batch_elapsed = time.time() - batch_start_time
            print(f"\n  Batch complete in {batch_elapsed:.1f}s")

            # Save progress after each batch
            print(f"  💾 Saving progress...")
            with open(self.data_path, 'w', encoding='utf-8') as f:
                yaml.dump(records, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

            self.save_checkpoint(batch_end - 1, self.stats)

            # Progress update
            progress = (batch_end / len(records)) * 100
            enriched_pct = (self.stats['newly_enriched'] / batch_end * 100) if batch_end > 0 else 0
            print(f"  Progress: {progress:.1f}% | Enriched: {self.stats['newly_enriched']} ({enriched_pct:.1f}%)")

            # Estimated time remaining
            elapsed = time.time() - start_time
            records_done = batch_end - self.start_index
            if records_done > 0:
                avg_time_per_record = elapsed / records_done
                remaining_records = len(records) - batch_end
                eta_seconds = avg_time_per_record * remaining_records
                eta_minutes = eta_seconds / 60
                print(f"  ETA: {eta_minutes:.1f} minutes")

            # Pause between batches (rate limiting)
            if batch_end < len(records):
                print(f"\n  ⏸  Pausing 30s between batches (rate limiting)...")
                time.sleep(30)

        # Save final results
        print(f"\n{'='*80}")
        print("SAVING FINAL RESULTS")
        print(f"{'='*80}")

        print(f"💾 Writing enriched data to {self.data_path}...")
        with open(self.data_path, 'w', encoding='utf-8') as f:
            yaml.dump(records, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        # Save master log
        self.save_master_log()

        # Print final statistics
        total_elapsed = time.time() - start_time
        self.print_final_statistics(total_elapsed)

        # Clean up checkpoint — the run finished, so nothing to resume
        if self.checkpoint_file.exists():
            self.checkpoint_file.unlink()
            print("\n✓ Checkpoint file removed")

        return records

    def save_master_log(self):
        """Save master log of enrichment run.

        Writes a timestamped JSON file to the SPARQL log directory containing
        run parameters, statistics, and the first 100 queries (to bound size).
        """
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        log_path = self.sparql_log_dir / f"enrichment_log_full_dataset_{timestamp}.json"

        master_log = {
            'enrichment_date': datetime.now(timezone.utc).isoformat(),
            'enrichment_method': 'Wikidata MCP service - full dataset batch processing',
            'start_index': self.start_index,
            'batch_size': self.batch_size,
            'statistics': self.stats,
            'query_count': len(self.query_log),
            'queries': self.query_log[:100]  # First 100 queries only
        }

        with open(log_path, 'w', encoding='utf-8') as f:
            json.dump(master_log, f, indent=2, ensure_ascii=False)

        print(f"\n📊 Enrichment log saved: {log_path.name}")

    def print_final_statistics(self, elapsed_seconds: float):
        """Print final enrichment statistics.

        Args:
            elapsed_seconds: Wall-clock duration of the run.
        """
        print(f"\n{'='*80}")
        print("FINAL STATISTICS")
        print(f"{'='*80}")
        print(f"Total records: {self.stats['total_records']}")
        print(f"Already enriched: {self.stats['already_enriched']}")
        print(f"Newly enriched: {self.stats['newly_enriched']}")
        print(f"No match found: {self.stats['no_match_found']}")
        print(f"Skipped (previous): {self.stats['skipped']}")
        print(f"Errors: {self.stats['errors']}")
        print(f"Queries executed: {self.stats['queries_executed']}")
        print()

        total_enriched = self.stats['already_enriched'] + self.stats['newly_enriched']
        enrichment_rate = (total_enriched / self.stats['total_records']) * 100 if self.stats['total_records'] > 0 else 0
        print(f"Total enriched: {total_enriched} ({enrichment_rate:.1f}%)")
        print(f"Success rate (new): {(self.stats['newly_enriched'] / (self.stats['newly_enriched'] + self.stats['no_match_found']) * 100) if (self.stats['newly_enriched'] + self.stats['no_match_found']) > 0 else 0:.1f}%")
        print()
        print(f"Time elapsed: {elapsed_seconds / 60:.1f} minutes")
        # FIX: guard against division by zero on an empty dataset (the
        # enrichment-rate line above was guarded, this one was not).
        avg_per_record = (elapsed_seconds / self.stats['total_records']) if self.stats['total_records'] > 0 else 0.0
        print(f"Average per record: {avg_per_record:.2f}s")
        print()


def main():
    """Main entry point: parse arguments, confirm with the user, run."""
    parser = argparse.ArgumentParser(description='Enrich NDE dataset with Wikidata Q-numbers')
    parser.add_argument('--start-index', type=int, default=10,
                        help='Start index (default: 10, skip test batch)')
    parser.add_argument('--batch-size', type=int, default=50,
                        help='Batch size (default: 50)')
    args = parser.parse_args()

    # Paths (resolved relative to the repository root, one level above scripts/)
    base_dir = Path(__file__).parent.parent
    data_path = base_dir / "data" / "nde" / "voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.yaml"
    sparql_log_dir = base_dir / "data" / "nde" / "sparql"

    # Verify file exists
    if not data_path.exists():
        print(f"❌ Error: Data file not found: {data_path}")
        sys.exit(1)

    # Create enricher
    enricher = FullDatasetEnricher(
        data_path=data_path,
        sparql_log_dir=sparql_log_dir,
        start_index=args.start_index,
        batch_size=args.batch_size
    )

    # Confirm before starting.
    # FIX: the record count is only known after loading the data, so do not
    # hard-code a dataset size (previously '1351 - start_index') here.
    print(f"\n⚠️  WARNING: This will process all records from index {args.start_index} onwards")
    print(f"   This process will take approximately 2-3 hours")
    print(f"   A backup will be created before any modifications")
    print()
    response = input("Continue? (yes/no): ")

    if response.lower() != 'yes':
        print("Aborted.")
        sys.exit(0)

    # Run enrichment
    print("\n🚀 Starting full dataset enrichment...")
    print()
    enricher.run_enrichment()

    print(f"\n{'='*80}")
    print("✅ ENRICHMENT COMPLETE!")
    print(f"{'='*80}")
    print()
    print("Next steps:")
    print("1. Review records marked 'pending_review'")
    print("2. Run validation script to check Q-numbers")
    print("3. Generate final enrichment report")
    print()


if __name__ == "__main__":
    main()