#!/usr/bin/env python3
"""
Enrich hyponyms_curated.yaml with full Wikidata metadata.

This script:
1. Reads hyponyms_curated.yaml
2. Fetches ALL Wikidata properties for each Q-number (except 'exclude' section)
3. Maintains a register of fetched entities to avoid re-fetching
4. Preserves existing curation data (country, time, hypernym, duplicate, type, etc.)
5. Outputs to hyponyms_curated_full.yaml

Usage:
    python scripts/enrich_hyponyms_with_wikidata.py

    # Force refresh specific entities:
    python scripts/enrich_hyponyms_with_wikidata.py --refresh Q12345,Q67890

    # Refresh all entities:
    python scripts/enrich_hyponyms_with_wikidata.py --refresh-all
"""

import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import requests
import yaml

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Constants
INPUT_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/hyponyms_curated.yaml"
OUTPUT_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/hyponyms_curated_full.yaml"
REGISTER_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/.fetch_register.json"
WIKIDATA_API_BASE = "https://www.wikidata.org/w/api.php"
RATE_LIMIT_DELAY = 0.1  # 100ms between requests (10 req/sec)

# Sections to process (exclude is NOT processed)
SECTIONS_TO_PROCESS = ['hypernym', 'entity', 'entity_list', 'standard', 'collection']


class WikidataFetcher:
    """Fetch and cache Wikidata entity data.

    Fetched entities are stored in a JSON "register" on disk so repeated
    runs do not re-hit the Wikidata API for entities already retrieved.
    """

    def __init__(self, register_file: Path):
        self.register_file = register_file
        self.register = self._load_register()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'GLAM-Enrichment-Bot/1.0 (https://github.com/glam-project)'
        })

    def _load_register(self) -> Dict[str, Any]:
        """Load the fetch register from disk.

        Returns a fresh, empty register when the file is absent, unreadable,
        or corrupt (FIX: a truncated/invalid JSON file used to abort the
        whole run with an uncaught exception), and backfills the keys the
        rest of the code indexes unconditionally.
        """
        if self.register_file.exists():
            try:
                with open(self.register_file, 'r', encoding='utf-8') as f:
                    register = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                print(f"⚠ Could not read fetch register ({e}); starting with an empty one")
            else:
                # Guarantee required keys even if an older/partial register
                # is on disk, preventing KeyError on 'entities'/'total_fetches'.
                register.setdefault('entities', {})
                register.setdefault('last_updated', None)
                register.setdefault('total_fetches', 0)
                return register
        return {
            'entities': {},
            'last_updated': None,
            'total_fetches': 0
        }

    def _save_register(self):
        """Save fetch register to disk (creating parent dirs as needed)."""
        self.register['last_updated'] = datetime.now(timezone.utc).isoformat()
        self.register_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.register_file, 'w', encoding='utf-8') as f:
            json.dump(self.register, f, indent=2, ensure_ascii=False)

    def get_entity_data(self, identifier: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """
        Fetch complete entity data from Wikidata.

        Args:
            identifier: Wikidata identifier (Q12345, P31, Category:..., List_of_...)
            force_refresh: Force re-fetch even if cached

        Returns:
            Dictionary with all Wikidata properties or None if fetch fails
        """
        # Check cache
        if not force_refresh and identifier in self.register['entities']:
            cached = self.register['entities'][identifier]
            print(f" ✓ {identifier} (cached from {cached.get('fetch_date', 'unknown')})")
            return cached.get('data')

        # Fetch from API
        print(f" ⬇ Fetching {identifier} from Wikidata...")
        try:
            # Get entity data using Wikibase REST API
            params = {
                'action': 'wbgetentities',
                'ids': identifier,
                'format': 'json',
                'props': 'info|sitelinks|aliases|labels|descriptions|claims|datatype'
            }
            response = self.session.get(WIKIDATA_API_BASE, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            # Check for API-level errors (e.g. malformed ID)
            if 'error' in data:
                print(f" ✗ Error fetching {identifier}: {data['error'].get('info', 'Unknown error')}")
                return None

            # Extract entity data
            entities = data.get('entities', {})
            if identifier not in entities:
                print(f" ✗ Entity {identifier} not found in Wikidata")
                return None

            entity_data = entities[identifier]

            # Check if entity exists (not deleted/missing)
            if 'missing' in entity_data:
                print(f" ✗ Entity {identifier} is missing or deleted")
                return None

            # Cache the data; fetch_count accumulates across runs.
            self.register['entities'][identifier] = {
                'data': entity_data,
                'fetch_date': datetime.now(timezone.utc).isoformat(),
                'fetch_count': self.register['entities'].get(identifier, {}).get('fetch_count', 0) + 1
            }
            self.register['total_fetches'] += 1

            # Rate limiting — be polite to the Wikidata API.
            time.sleep(RATE_LIMIT_DELAY)

            print(f" ✓ {identifier} fetched successfully")
            return entity_data

        except requests.RequestException as e:
            print(f" ✗ Network error fetching {identifier}: {e}")
            return None
        except Exception as e:
            # Best-effort boundary: a single bad entity must not kill the run.
            print(f" ✗ Unexpected error fetching {identifier}: {e}")
            return None


class HyponymEnricher:
    """Enrich hyponyms with Wikidata metadata."""

    def __init__(self, input_file: Path, output_file: Path, fetcher: WikidataFetcher):
        self.input_file = input_file
        self.output_file = output_file
        self.fetcher = fetcher
        self.stats = {
            'total_entities': 0,
            'enriched': 0,
            'cached': 0,
            'failed': 0,
            'skipped': 0
        }

    def load_input(self) -> Dict[str, Any]:
        """Load hyponyms_curated.yaml."""
        print(f"\n📂 Loading {self.input_file.name}...")
        with open(self.input_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def extract_identifier(self, label: Any) -> Optional[str]:
        """
        Extract Wikidata identifier from label.

        Handles multiple identifier types:
        - Q-numbers: Q12345, 'Q12345 - Museum'
        - P-numbers: P31, P2671
        - Category identifiers: Category:Virtual_museums
        - List identifiers: List_of_museums

        Returns:
            Identifier string (Q12345, P31, Category:..., List_of_...) or None
        """
        if not label:
            return None

        # Convert to string if it's an integer or other type
        label_str = str(label).strip()

        # Handle Category: identifiers (Wikipedia categories)
        if label_str.startswith('Category:'):
            return label_str

        # Handle List_of_ identifiers (Wikipedia list pages)
        if label_str.startswith('List_of_'):
            return label_str

        # Handle Q-numbers and P-numbers: only the first whitespace-separated
        # token is examined ('Q12345 - Museum' -> 'Q12345').
        parts = label_str.split()
        if parts:
            first_part = parts[0]
            # Q-number (entity)
            if first_part.startswith('Q') and first_part[1:].isdigit():
                return first_part
            # P-number (property)
            if first_part.startswith('P') and first_part[1:].isdigit():
                return first_part

        # Invalid label (like a year integer without Q/P prefix)
        # These should be skipped
        return None

    def enrich_entity(self, entity: Dict[str, Any], force_refresh: bool = False) -> Dict[str, Any]:
        """
        Enrich a single entity with Wikidata metadata.

        Args:
            entity: Entity dict from hyponyms_curated.yaml
            force_refresh: Force re-fetch from Wikidata

        Returns:
            Enriched entity dict with 'curated' and 'wikidata' keys
        """
        label = entity.get('label')
        identifier = self.extract_identifier(label)

        if not identifier:
            self.stats['skipped'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'no_identifier',
                'identifier': None
            }

        self.stats['total_entities'] += 1

        # Handle Category: and List_of_ identifiers (no Wikidata fetch needed)
        if identifier.startswith('Category:') or identifier.startswith('List_of_'):
            self.stats['skipped'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'category_or_list',
                'identifier': identifier,
                'enrichment_note': 'Wikipedia category or list page (no Wikidata entity)'
            }

        # FIX: determine cache status BEFORE fetching. The old code inspected
        # fetch_count AFTER the fetch, so an entity cached by a *previous* run
        # (fetch_count == 1) was miscounted as "newly enriched".
        was_cached = (not force_refresh) and identifier in self.fetcher.register['entities']

        # Fetch Wikidata data (for Q and P identifiers)
        wd_data = self.fetcher.get_entity_data(identifier, force_refresh)

        if wd_data is None:
            self.stats['failed'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'fetch_failed',
                'identifier': identifier
            }

        if was_cached:
            self.stats['cached'] += 1
        else:
            self.stats['enriched'] += 1

        # Build enriched entity
        enriched = {
            'curated': entity,
            'wikidata': self._flatten_wikidata(wd_data),
            'enrichment_status': 'success',
            'identifier': identifier,
            'enrichment_date': datetime.now(timezone.utc).isoformat()
        }
        return enriched

    def _flatten_wikidata(self, wd_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Flatten Wikidata entity data for readability.

        Extracts:
        - Labels (all languages)
        - Descriptions (all languages)
        - Aliases (all languages)
        - Claims/statements (all properties)
        - Sitelinks
        - Entity metadata (id, type, modified date)
        """
        flattened = {
            'id': wd_data.get('id'),
            'type': wd_data.get('type'),
            'modified': wd_data.get('modified'),
            'labels': {},
            'descriptions': {},
            'aliases': {},
            'claims': {},
            'sitelinks': {},
            'metadata': {}
        }

        # Labels (all languages)
        for lang, label_data in wd_data.get('labels', {}).items():
            flattened['labels'][lang] = label_data.get('value')

        # Descriptions (all languages)
        for lang, desc_data in wd_data.get('descriptions', {}).items():
            flattened['descriptions'][lang] = des_value = desc_data.get('value')
            flattened['descriptions'][lang] = des_value

        # Aliases (all languages)
        for lang, alias_list in wd_data.get('aliases', {}).items():
            flattened['aliases'][lang] = [a.get('value') for a in alias_list]

        # Claims (all properties with values)
        for prop_id, claim_list in wd_data.get('claims', {}).items():
            flattened['claims'][prop_id] = self._extract_claims(claim_list)

        # Sitelinks (Wikipedia and other wikis)
        for site, sitelink_data in wd_data.get('sitelinks', {}).items():
            flattened['sitelinks'][site] = {
                'title': sitelink_data.get('title'),
                'url': sitelink_data.get('url'),
                'badges': sitelink_data.get('badges', [])
            }

        # Additional metadata
        flattened['metadata'] = {
            'pageid': wd_data.get('pageid'),
            'ns': wd_data.get('ns'),
            'title': wd_data.get('title'),
            'lastrevid': wd_data.get('lastrevid')
        }

        return flattened

    def _extract_claims(self, claim_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract simplified claim/statement data (value, rank, qualifiers, references)."""
        simplified = []
        for claim in claim_list:
            mainsnak = claim.get('mainsnak', {})
            datavalue = mainsnak.get('datavalue', {})
            claim_data = {
                'property': mainsnak.get('property'),
                'rank': claim.get('rank'),
                'datatype': mainsnak.get('datatype'),
                'value': self._extract_value(datavalue)
            }

            # Qualifiers
            if 'qualifiers' in claim:
                claim_data['qualifiers'] = {}
                for qual_prop, qual_list in claim['qualifiers'].items():
                    claim_data['qualifiers'][qual_prop] = [
                        self._extract_value(q.get('datavalue', {})) for q in qual_list
                    ]

            # References
            if 'references' in claim:
                claim_data['references'] = [
                    {
                        'hash': ref.get('hash'),
                        'snaks': {
                            prop: [self._extract_value(s.get('datavalue', {})) for s in snak_list]
                            for prop, snak_list in ref.get('snaks', {}).items()
                        }
                    }
                    for ref in claim['references']
                ]

            simplified.append(claim_data)
        return simplified

    def _extract_value(self, datavalue: Dict[str, Any]) -> Any:
        """Extract a plain Python value from a Wikidata datavalue object."""
        if not datavalue:
            return None

        value_type = datavalue.get('type')
        value = datavalue.get('value')

        # FIX: the old guard was `if not value`, which also discarded
        # legitimate falsy values such as 0, "" and False.
        if value is None:
            return None

        if value_type == 'wikibase-entityid':
            # Entity reference. FIX: prefer the full 'id' field when present —
            # building "Q{numeric-id}" mislabels property (P) and lexeme (L)
            # references as Q-entities. Falls back to numeric-id for old payloads.
            if isinstance(value, dict):
                return value.get('id') or f"Q{value.get('numeric-id')}"
            return None
        elif value_type == 'string':
            return value
        elif value_type == 'time' and isinstance(value, dict):
            # Time value
            return {
                'time': value.get('time'),
                'precision': value.get('precision'),
                'timezone': value.get('timezone'),
                'calendarmodel': value.get('calendarmodel')
            }
        elif value_type == 'quantity' and isinstance(value, dict):
            # Quantity with unit
            return {
                'amount': value.get('amount'),
                'unit': value.get('unit'),
                'upperBound': value.get('upperBound'),
                'lowerBound': value.get('lowerBound')
            }
        elif value_type == 'monolingualtext' and isinstance(value, dict):
            # Text with language
            return {
                'text': value.get('text'),
                'language': value.get('language')
            }
        elif value_type == 'globecoordinate' and isinstance(value, dict):
            # Geographic coordinates
            return {
                'latitude': value.get('latitude'),
                'longitude': value.get('longitude'),
                'precision': value.get('precision'),
                'globe': value.get('globe')
            }
        else:
            # Other types - return as-is
            return value

    def enrich_section(self, section_name: str, section_data: List[Dict[str, Any]],
                       force_refresh: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
        """Enrich all entities in a section."""
        if force_refresh is None:
            force_refresh = set()

        print(f"\n📊 Enriching section: {section_name}")
        print(f" Entities: {len(section_data)}")

        enriched = []
        for i, entity in enumerate(section_data, 1):
            if i % 50 == 0:
                print(f" Progress: {i}/{len(section_data)} entities")

            identifier = self.extract_identifier(entity.get('label'))
            should_refresh = identifier in force_refresh if identifier else False

            enriched_entity = self.enrich_entity(entity, force_refresh=should_refresh)
            enriched.append(enriched_entity)

        return enriched

    def enrich_all(self, force_refresh: Optional[Set[str]] = None) -> Dict[str, Any]:
        """Enrich all sections in hyponyms_curated.yaml."""
        data = self.load_input()

        enriched_data = {
            'metadata': {
                'source_file': str(self.input_file),
                'enrichment_date': datetime.now(timezone.utc).isoformat(),
                'enrichment_script': __file__,
                'wikidata_api': WIKIDATA_API_BASE
            },
            'sources': data.get('sources', [])
        }

        # Process each section (except 'exclude')
        for section_name in SECTIONS_TO_PROCESS:
            if section_name in data:
                enriched_data[section_name] = self.enrich_section(
                    section_name,
                    data[section_name],
                    force_refresh=force_refresh
                )
            else:
                print(f"⚠ Section '{section_name}' not found in input file")

        # Copy exclude section as-is (no enrichment)
        if 'exclude' in data:
            enriched_data['exclude'] = data['exclude']
            print(f"\n📋 Copied 'exclude' section ({len(data['exclude'])} entries) without enrichment")

        return enriched_data

    def save_output(self, enriched_data: Dict[str, Any]):
        """Save enriched data to YAML."""
        print(f"\n💾 Saving to {self.output_file.name}...")
        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.output_file, 'w', encoding='utf-8') as f:
            yaml.dump(enriched_data, f,
                      allow_unicode=True,
                      default_flow_style=False,
                      sort_keys=False,
                      width=120)
        print(f"✓ Saved {self.output_file}")

    def print_stats(self):
        """Print enrichment statistics."""
        print("\n" + "="*60)
        print("📈 ENRICHMENT STATISTICS")
        print("="*60)
        print(f"Total entities processed: {self.stats['total_entities']}")
        print(f" ✓ Newly enriched: {self.stats['enriched']}")
        print(f" ✓ From cache: {self.stats['cached']}")
        print(f" ✗ Failed to fetch: {self.stats['failed']}")
        print(f" ⊘ Skipped (no Q-ID): {self.stats['skipped']}")
        print("="*60)


def main():
    parser = argparse.ArgumentParser(
        description='Enrich hyponyms_curated.yaml with Wikidata metadata',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--refresh',
        type=str,
        help='Comma-separated list of Q-numbers to force refresh (e.g., Q12345,Q67890)'
    )
    parser.add_argument(
        '--refresh-all',
        action='store_true',
        help='Force refresh all entities (re-fetch from Wikidata)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Perform enrichment but do not save output'
    )

    args = parser.parse_args()

    # Parse refresh list
    force_refresh = set()
    if args.refresh_all:
        force_refresh = {'*'}  # Special marker for all entities
    elif args.refresh:
        force_refresh = {q.strip() for q in args.refresh.split(',')}
        print(f"🔄 Force refresh: {', '.join(force_refresh)}")

    # Initialize
    print("="*60)
    print("🚀 WIKIDATA ENRICHMENT SCRIPT")
    print("="*60)

    fetcher = WikidataFetcher(REGISTER_FILE)
    enricher = HyponymEnricher(INPUT_FILE, OUTPUT_FILE, fetcher)

    # Check if refresh-all
    if args.refresh_all:
        print("⚠ REFRESH ALL mode enabled - will re-fetch all entities")
        response = input("Continue? (y/n): ")
        if response.lower() != 'y':
            print("Aborted.")
            return
        # Convert special marker to actual set of all Q-IDs
        data = enricher.load_input()
        all_qids = set()
        for section in SECTIONS_TO_PROCESS:
            if section in data:
                for entity in data[section]:
                    identifier = enricher.extract_identifier(entity.get('label'))
                    if identifier:
                        all_qids.add(identifier)
        force_refresh = all_qids
        print(f"📋 Will refresh {len(force_refresh)} entities")

    # Enrich
    try:
        enriched_data = enricher.enrich_all(force_refresh=force_refresh)

        # Save
        if not args.dry_run:
            enricher.save_output(enriched_data)
            fetcher._save_register()
            print(f"✓ Fetch register saved to {REGISTER_FILE}")
        else:
            print("\n🔍 DRY RUN - Output not saved")

        # Stats
        enricher.print_stats()

        if not args.dry_run:
            print("\n✅ SUCCESS - Enriched data saved to:")
            print(f" {OUTPUT_FILE}")

    except KeyboardInterrupt:
        # Persist what we fetched so far before exiting.
        print("\n\n⚠ Interrupted by user")
        fetcher._save_register()
        print(f"✓ Fetch register saved to {REGISTER_FILE}")
        sys.exit(1)
    except Exception as e:
        print(f"\n\n❌ ERROR: {e}")
        import traceback
        traceback.print_exc()
        fetcher._save_register()
        print(f"✓ Fetch register saved to {REGISTER_FILE}")
        sys.exit(1)


if __name__ == '__main__':
    main()